paint-brush
使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南经过@minio
9,088 讀數
9,088 讀數

使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南

经过 MinIO22m2023/08/08
Read on Terminal Reader

太長; 讀書

Apache Iceberg 似乎席卷了数据世界。最初由 Ryan Blue 在 Netflix 孵化,最终被转移到目前所在的 Apache 软件基金会。它的核心是一种用于大规模分析数据集(想想数百 TB 到数百 PB)的开放表格式。 它是一种多引擎兼容格式。这意味着 Spark、Trino、Flink、Presto、Hive 和 Impala 都可以在数据集上独立且同时运行。它支持数据分析、SQL 等通用语言,以及完整模式演化、隐藏分区、时间旅行、回滚和数据压缩等关键功能。 这篇文章重点介绍 Iceberg 和 MinIO 如何相互补充,以及各种分析框架(Spark、Flink、Trino、Dremio 和 Snowflake)如何利用这两者。
featured image - 使用 Iceberg 和 MinIO 的 Lakehouse 架构权威指南
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Iceberg 似乎席卷了数据世界。最初由 Ryan Blue 在 Netflix 孵化,最终被转移到目前所在的 Apache 软件基金会。它的核心是一种用于大规模分析数据集(想想数百 TB 到数百 PB)的开放表格式。


它是一种多引擎兼容格式。这意味着 Spark、Trino、Flink、Presto、Hive 和 Impala 都可以在数据集上独立且同时运行。它支持数据分析、SQL 等通用语言,以及完整模式演化、隐藏分区、时间旅行、回滚和数据压缩等关键功能。


这篇文章重点介绍 Iceberg 和 MinIO 如何相互补充,以及各种分析框架(Spark、Flink、Trino、Dremio 和 Snowflake)如何利用这两者。

背景

虽然 Apache Hive 在当时是向前迈出的一大步,但随着分析应用程序变得更加数量、多样化和复杂,它最终开始出现问题。为了实现性能,数据需要保留在目录中,并且需要不断管理这些目录。这导致了目录数据库的出现。这解决了数据在哪里的问题,但它引入了该表的状态是什么的问题——该表现在位于两个地方(目录数据库和文件系统)。


这限制了您可以做的事情以及存在的灵活性 - 特别是在更改方面,无法通过单个操作在两个地方都保证这一点。


想象一下按日期分区的大量多年数据。年划分为月和周,如果周划分为日,日划分为小时等等,目录列表就会爆炸。 Hive Metastore (HMS) 是一个事务性 RDBMS。文件系统 (HDFS) 是非事务性的。当分区信息更改时,需要重新创建分区存储和文件系统。


这个问题是不可持续的,无论进行多少修补都无法解决固有问题。事实上,随着数据的增长,挑战只会愈演愈烈。

现代开放表格式的目标

数据湖屋架构的关键卖点之一是它支持多种分析引擎和框架。例如,您需要同时支持 ELT(提取、加载、转换)和 ETL(提取、转换、加载)。您需要支持商业智能、业务分析和 AI/ML 类型的工作负载。您需要以安全且可预测的方式成功地与同一组表进行交互。这意味着 Spark、Flink、Trino、Arrow 和 Dask 等多个引擎都需要以某种方式绑定到一个有凝聚力的架构中。


一个能够高效容纳数据同时使每个引擎都能成功的多引擎平台是分析世界一直渴望的,也是 Iceberg 和 Data Lakehouse 架构所提供的。


这并不简单,而且有很多挑战;没有简单的方法可以使用多个引擎来可靠地更新数据。但即使现在我们有两三种可以提供可靠更新的格式,仍然存在很多混乱,并且该领域存在问题。



现代的需求是这样的:


  1. 中央表存储:独立于计算存储数据成为关键的架构决策。它之所以重要,是因为数据具有引力,它将我们拉向数据所在的位置。因此,如果我们的数据完全位于一个供应商或云提供商中,那么我们仅与该供应商或云提供商绑定。当这些系统是封闭的或专门设计时,这本质上是有问题的。开放软件成为现代架构的要求。


  2. 便携式计算:另一个现代要求是能够将您的计算引擎带到不同的供应商/云提供商或利用专门的计算引擎。虽然许多人关注重心(数据),但企业还需要逻辑、代码和 SQL 的可移植性。


  3. 访问控制:大多数企业都面临着跨引擎一致的授权策略的巨大挑战。然而,它不仅仅是架构,因为跨多个引擎成功且可重复地执行这些策略已成为运营的当务之急。


  4. 维护结构:过去几年我们看到的人类工作的最大来源之一是在数据结构转移到其他地方时丢失数据结构。雪花就是一个完美的例子。将数据移动到 Snowflake 的过程是手动的,并且由于文件格式不同以及移动过程中格式的变化,第三方数据集的引入也导致了返工。

