Apache Hudi 是一个流数据湖平台,可将核心仓库和数据库功能直接引入数据湖。 Hudi 不满足于称自己为Delta或Apache Iceberg等开放文件格式,它提供表、事务、更新插入/删除、高级索引、流式摄取服务、数据集群/压缩优化和并发性。
Hudi 于 2016 年推出,牢牢扎根于 Hadoop 生态系统,其名称背后的含义是:Hadoop Upserts and Incrementals。它是为了管理 HDFS 上大型分析数据集的存储而开发的。 Hudi 的主要目的是减少流数据摄取期间的延迟。
随着时间的推移,Hudi 已经发展到使用云存储和对象存储,包括 MinIO。 Hudi 从 HDFS 的转变与世界的大趋势齐头并进,抛弃传统的 HDFS,转向高性能、可扩展和云原生对象存储。 Hudi 承诺提供优化,使 Apache Spark、Flink、Presto、Trino 等的分析工作负载更快,这与 MinIO 大规模云原生应用程序性能的承诺非常吻合。
在生产中使用 Hudi 的公司包括Uber 、 Amazon 、 ByteDance和Robinhood 。这些是世界上最大的一些流数据湖。在这个用例中,Hudi 的关键在于它提供了一个增量数据处理堆栈,可以对柱状数据进行低延迟处理。通常,系统使用 Apache Parquet 或 ORC 等开放文件格式一次性写出数据,并将其存储在高度可扩展的对象存储或分布式文件系统之上。 Hudi 充当数据平面来摄取、转换和管理这些数据。 Hudi 使用Hadoop FileSystem API与存储进行交互,该 API 与从 HDFS 到对象存储再到内存文件系统的实现兼容(但不一定是最佳的)。
Hudi 使用基本文件和增量日志文件来存储对给定基本文件的更新/更改。基础文件可以是 Parquet(列式)或 HFile(索引式)。增量日志保存为Avro (行),因为在基本文件发生更改时记录这些更改是有意义的。
Hudi 将给定基本文件的所有更改编码为块序列。块可以是数据块、删除块或回滚块。这些块被合并以获得更新的基础文件。此编码还会创建一个独立的日志。
来源。
表格式由表的文件布局、表的架构以及跟踪表更改的元数据组成。 Hudi 强制执行写入模式,这与对流处理的强调一致,以确保管道不会因非向后兼容的更改而中断。
Hudi 将给定表/分区的文件分组在一起,并在记录键和文件组之间进行映射。如上所述,所有更新都记录到特定文件组的增量日志文件中。这种设计比 Hive ACID 更高效,Hive ACID 必须将所有数据记录与所有基础文件合并才能处理查询。 Hudi 的设计预期基于键的快速更新插入和删除,因为它适用于文件组的增量日志,而不是整个数据集。
Hudi 将给定表/分区的文件分组在一起,并在记录键和文件组之间进行映射。如上所述,所有更新都记录到特定文件组的增量日志文件中。这种设计比 Hive ACID 更高效,Hive ACID 必须将所有数据记录与所有基础文件合并才能处理查询。 Hudi 的设计预期基于键的快速更新插入和删除,因为它适用于文件组的增量日志,而不是整个数据集。
来源。
时间线对于理解至关重要,因为它是所有 Hudi 表元数据的真实事件日志来源。时间线存储在.hoodie
文件夹中,或者在我们的例子中存储在存储桶中。事件会保留在时间线上,直到被删除。时间线适用于整个表以及文件组,从而可以通过将增量日志应用到原始基础文件来重建文件组。为了优化频繁写入/提交,Hudi 的设计使元数据相对于整个表的大小较小。
时间线上的新事件被保存到内部元数据表中,并作为一系列读时合并表实现,从而提供低写入放大。因此,Hudi 可以快速吸收元数据的快速变化。此外,元数据表使用 HFile 基本文件格式,通过一组索引键查找进一步优化性能,避免读取整个元数据表。作为表一部分的所有物理文件路径都包含在元数据中,以避免昂贵且耗时的云文件列表。
Hudi 编写器促进了这样的架构:Hudi 充当高性能写入层,具有 ACID 事务支持,可实现非常快速的增量更改,例如更新和删除。
典型的 Hudi 架构依赖于 Spark 或 Flink 管道将数据传输到 Hudi 表。 Hudi 写入路径经过优化,比简单地将 Parquet 或 Avro 文件写入磁盘更有效。 Hudi 分析写入操作并将其分类为增量操作( insert
、 upsert
、 delete
)或批量操作( insert_overwrite
、 insert_overwrite_table
、 delete_partition
、 bulk_insert
),然后应用必要的优化。
Hudi 编写者还负责维护元数据。对于每条记录,都会写入提交时间和该记录唯一的序列号(这类似于 Kafka 偏移量),从而可以导出记录级别的更改。用户还可以在传入数据流中指定事件时间字段,并使用元数据和 Hudi 时间线跟踪它们。这可以显着改进流处理,因为 Hudi 包含每个记录的到达时间和事件时间,从而可以为复杂的流处理管道构建强大的水印。
写入器和读取器之间的快照隔离允许从所有主要数据湖查询引擎(包括 Spark、Hive、Flink、Prest、Trino 和 Impala)一致地查询表快照。与 Parquet 和 Avro 一样,Hudi 表可以被Snowflake和SQL Server等读取为外部表。
Hudi 阅读器被开发为轻量级。只要有可能,就会使用特定于引擎的矢量化读取器和缓存,例如 Presto 和 Spark 中的那些。当 Hudi 必须合并查询的基本文件和日志文件时,Hudi 使用可溢出映射和延迟读取等机制来提高合并性能,同时还提供读取优化的查询。
Hudi 包含多种非常强大的增量查询功能。元数据是其核心,它允许将大型提交作为较小的块使用,并完全解耦数据的写入和增量查询。通过有效使用元数据,时间旅行只是另一个具有定义的起点和终点的增量查询。 Hudi 在任何给定时间点以原子方式将键映射到单个文件组,支持 Hudi 表上的完整 CDC 功能。正如上面 Hudi writers 部分所讨论的,每个表都由文件组组成,每个文件组都有自己包含的元数据。
Hudi 的最大优势在于它摄取流数据和批量数据的速度。通过提供upsert
功能,Hudi 执行任务的速度比重写整个表或分区快几个数量级。
为了利用 Hudi 的摄取速度,数据湖屋需要具有高 IOPS 和吞吐量的存储层。 MinIO 的可扩展性和高性能的结合正是 Hudi 所需要的。 MinIO 完全能够满足为实时企业数据湖提供支持所需的性能 — 最近的基准测试在 GET 上实现了 325 GiB/s (349 GB/s),在 PUT 上实现了 165 GiB/s (177 GB/s)。 32 个节点的现成 NVMe SSD。
活跃的企业 Hudi 数据湖存储大量小型 Parquet 和 Avro 文件。 MinIO 包括许多小文件优化,可实现更快的数据湖。小对象与元数据内联保存,减少了读写 Hudi 元数据和索引等小文件所需的 IOPS。
模式是每个 Hudi 表的重要组成部分。 Hudi 可以强制执行模式,也可以允许模式演变,以便流数据管道可以在不中断的情况下进行调整。此外,Hudi 强制执行 schema-on-writer 以确保更改不会破坏管道。 Hudi 依赖 Avro 来存储、管理和发展表的架构。
Hudi 为数据湖提供ACID 事务保证。 Hudi 确保原子写入:提交以原子方式提交到时间线,并给出一个时间戳,该时间戳表示操作被视为发生的时间。 Hudi 将写入器、表和读取器进程之间的快照隔离开来,以便每个进程都对表的一致快照进行操作。 Hudi 通过写入器之间的乐观并发控制 (OCC) 以及表服务和写入器之间以及多个表服务之间的基于非阻塞 MVCC 的并发控制来解决此问题。
本教程将引导您完成 Spark、Hudi 和 MinIO 的设置,并介绍一些基本的 Hudi 功能。本教程基于Apache Hudi Spark 指南,适用于云原生 MinIO 对象存储。
请注意,使用版本化存储桶会增加 Hudi 的一些维护开销。任何被删除的对象都会创建一个删除标记。当 Hudi 使用Cleaner 实用程序清理文件时,删除标记的数量会随着时间的推移而增加。正确配置生命周期管理以清理这些删除标记非常重要,因为如果删除标记的数量达到 1000,列表操作可能会阻塞。Hudi 项目维护人员建议在使用生命周期规则一天后清理删除标记。
下载并安装Apache Spark。
下载并安装MinIO。记录控制台的 IP 地址、TCP 端口、访问密钥和秘密密钥。
下载并安装MinIO 客户端。
下载 AWS 和 AWS Hadoop 库并将它们添加到您的类路径中,以便使用 S3A 来处理对象存储。
AWS: aws-java-sdk:1.10.34
(或更高版本)
Hadoop: hadoop-aws:2.7.3
(或更高版本)
下载Jar 文件,解压缩并将其复制到/opt/spark/jars
。
使用 MinIO 客户端创建一个存储桶来存放 Hudi 数据:
mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi
启动 Spark shell,并将 Hudi 配置为使用 MinIO 进行存储。确保使用 MinIO 设置配置 S3A 条目。
spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'
然后,在 Spark 中初始化 Hudi。
import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord
请注意,它将简化 Hudi 的重复使用来创建外部配置文件。
尝试一下并使用 Scala 创建一个简单的小型 Hudi 表。 Hudi DataGenerator 是一种基于示例行程架构生成示例插入和更新的快速且简单的方法。
val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator
下面将生成新的行程数据,将它们加载到 DataFrame 中,并将我们刚刚创建的 DataFrame 作为 Hudi 表写入 MinIO。如果表已存在, mode(Overwrite)
将覆盖并重新创建该表。行程数据依赖于记录键 ( uuid
)、分区字段 ( region/country/city
) 和逻辑 ( ts
) 以确保每个分区的行程记录是唯一的。我们将使用默认的写入操作upsert
。当您的工作负载没有更新时,您可以使用insert
或bulk_insert
,这可能会更快。
val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)
打开浏览器并使用您的访问密钥和秘密密钥通过http://<your-MinIO-IP>:<port>
登录 MinIO。您将在存储桶中看到 Hudi 表。
该存储桶还包含包含元数据的.hoodie
路径以及包含数据的americas
和asia
路径。
查看元数据。这就是完成整个教程后我的.hoodie
路径的样子。我们可以看到我在2022年9月13日星期二的9:02、10:37、10:48、10:52和10:56修改了表格。
让我们将 Hudi 数据加载到 DataFrame 中并运行示例查询。
// spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
不,我们不是在谈论 1988 年去看Hootie and the Blowfish音乐会。
每次写入 Hudi 表都会创建新的快照。将快照视为可用于时间旅行查询的表版本。
尝试一些时间旅行查询(您必须更改时间戳才能与您相关)。
spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)
这个过程与我们之前插入新数据时类似。为了展示 Hudi 更新数据的能力,我们将生成对现有行程记录的更新,将它们加载到 DataFrame 中,然后将 DataFrame 写入已保存在 MinIO 中的 Hudi 表中。
请注意,我们使用的是append
保存模式。一般准则是使用append
模式,除非您要创建新表,这样不会覆盖任何记录。使用 Hudi 的典型方法是实时摄取流数据,将它们附加到表中,然后编写一些逻辑,根据刚刚附加的内容合并和更新现有记录。或者,使用overwrite
模式写入会删除并重新创建该表(如果该表已存在)。
// spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)
查询数据会显示更新的行程记录。
Hudi 可以使用增量查询提供自给定时间戳以来更改的记录流。我们需要做的就是提供一个开始时间,从该时间开始流式传输更改,以查看当前提交之前的更改,并且我们可以使用结束时间来限制流。
增量查询对于 Hudi 来说非常重要,因为它允许您在批量数据上构建流式管道。
// spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
Hudi可以查询截至特定时间和日期的数据。
// spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
Hudi 支持两种不同的删除记录的方式。软删除保留记录键并清空所有其他字段的值。软删除会持久保存在 MinIO 中,并且只能使用硬删除从数据湖中删除。
// spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
相比之下,硬删除就是我们所说的删除。记录键和关联字段将从表中删除。
// spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
当数据湖获得更新现有数据的能力时,它就成为数据湖屋。我们将生成一些新的行程数据,然后覆盖现有数据。此操作比upsert
更快,其中 Hudi 会立即为您计算整个目标分区。在这里,我们指定配置,以便绕过upsert
为您执行的自动索引、预组合和重新分区。
// spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)
架构演变允许您更改 Hudi 表的架构以适应数据随时间发生的变化。
以下是如何查询和发展架构和分区的一些示例。更深入的讨论,请参阅Schema Evolution |阿帕奇胡迪。请注意,如果您运行这些命令,它们将更改您的 Hudi 表架构以与本教程不同。
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
目前, SHOW partitions
仅适用于文件系统,因为它基于文件系统表路径。
本教程使用 Spark 来展示 Hudi 的功能。但是,Hudi 可以支持多种表类型/查询类型,并且可以从 Hive、Spark、Presto 等查询引擎查询 Hudi 表。 Hudi 项目有一个演示视频,在基于 Docker 的设置上展示了所有这些内容,所有相关系统都在本地运行。
Apache Hudi 是第一个数据湖开放表格式,在流式架构中值得考虑。 Hudi 社区和生态系统充满活力,越来越重视用 Hudi/对象存储替代 Hadoop/HDFS,以实现云原生流数据湖。使用 MinIO 进行 Hudi 存储为多云数据湖和分析铺平了道路。 MinIO 包括主动-主动复制,可在本地、公共/私有云和边缘等位置之间同步数据,从而实现企业所需的出色功能,例如地理负载平衡和快速热热故障转移。
今天在 MinIO 上试试 Hudi。如果您有任何疑问或想要分享技巧,请通过我们的 Slack 频道联系。
也发布在这里。