阿帕奇冰山来救援

Apache Iceberg 是从头开始设计的,上面提到的大多数挑战和目标都是实现开放表格式的基础。它解决了以下挑战:


  1. 灵活计算
    • 不要移动数据;多个引擎应该无缝工作

    • 支持批处理、流式处理和临时作业

    • 支持多种语言的代码,而不仅仅是 JVM 框架


  2. SQL 仓库行为
    • 使用 SQL 表进行可靠事务,我们能够可靠地执行 CRUD 操作

    • 将关注点与真实表分离可以提供隔离


Apache Iceberg 将其记录保存在对象存储中——与 Apache Hive 不同。 Iceberg 使 SQL 行为能够被多个引擎利用,并且它是为巨大的表而设计的。在生产中,单个表可以包含数十 PB 的数据,这一点非常重要。即使是多 PB 表也可以从单个节点读取,无需分布式 SQL 引擎来筛选表元数据。


来源:https: //iceberg.apache.org/spec/


Iceberg 有一个不成文的规则,即在大数据堆栈中使用时是不可见的。这种哲学来自 SQL 表空间,我们从不考虑 SQL 表下面有什么。任何从业者都知道,使用 Hadoop 和类似 Hive 的表时情况并非如此。


Iceberg 通过两种方式保持简单。首先,避免在对表进行更改时出现令人不快的意外情况。例如,更改不应恢复已删除和删除的数据。其次,Iceberg 减少了上下文切换,因为桌子下面的内容并不重要——重要的是要完成的工作。

了解 Iceberg FileIO

FileIO 是核心 Iceberg 库和底层存储之间的接口。 FileIO 的创建是为了让 Iceberg 在分布式计算和存储分散的世界中发挥作用。传统 Hadoop 生态系统需要分层路径和分区结构,这在实践中与对象存储领域用于实现速度和规模的方法完全相反。


Hadoop 和 Hive 是高性能和可扩展的云原生对象存储的反模式。依赖 S3 API 与 MinIO 交互的数据湖应用程序可以轻松扩展到每秒处理数百万或数十亿个对象的数千个事务。您可以通过并行处理多个并发请求来提高读写性能。您可以通过将前缀(从第一个字符开始的对象名称子集的字符串)添加到存储桶,然后编写并行操作(每个操作为每个前缀打开一个连接)来实现此目的。


此外,Hadoop 对文件系统目录的依赖并不能转化为对象存储——当路径不存在时,很难将数据集物理组织到不同的目录中并通过路径对其进行寻址。 Hadoop 依赖文件系统来定义数据集并提供并发和冲突解决的锁定机制。此外,在Hadoop生态系统中,处理重命名操作的作业必须是原子的。使用 S3 API 不可能做到这一点,因为重命名实际上是两个操作:复制和删除。不幸的是,结果是读和写之间没有隔离,可能会引起冲突、碰撞和不一致。


相比之下,Iceberg 被设计为使用对象存储完全从物理存储中抽象出来运行。根据元数据的定义,所有位置都是“显式的、不可变的和绝对的”。 Iceberg 跟踪表的完整状态,而无需引用目录的包袱。使用元数据查找表比使用 S3 API 列出整个层次结构要快得多。没有重命名——提交只是将新条目添加到元数据表中。


FileIO API 在规划和提交阶段执行元数据操作。任务使用 FileIO 来读取和写入底层数据文件,并且这些文件的位置在提交期间包含在表元数据中。引擎具体如何执行此操作取决于 FileIO 的实现。对于遗留环境, HadoopFileIO充当现有 Hadoop 文件系统实现和 Iceberg 内的 FileIO API 之间的适配器层。


我们将重点关注S3FileIO ,因为它是本机 S3 实现。当我们构建云原生 Lakehouse 时,我们不需要随身携带 Hadoop 残骸。根据Iceberg FileIO:Cloud Native Tables 的说法,本机 S3 实现的优点包括:


  • 契约行为: Hadoop 文件系统实现具有严格的契约行为,导致额外的请求(存在性检查、消除目录和路径冲突),从而增加开销和复杂性。 Iceberg 使用完全可寻址且独特的路径,避免了额外的复杂性。

  • 优化上传: S3FileIO通过逐步上传数据来优化存储/内存,以最大程度地减少大型任务的磁盘消耗,并在打开多个文件进行输出时保持较低的内存消耗。

  • S3 客户端自定义:客户端使用最新的主要 AWS SDK 版本 (v2),并允许用户完全自定义客户端以与 S3 一起使用(包括任何 S3 API 兼容终端节点)。

  • 序列化性能:使用HadoopFileIO进行任务处理需要对 Hadoop 配置进行序列化,该配置非常大,在退化情况下可能会减慢处理速度,并导致比处理数据的开销更多。

  • 减少依赖: Hadoop 文件系统实现引入了大型依赖树,简化的实现降低了整体打包的复杂性。


Iceberg 通过iceberg-aws模块提供与不同AWS 服务的集成,从0.11.0起的所有版本都与Spark 和Flink 运行时捆绑在一起。 Iceberg允许用户通过S3FileIO将数据写入S3。使用S3FileIO时,目录被配置为使用io-impl目录属性使用 S3 API。 S3FileIO采用最新的 S3 功能来优化安全性(S3 访问控制列表、所有三种 S3 服务器端加密模式)和性能(渐进式分段上传),因此建议用于对象存储用例。

Iceberg 和 MinIO 教程

目前,Spark 是与 Iceberg 配合使用的功能最丰富的计算引擎,因此本教程重点介绍如何使用 Spark 和 Spark-SQL 来了解 Iceberg 的概念和功能。在 Ubuntu 20.04 上,我们将安装和配置 Java、PostgreSQL 作为目录或元数据指针、Spark 和 MinIO,同时仔细下载和配置 Java 依赖项。然后我们将运行 Spark-SQL 来创建、填充、查询和修改表。我们还将介绍您可以使用 Iceberg 完成的一些很棒的事情,例如模式演化、使用隐藏分区、时间旅行和回滚。在每个步骤之后,我们都会在 MinIO 中添加 Iceberg 存储桶的屏幕截图,以便您可以看到幕后发生的情况。

先决条件

下载并启动MinIO Server。记录 IP 地址、TCP 端口、访问密钥和秘密密钥。
下载并安装MinIO 客户端。


使用MinIO Client为Iceberg设置别名并创建bucket


 mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb minio/iceberg Bucket created successfully `myminio/iceberg`.


您需要下载并配置 Spark 以使用所需的 Java Archives (JAR),以便启用 Hadoop、AWS S3 和 JDBC 等各种功能。您还需要在 PATH 和 CLASSPATH 中拥有每个所需 JAR 和配置文件的正确版本。不幸的是,调用不同版本的 JAR 并忘记您正在运行哪个 JAR 非常容易,因此会遇到令人震惊的不兼容性。


如果尚未安装 Java 运行时,请安装。对于 Ubuntu 20.04,命令是


sudo apt install curl mlocate default-jdk -y


下载并配置 PostgreSQL 以作为系统服务运行


sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update sudo apt-get -y install postgresql sudo systemctl start postgresql.service


我们将创建一个角色icebergcat作为超级用户,设置密码并创建一个数据库icebergcat


 sudo -u postgres createuser --interactive ALTER ROLE icebergcat PASSWORD 'minio'; sudo -u postgres createdb icebergcat


登录数据库以验证其工作,系统将提示您输入密码:


 psql -U icebergcat -d icebergcat -W -h 127.0.0.1


下载、解压和移动 Apache Spark


 $ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz $ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz $ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark


通过将以下内容添加到~/.bashrc来设置 Spark 环境,然后重新启动 shell 以应用更改。


 export SPARK_HOME=/opt/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin bash -l


需要以下 .jar 文件。下载 .jar 文件并将其复制到 Spark 计算机上的任何所需位置,例如/opt/spark/jars


需要aws-java-sdk-bundle/1.11.901.jar (或更高版本)来支持 S3 协议。


 $ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar


需要iceberg-spark-runtime-3.2_2.12.jar


 $ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar


启动火花

启动 Spark 独立主服务器


$ start-master.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out


打开浏览器并访问 http: // Your-IPaddress:7077



Spark 处于活动状态:spark://<Your-Machine-Name>:7077


启动 Spark 工作进程

$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077 starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out

Spark-SQL 和 Iceberg

启动 Spark-SQL 之前初始化环境。


 export AWS_ACCESS_KEY_ID=minioadmin export AWS_SECRET_ACCESS_KEY=minioadmin export AWS_S3_ENDPOINT=10.0.0.10:9000 export AWS_REGION=us-east-1 export MINIO_REGION=us-east-1 export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" export AWS_SDK_VERSION=2.17.230 export AWS_MAVEN_GROUP=software.amazon.awssdk export AWS_PACKAGES=( "bundle" "url-connection-client" ) for pkg in "${AWS_PACKAGES[@]}"; do export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" done


运行以下命令通过 Iceberg 启动 Spark-SQL,使用 PostgreSQL 获取元数据并支持 MinIO 所需的 S3 API。或者,您可以使用本地spark-defaults.conf文件设置配置


$ spark-sql --packages $DEPENDENCIES \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \ --conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.password=minio \ --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=/home/iceicedata/spark-events \ --conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \ --conf spark.sql.catalogImplementation=in-memory


关于此配置的一些重要注意事项


  • 我们声明一个目录my_catalog ,它使用 JDBC 连接到内部 IP 地址上的 PostgreSQL,并使用icebergcat表作为元数据。
  • 然后,我们将仓库位置设置为之前创建的 MinIO 存储桶,并将 Iceberg 配置为使用S3FileIO来访问它。

创建表

接下来,我们将创建一个简单的表。


 CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg LOCATION 's3://iceberg' PARTITIONED BY (category);


这是 Iceberg 通过 S3FileIO 提供的巨大性能改进。对于我们这些在使用传统 Hive 存储布局和 S3 时因基于对象前缀限制请求而遭受性能缓慢之苦的人来说,这是一个很大的缓解。众所周知,在 AWS S3 上创建分区 Athena/Hive 表可能需要 30-60 分钟。 Iceberg 默认使用 Hive 存储布局,但可以切换为使用ObjectStoreLocationProvider


使用ObjectStoreLocationProvider ,会为每个存储的文件生成确定性哈希,并将哈希直接附加在write.data.path之后。这可确保写入 S3 兼容对象存储的文件均匀分布在 S3 存储桶中的多个前缀中,从而实现 S3 相关 IO 操作的最小限制和最大吞吐量。使用ObjectStoreLocationProvider时,在 Iceberg 表中拥有共享且简短的write.data.path将提高性能。 Iceberg 做了更多工作来提高 Hive 的性能和可靠性


 CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.object-storage.enabled'=true, 'write.data.path'='s3://iceberg') PARTITIONED BY (category);


查看 MinIO Console,我们看到在我们的iceberg Bucket 下为my_table创建了一条路径



存储桶包含metadata路径



此时,表中没有数据,只有描述该表的元数据。还有一个指向存储在 PostgreSQL 的 Iceberg 目录表中的元数据的指针。 Spark-SQL(查询引擎)按表名 ( my_catalog ) 搜索 Iceberg 目录 ( my_table ),并检索当前元数据文件的 URI。



让我们看一下第一个元数据文件,其中存储了有关表的架构、分区和快照的信息。定义所有快照后, current-snapshot-id告诉查询引擎要使用哪个快照,然后查询引擎在snapshots数组中搜索该值,获取该快照的manifest-list的值并打开该快照中的清单文件列表,按顺序。请注意,我们的示例只有一个快照,因为该表刚刚创建,并且没有清单,因为我们尚未插入数据。


 { "format-version" : 1, "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363", "location" : "s3://iceberg/my_table", "last-updated-ms" : 1658795119167, "last-column-id" : 3, "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] }, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] } ], "partition-spec" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "option.write.data.path" : "s3://iceberg/my_table", "owner" : "msarrel", "option.write.object-storage.enabled" : "true", "write.data.path" : "s3://iceberg/my_table", "write.object-storage.enabled" : "true" }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }


接下来,让我们插入一些模拟数据并观察 Iceberg 存储在 MinIO 中的文件。在iceberg桶内,现在有my_table/metadatamy_table/data前缀。


 INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video"); 



元数据前缀包含原始元数据文件、清单列表和清单文件。您猜对了,清单列表是清单文件的列表。清单列表包含有关每个快照中包含的每个清单文件的信息:清单文件的位置、添加该清单文件的快照、有关分区的信息以及相关数据文件的分区列的下限和上限。在查询期间,查询引擎从清单列表中读取清单文件位置的值并打开相应的清单文件。清单列表采用 AVRO 格式。


清单文件跟踪数据文件并包括有关每个文件的详细信息和预先计算的统计信息。跟踪的第一件事是文件格式和位置。清单文件是 Iceberg 通过文件系统位置消除 Hive 式跟踪数据的方式。清单文件通过包含分区成员资格、记录计数以及每列的下限和上限等详细信息来提高读取数据文件的效率和性能。统计信息是在写操作期间写入的,并且比 Hive 统计信息更及时、准确和最新。



当提交 SELECT 查询时,查询引擎从元数据数据库获取清单列表的位置。然后,查询引擎读取每个data-file对象的file-path条目的值,然后打开数据文件以执行查询。


下面显示的是data前缀的内容,按分区组织。



在分区内部,每个表行都有一个数据文件。



让我们运行一个示例查询:

 spark-sql> SELECT count(1) as count, data FROM my_catalog.my_table GROUP BY data; 1 a 1 b 1 c Time taken: 9.715 seconds, Fetched 3 row(s) spark-sql>


现在我们了解了 Iceberg 表的不同组件以及查询引擎如何与它们配合使用,接下来让我们深入了解 Iceberg 的最佳功能以及如何在数据湖中利用它们。

桌子的演变

添加、删除、重命名和更新等架构演变更改是元数据更改,这意味着无需更改/重写数据文件即可执行更新。 Iceberg 还保证这些模式演化变化是独立的并且没有副作用。 Iceberg 使用唯一的 Id 来跟踪表中的每一列,如果添加新列,它永远不会错误地利用现有 Id。


Iceberg 表分区可以在现有表中更新,因为查询不直接引用分区值。当写入新数据时,它在新布局中使用新规范,以前使用不同规范写入的数据保持不变。当您编写新查询时,这会导致拆分计划。为了提高性能,Iceberg 使用隐藏分区,因此用户无需为特定分区布局编写快速查询。用户专注于编写所需数据的查询,并让 Iceberg 修剪不包含匹配数据的文件。


另一个非常有用的演变是 Iceberg 排序顺序也可以在现有表中更新,就像分区规范一样。当排序成本过高时,不同的引擎可以选择在未排序的顺序上以最新的排序顺序写入数据,而以先前的排序顺序写入的旧数据保持不变。


 spark-sql> ALTER TABLE my_catalog.my_table > RENAME my_catalog.my_table_2;


头几次这样做时,您会被它的速度所震惊。这是因为您没有重写表,而只是对元数据进行操作。在本例中,我们只更改了table_name ,Iceberg 在大约十分之一秒内为我们完成了此操作。



其他架构更改同样轻松:

 spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity; spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity; spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;

分区

正如我们之前提到的,其他 Hive 格式都支持分区,但是 Iceberg 支持隐藏分区,可以处理为表中的行生成分区值的繁琐且容易出错的任务。用户专注于向解决业务问题的查询添加过滤器,而不用担心表如何分区。 Iceberg 负责自动避免从不必要的分区进行读取。


Iceberg 为您处理复杂的分区和更改表的分区方案,极大地简化了最终用户的流程。您可以定义分区或让 Iceberg 为您处理。 Iceberg 喜欢根据时间戳进行分区,例如事件时间。分区通过清单中的快照进行跟踪。查询不再依赖于表的物理布局。由于物理表和逻辑表之间的这种分离,随着更多数据的添加,Iceberg 表可能会随着时间的推移而演变分区。例如,重新分区 Hive 表需要创建一个新表并将旧数据读入其中。您还必须更改已编写的每个查询中的 PARTITION 值 - 这并不有趣。


 spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category; ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;


现在我们对同一个表有两种分区方案。在 Hive 中不可能发生的事情在 Iceberg 中透明地发生了。从现在开始,查询计划被拆分,使用旧的分区方案查询旧数据,使用新的分区方案查询新数据。 Iceberg 会为您处理这个问题——查询表的人不需要知道数据是使用两种分区方案存储的。 Iceberg 通过结合幕后 WHERE 子句和分区过滤器(删除不匹配的数据文件)来实现此目的。

时间旅行与回滚

每次写入 Iceberg 表都会创建新的快照。快照就像版本一样,可用于时间旅行和回滚,就像我们使用 MinIO 版本控制功能一样。快照的管理方式是通过设置expireSnapshot来保证系统的良好维护。时间旅行支持使用完全相同的表快照的可重复查询,或者让用户轻松检查更改。版本回滚允许用户通过将表重置到良好状态来快速纠正问题。


当表发生更改时,Iceberg 将每个版本作为快照进行跟踪,然后在查询表时提供时间旅行到任何快照的功能。如果您想要运行历史查询或重现以前查询的结果(也许用于报告),这可能非常有用。时间旅行在测试新代码更改时也很有帮助,因为您可以使用已知结果的查询来测试新代码。


要查看已为表保存的快照:

 spark-sql> SELECT * FROM my_catalog.my_table.snapshots; 2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"} Time taken: 7.236 seconds, Fetched 1 row(s)


一些例子:

 -- time travel to October 26, 1986 at 01:21:00 spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00'; -- time travel to snapshot with id 10963874102873 spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;


您可以使用快照进行增量读取,但必须使用 Spark,而不是 Spark-SQL。例如:

 scala> spark.read() .format(“iceberg”) .option(“start-snapshot-id”, “10963874102873”) .option(“end-snapshot-id”, “10963874102994”) .load(“s3://iceberg/my_table”)


您还可以将表回滚到某个时间点或特定快照,如以下两个示例所示:

 spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000'); spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);

表达性 SQL

Iceberg 支持所有富有表现力的 SQL 命令,例如行级删除、合并和更新,最值得强调的是 Iceberg 支持 Eager 和 Lazy 策略。我们可以对需要删除的所有内容(例如 GDPR 或 CCPA)进行编码,但不会立即重写所有这些数据文件,我们可以根据需要延迟收集垃圾,这确实有助于提高 Iceberg 支持的大型表的效率。


例如,您可以删除表中与特定谓词匹配的所有记录。以下将从视频类别中删除所有行:

 spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';


或者,您可以使用 CREATE TABLE AS SELECT 或 REPLACE TABLE AS SELECT 来完成此操作:

 spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';


您可以非常轻松地合并两个表:

 spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;

数据工程

Iceberg 是开放分析表标准的基础,与其他 Hive 表格式不同,它使用 SQL 行为和真实的表抽象,并应用数据仓库基础原理来解决问题。通过声明性数据工程,我们可以配置表,而不必担心更改每个引擎来满足数据的需求。这将解锁自动优化和建议。通过安全提交,数据服务成为可能,这有助于避免人类照顾数据工作负载。


要检查表的历史记录、快照和其他元数据,Iceberg 支持查询元数据。元数据表通过在查询中的原始表名称后添加元数据表名称(例如历史记录)来标识。


显示表的数据文件:

 spark-sql> SELECT * FROM my_catalog.my_table.files;


显示清单:

 spark-sql> SELECT * FROM my_catalog.my_table.manifests;


显示表历史记录:

 spark-sql> SELECT * FROM my_catalog.my_table.history;


显示快照:

 spark-sql> SELECT * FROM my_catalog.my_table.snapshots;


您还可以连接快照和表历史记录以查看写入每个快照的应用程序:

 spark-sql> select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from my_catalog.my_table.history h join my_catalog.my_table.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at;


现在您已经了解了基础知识,请将一些数据加载到 Iceberg 中,然后从Spark 和 Iceberg 快速入门以及Iceberg 文档 中了解更多信息。

集成

Apache Iceberg 与各种查询和执行引擎集成,Apache Iceberg 表可以由这些连接器创建和管理。支持 Iceberg 的引擎有SparkFlinkHivePrestoTrinoDremioSnowflake

使用 Iceberg 和 MinIO 构建数据湖很酷

Apache Iceberg 作为数据湖的表格式而受到广泛关注。不断增长的开源社区以及来自多个云提供商和应用程序框架的集成数量不断增加,意味着是时候认真对待 Iceberg、开始试验、学习并计划将其集成到现有的数据湖架构中了。将 Iceberg 与 MinIO 配对以实现多云数据湖和分析。


当您开始使用 Iceberg 和 MinIO 时,请通过我们的 Slack 频道分享您的经验或提出问题。


也发布在这里