Apache Iceberg はデータの世界に旋風を巻き起こしているようです。当初は Ryan Blue によって Netflix で開発されましたが、最終的には現在存在する Apache Software Foundation に送信されました。その核心は、大規模な分析データ セット (数百 TB から数百 PB を考えてください) のためのオープン テーブル形式です。
マルチエンジン対応フォーマットです。これが意味するのは、Spark、Trino、Flink、Presto、Hive、および Impala はすべて、データ セット上で独立して同時に動作できるということです。データ分析の共通語である SQL に加え、完全なスキーマの進化、非表示のパーティショニング、タイム トラベル、ロールバックとデータの圧縮などの主要な機能もサポートしています。
この投稿では、Iceberg と MinIO がどのように相互に補完し合うか、またさまざまな分析フレームワーク (Spark、Flink、Trino、Dremio、Snowflake) がこの 2 つをどのように活用できるかに焦点を当てます。
Apache Hive は当時としては大きな前進でしたが、分析アプリケーションがより多く、多様化し、洗練されるにつれて、最終的には亀裂が目立ち始めました。パフォーマンスを実現するには、データをディレクトリに残し、それらのディレクトリを常に管理する必要がありました。これにより、ディレクトリのデータベースが作成されました。これでデータがどこにあるかという問題は解決しましたが、そのテーブルの状態がどのようなものかという問題が生じました。テーブルは 2 つの場所 (ディレクトリのデータベースとファイル システム) に存在しました。
これにより、できることや柔軟性が制限され、特に変更に関しては、1 回の操作で両方の場所で変更を保証できませんでした。
複数年にわたる大量のデータが日付ごとにパーティション化されていると想像してください。年を月や週に分割し、週を日、日を時間などに分割すると、ディレクトリのリストが爆発的に増加します。 Hive Metastore (HMS) はトランザクション RDBMS です。ファイル システム (HDFS) は非トランザクションです。パーティション情報を変更すると、パーティション ストアとファイル システムの両方を再作成する必要があります。
この問題は持続不可能であり、いくらパッチを当てても固有の問題は解決されませんでした。実際、データの増加に伴い、課題は加速するばかりでした。
データ レイクハウス アーキテクチャに関する重要なセールス ポイントの 1 つは、複数の分析エンジンとフレームワークをサポートしていることです。たとえば、ELT (抽出、ロード、変換) と ETL (抽出、変換、ロード) の両方をサポートする必要があります。ビジネス インテリジェンス、ビジネス分析、AI/ML タイプのワークロードをサポートする必要があります。安全かつ予測可能な方法で、同じテーブルのセットと正常に接続する必要があります。これは、Spark、Flink、Trino、Arrow、Dask などの複数のエンジンがすべて、何らかの方法で一貫したアーキテクチャに結び付けられる必要があることを意味します。
各エンジンの成功を可能にしながらデータを効率的に格納するマルチエンジン プラットフォームは、分析業界が切望してきたものであり、Iceberg および Data Lakehouse アーキテクチャが提供するものです。
これは単純ではなく、多くの課題があります。複数のエンジンを使用してデータを確実に更新する簡単な方法はありません。しかし、信頼性の高いアップデートを提供するフォーマットが 2 つまたは 3 つある現在でも、依然として多くの混乱があり、この分野には問題があります。
最新の要件は次のようになります。
中央テーブル ストレージ: コンピューティングから独立してデータを保存することは、アーキテクチャ上の重要な決定になります。これが重要な理由は、データには重力があり、データの位置に向かって私たちを引き寄せるからです。したがって、データが完全に 1 つのベンダーまたはクラウド プロバイダーにある場合、私たちはそのベンダーまたはクラウド プロバイダーにのみ結び付けられます。これらのシステムがクローズドまたは設計に特化している場合、これは本質的に問題となります。オープン ソフトウェアは、最新のアーキテクチャの要件になります。
ポータブル コンピューティング: もう 1 つの現代的な要件は、コンピューティング エンジンを別のベンダー/クラウド プロバイダーに導入したり、特殊なコンピューティング エンジンを活用したりできることです。多くは重心 (データ) に焦点を当てていますが、企業はロジック、コード、SQL の移植性も必要としています。
アクセス制御: ほとんどの企業は、エンジン間で一貫した認可ポリシーを適用するという大きな課題を抱えています。ただし、これは単なるアーキテクチャではなく、複数のエンジンにわたってこれらのポリシーを成功裏に再現可能に適用することが運用上の必須事項となります。
構造の維持: 過去数年間に私たちが目にした人間の仕事の最大の原因の 1 つは、データが他の場所に移動されるときにデータ構造が失われることです。完璧な例はかつてスノーフレークでした。データを Snowflake に移動するプロセスは手動で行われ、サードパーティのデータ セットの導入により、ファイル形式の違いや移動中の形式の変更により手戻りが発生しました。
Apache Iceberg は、オープン テーブル フォーマットを実装するための基礎として、上記の課題と目標のほとんどを基にしてゼロから設計されています。これは次の課題に対処します。
データを移動しないでください。複数のエンジンがシームレスに動作する必要がある
バッチ、ストリーミング、アドホック ジョブをサポート
JVM フレームワークだけでなく、多くの言語のコードをサポート
CRUD 操作を確実に実行できる SQL テーブルによる信頼性の高いトランザクション
懸念事項を実際のテーブルから分離すると、分離が可能になります。
Apache Iceberg は、Apache Hive とは異なり、オブジェクト ストレージに記録を保持します。 Iceberg を使用すると、SQL の動作を複数のエンジンで利用できるようになり、巨大なテーブル用に設計されています。運用環境では、1 つのテーブルに数十ペタバイトのデータが含まれる可能性があるため、これは非常に重要です。マルチペタバイトのテーブルであっても、テーブルのメタデータを選別するために分散 SQL エンジンを必要とせずに、単一ノードから読み取ることができます。
出典: https://iceberg.apache.org/spec/
Iceberg には、ビッグ データ スタックで使用されるときは表示されないという暗黙のルールがあります。この哲学は SQL テーブルスペースから来ており、SQL テーブルの下に何があるかについては決して考えません。実務者なら誰でも知っているように、Hadoop や Hive のようなテーブルを操作する場合には、これは当てはまりません。
Iceberg は 2 つの方法でシンプルにしています。まず、テーブルに変更が加えられたときに不愉快な事態を避ける必要があります。たとえば、削除されたデータを変更によって元に戻すことはできません。第 2 に、Iceberg は、テーブルの下にあるものは重要ではないため、コンテキストの切り替えを減らします。重要なのは、行われる作業です。
FileIO は、コア Iceberg ライブラリと基盤となるストレージ間のインターフェイスです。 FileIO は、分散コンピューティングとストレージが分離された世界で Iceberg が機能する方法として作成されました。従来の Hadoop エコシステムでは、階層パスとパーティション構造が必要ですが、実際には、オブジェクト ストレージの世界で速度とスケールを達成するために使用される方法とは正反対です。
Hadoop と Hive は、高性能でスケーラブルなクラウドネイティブ オブジェクト ストレージのアンチパターンです。 S3 API に依存して MinIO と対話するデータ レイク アプリケーションは、数百万または数十億のオブジェクトに対して 1 秒あたり数千のトランザクションに簡単に拡張できます。複数の同時リクエストを並行して処理することで、 読み取りおよび書き込みのパフォーマンスを向上させることができます。これを実現するには、プレフィックス (最初の文字で始まるオブジェクト名のサブセットである文字列) をバケットに追加し、それぞれがプレフィックスごとに接続を開く並列操作を記述します。
さらに、Hadoop のファイル システム ディレクトリへの依存関係はオブジェクト ストレージには変換されません。データ セットを物理的に異なるディレクトリに整理し、パスが存在しない場合にパスでアドレス指定することは困難です。 Hadoop はファイル システムに依存してデータ セットを定義し、同時実行性と競合解決のためのロック メカニズムを提供します。さらに、Hadoop エコシステムでは、名前変更操作を処理するジョブはアトミックである必要があります。名前変更は実際にはコピーと削除の 2 つの操作であるため、これは S3 API を使用して行うことはできません。残念ながら、その結果、読み取りと書き込みが分離されず、競合、衝突、不一致が発生する可能性があります。
対照的に、Iceberg は、オブジェクト ストレージを使用して物理ストレージから完全に抽象化されて実行されるように設計されています。メタデータで定義されているように、すべての場所は「明示的、不変、絶対的」です。 Iceberg は、ディレクトリを参照する手間をかけずに、テーブルの完全な状態を追跡します。 S3 API を使用して階層全体を一覧表示するよりも、メタデータを使用してテーブルを検索する方が大幅に高速です。名前の変更はありません。コミットでは、メタデータ テーブルに新しいエントリが追加されるだけです。
FileIO API は、計画段階とコミット段階でメタデータ操作を実行します。タスクは FileIO を使用して基礎となるデータ ファイルの読み取りと書き込みを行い、これらのファイルの場所はコミット中にテーブルのメタデータに含まれます。エンジンがこれを正確にどのように行うかは、FileIO の実装によって異なります。レガシー環境の場合、 HadoopFileIO
、既存の Hadoop FileSystem 実装と Iceberg 内の FileIO API の間のアダプター層として機能します。
S3FileIO
はネイティブ S3 実装であるため、代わりに S3FileIO に焦点を当てます。クラウドネイティブのレイクハウスを構築するときに、Hadoop の素材を持ち歩く必要はありません。 Iceberg FileIO: Cloud Native Tablesによると、ネイティブ S3 実装の利点は次のとおりです。
コントラクト動作: Hadoop FileSystem 実装には厳密なコントラクト動作があり、その結果、追加のリクエスト (存在チェック、ディレクトリとパスの競合解消) が発生し、オーバーヘッドと複雑さが増加します。 Iceberg は完全にアドレス指定可能な一意のパスを使用するため、さらなる複雑さを回避できます。
最適化されたアップロード: S3FileIO
、データを段階的にアップロードすることでストレージ/メモリを最適化し、大規模なタスクのディスク消費を最小限に抑え、複数のファイルが出力用に開かれているときのメモリ消費を低く抑えます。
S3 クライアントのカスタマイズ:クライアントは最新の AWS SDK メジャー バージョン (v2) を使用し、ユーザーが S3 (S3 API 互換エンドポイントを含む) で使用するためにクライアントを完全にカスタマイズできるようにします。
シリアル化パフォーマンス: HadoopFileIO
によるタスク処理には Hadoop 構成のシリアル化が必要ですが、これは非常に大規模であり、縮退した場合には処理が遅くなり、処理されるデータよりも多くのオーバーヘッドが発生する可能性があります。
依存関係の削減: Hadoop FileSystem 実装では大規模な依存関係ツリーが導入され、実装が簡素化されることで全体的なパッケージングの複雑さが軽減されます。
Iceberg は、 0.11.0
以降のすべてのバージョンの Spark および Flink ランタイムにバンドルされているIceberg-awsモジュールを通じて、さまざまな AWS サービスとの統合を提供します。 Iceberg を使用すると、ユーザーはS3FileIO
を通じて S3 にデータを書き込むことができます。 S3FileIO
を使用する場合、カタログはio-impl
カタログ プロパティを使用して S3 API を使用するように構成されます。 S3FileIO
最適化されたセキュリティ (S3 アクセス制御リスト、3 つの S3 サーバー側暗号化モードすべて) とパフォーマンス (プログレッシブ マルチパート アップロード) を実現する最新の S3 機能を採用しているため、オブジェクト ストレージのユースケースに推奨されます。
現時点では、Spark は Iceberg を操作するための最も機能が豊富なコンピューティング エンジンであるため、このチュートリアルでは、Spark と Spark-SQL を使用して Iceberg の概念と機能を理解することに重点を置いています。 Ubuntu 20.04 では、Java の依存関係を慎重にダウンロードして構成しながら、Java、カタログまたはメタデータ ポインターとしての PostgreSQL、Spark、および MinIO をインストールして構成します。次に、Spark-SQL を実行して、テーブルの作成、データ入力、クエリ、および変更を行います。また、スキーマの進化、隠しパーティションの操作、タイムトラベル、ロールバックなど、Iceberg を使用して実行できる素晴らしい機能のいくつかについても説明します。各ステップの後に、MinIO の Iceberg バケットのスクリーンショットが含まれるので、舞台裏で何が起こっているかを確認できます。
MinIO サーバーをダウンロードして起動します。 IP アドレス、TCP ポート、アクセス キー、および秘密キーを記録します。
MinIO クライアントをダウンロードしてインストールします。
MinIO クライアントを使用してエイリアスを設定し、Iceberg のバケットを作成します
mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb minio/iceberg Bucket created successfully `myminio/iceberg`.
Hadoop、AWS S3、JDBC などのさまざまな機能を有効にするために、必要な Java アーカイブ (JAR) を使用するように Spark をダウンロードして設定する必要があります。また、PATH および CLASSPATH に必要な各 JAR および構成ファイルの正しいバージョンが必要です。残念ながら、さまざまなバージョンの JAR を呼び出すことは非常に簡単で、どの JAR を実行しているか分からなくなり、重大な非互換性が発生します。
Java ランタイムをまだインストールしていない場合はインストールします。 Ubuntu 20.04の場合、コマンドは次のとおりです。
sudo apt install curl mlocate default-jdk -y
PostgreSQL をダウンロードしてシステム サービスとして実行するように構成する
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update sudo apt-get -y install postgresql sudo systemctl start postgresql.service
スーパーユーザーとして役割icebergcat
を作成し、パスワードを設定してデータベースicebergcat
を作成します。
sudo -u postgres createuser --interactive ALTER ROLE icebergcat PASSWORD 'minio'; sudo -u postgres createdb icebergcat
データベースにログインして動作を確認すると、パスワードの入力を求められます。
psql -U icebergcat -d icebergcat -W -h 127.0.0.1
Apache Spark をダウンロード、解凍、移動する
$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz $ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz $ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark
以下を~/.bashrc
に追加し、シェルを再起動して変更を適用することで、Spark 環境を設定します。
export SPARK_HOME=/opt/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin bash -l
次の .jar ファイルが必要です。 .jar ファイルを Spark マシン上の必要な場所 ( /opt/spark/jars
など) にダウンロードしてコピーします。
S3 プロトコルをサポートするには、aws-java-sdk-bundle/1.11.901.jar (またはそれ以降) が必要です。
$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar
Iceberg-spark-runtime-3.2_2.12.jarが必要です。
$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar
Spark スタンドアロン マスター サーバーを起動する
$ start-master.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out
ブラウザを開いて http: // Your-IPaddress:7077にアクセスします。
Spark は、spark://<Your-Machine-Name>:7077 で生きています。
Spark ワーカー プロセスを開始する
$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077 starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out
Spark-SQL を起動する前に環境を初期化します。
export AWS_ACCESS_KEY_ID=minioadmin export AWS_SECRET_ACCESS_KEY=minioadmin export AWS_S3_ENDPOINT=10.0.0.10:9000 export AWS_REGION=us-east-1 export MINIO_REGION=us-east-1 export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" export AWS_SDK_VERSION=2.17.230 export AWS_MAVEN_GROUP=software.amazon.awssdk export AWS_PACKAGES=( "bundle" "url-connection-client" ) for pkg in "${AWS_PACKAGES[@]}"; do export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" done
次のコマンドを実行して、MinIO に必要なメタデータと S3 API のサポートに PostgreSQL を使用して Iceberg で Spark-SQL を起動します。あるいは、ローカルのspark-defaults.conf
ファイルを使用して構成を設定することもできます。
$ spark-sql --packages $DEPENDENCIES \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \ --conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.password=minio \ --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=/home/iceicedata/spark-events \ --conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \ --conf spark.sql.catalogImplementation=in-memory
この構成に関する重要な注意事項がいくつかあります
icebergcat
テーブルを使用するカタログmy_catalog
を宣言します。S3FileIO
を使用してアクセスするように Iceberg を構成しました。次に、簡単なテーブルを作成します。
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg LOCATION 's3://iceberg' PARTITIONED BY (category);
Iceberg が S3FileIO によって提供する大幅なパフォーマンスの向上を次に示します。これは、S3 で従来の Hive ストレージ レイアウトを使用しているときに、オブジェクト プレフィックスに基づいてリクエストを調整した結果、パフォーマンスの低下に悩まされていた人にとって、大きな安心感をもたらします。 AWS S3 でパーティション分割された Athena/Hive テーブルを作成するのに 30 ~ 60 分かかることは周知の事実です。 Iceberg はデフォルトで Hive ストレージ レイアウトを使用しますが、 ObjectStoreLocationProvider
を使用するように切り替えることができます。
ObjectStoreLocationProvider
を使用すると、保存されたファイルごとに決定論的ハッシュが生成され、そのハッシュがwrite.data.path
の直後に追加されます。これにより、S3 互換のオブジェクト ストレージに書き込まれるファイルが S3 バケット内の複数のプレフィックスに均等に分散されるようになり、その結果、S3 関連の IO 操作のスロットルが最小限に抑えられ、スループットが最大になります。 ObjectStoreLocationProvider
を使用する場合、Iceberg テーブル全体に共有の短いwrite.data.path
を設定すると、パフォーマンスが向上します。 Iceberg では、 Hive よりもパフォーマンスと信頼性を向上させるために、さらに多くのことが行われています。
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.object-storage.enabled'=true, 'write.data.path'='s3://iceberg') PARTITIONED BY (category);
MinIO コンソールを見ると、 my_table
のiceberg
バケットの下にパスが作成されていることがわかります。
バケットにはmetadata
パスが含まれています
この時点では、テーブルにはデータがなく、テーブルを説明するメタデータのみが存在します。 PostgreSQL の Iceberg カタログ テーブルに保存されているこのメタデータへのポインターもあります。 Spark-SQL (クエリ エンジン) は、テーブル名 ( my_catalog
) で Iceberg カタログ ( my_table
) を検索し、現在のメタデータ ファイルへの URI を取得します。
最初のメタデータ ファイルを見てみましょう。このファイルには、テーブルのスキーマ、パーティション、スナップショットに関する情報が保存されています。すべてのスナップショットが定義されている間、 current-snapshot-id
どのスナップショットを使用するかをクエリ エンジンに指示し、クエリ エンジンはsnapshots
manifest-list
の値を取得して、そのスナップショット内のマニフェスト ファイルを開きます。順番にリストします。この例では、テーブルが作成されたばかりなのでスナップショットが 1 つだけあり、まだデータを挿入していないためマニフェストがないことに注意してください。
{ "format-version" : 1, "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363", "location" : "s3://iceberg/my_table", "last-updated-ms" : 1658795119167, "last-column-id" : 3, "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] }, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] } ], "partition-spec" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "option.write.data.path" : "s3://iceberg/my_table", "owner" : "msarrel", "option.write.object-storage.enabled" : "true", "write.data.path" : "s3://iceberg/my_table", "write.object-storage.enabled" : "true" }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }
次に、模擬データを挿入して、Iceberg が MinIO に保存するファイルを観察してみましょう。 iceberg
バケット内には、 my_table/metadata
およびmy_table/data
プレフィックスが存在します。
INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video");
メタデータ プレフィックスには、元のメタデータ ファイル、マニフェスト リスト、およびマニフェスト ファイルが含まれます。マニフェスト リストは、ご想像のとおり、マニフェスト ファイルのリストです。マニフェスト リストには、各スナップショットに含まれる各マニフェスト ファイルに関する情報 (マニフェスト ファイルの場所、その結果として追加されたスナップショット、パーティション化に関する情報、関連データ ファイルのパーティション列の下限と上限) が含まれます。クエリ中に、クエリ エンジンはマニフェスト ファイルの場所の値をマニフェスト リストから読み取り、適切なマニフェスト ファイルを開きます。マニフェスト リストは AVRO 形式です。
マニフェスト ファイルはデータ ファイルを追跡し、各ファイルの詳細と事前に計算された統計が含まれます。最初に追跡されるのは、ファイル形式と場所です。マニフェスト ファイルは、Iceberg がファイル システムの場所による Hive スタイルの追跡データを排除する方法です。マニフェスト ファイルには、パーティション メンバーシップ、レコード数、各列の下限と上限などの詳細が含まれるため、データ ファイルの読み取りの効率とパフォーマンスが向上します。統計は書き込み操作中に書き込まれ、Hive 統計よりもタイムリーで正確かつ最新である可能性が高くなります。
SELECT クエリが送信されると、クエリ エンジンはメタデータ データベースからマニフェスト リストの場所を取得します。次に、クエリ エンジンは各data-file
オブジェクトのfile-path
エントリの値を読み取り、データ ファイルを開いてクエリを実行します。
以下に、パーティションごとに整理されたdata
プレフィックスの内容を示します。
パーティション内には、テーブルの行ごとにデータ ファイルがあります。
クエリの例を実行してみましょう。
spark-sql> SELECT count(1) as count, data FROM my_catalog.my_table GROUP BY data; 1 a 1 b 1 c Time taken: 9.715 seconds, Fetched 3 row(s) spark-sql>
Iceberg テーブルのさまざまなコンポーネントと、クエリ エンジンがそれらのコンポーネントをどのように操作するかを理解したところで、次は Iceberg の優れた機能と、それらをデータ レイクで活用する方法について詳しく見ていきましょう。
追加、削除、名前変更、更新などのスキーマ進化の変更はメタデータの変更です。つまり、更新を実行するためにデータ ファイルを変更/書き換える必要はありません。 Iceberg はまた、これらのスキーマ進化の変更が独立しており、副作用がないことを保証します。 Iceberg は一意の ID を使用してテーブル内の各列を追跡します。これにより、新しい列が追加された場合でも、誤って既存の ID を利用することはありません。
Iceberg テーブル パーティションは、クエリがパーティション値を直接参照しないため、既存のテーブルで更新できます。新しいデータが書き込まれると、新しいレイアウトで新しい仕様が使用され、異なる仕様で以前に書き込まれたデータは変更されません。これにより、新しいクエリを作成するときに分割計画が発生します。パフォーマンスを向上させるために、Iceberg は非表示のパーティショニングを使用するため、ユーザーは高速化するために特定のパーティション レイアウトに対するクエリを作成する必要がありません。ユーザーは必要なデータのクエリを作成することに集中し、一致するデータを含まないファイルは Iceberg に削除させます。
非常に便利なもう 1 つの進化は、Iceberg のソート順序もパーティション仕様と同様に既存のテーブルで更新できることです。ソートに法外なコストがかかる場合、さまざまなエンジンがソートされていない順序で最新のソート順序でデータを書き込むことを選択できます。以前のソート順序で書き込まれた古いデータは変更されません。
spark-sql> ALTER TABLE my_catalog.my_table > RENAME my_catalog.my_table_2;
最初の数回は、その速さに驚かれるでしょう。これは、テーブルを書き換えるのではなく、単にメタデータを操作しているだけであるためです。この場合、 table_name
を変更しただけで、Iceberg はこれを約 10 分の 1 秒で実行しました。
他のスキーマ変更も同様に手間がかかりません。
spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity; spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity; spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;
前述したように、パーティションは他のハイブ形式でもサポートされていますが、Iceberg は、テーブル内の行のパーティション値を生成するという退屈でエラーが発生しやすいタスクを処理できる隠しパーティション化をサポートしています。ユーザーは、ビジネス上の問題を解決するクエリにフィルターを追加することに集中し、テーブルがどのように分割されているかについては気にしません。 Iceberg は、不要なパーティションからの読み取りを自動的に回避します。
Iceberg は、テーブルのパーティション分割とパーティション スキームの変更の複雑な作業を自動的に処理し、エンド ユーザーのプロセスを大幅に簡素化します。パーティショニングを定義することも、Iceberg に任せることもできます。 Iceberg は、イベント時間などのタイムスタンプに基づいてパーティションを分割することを好みます。パーティションはマニフェスト内のスナップショットによって追跡されます。クエリはテーブルの物理レイアウトに依存しなくなりました。物理テーブルと論理テーブルがこのように分離されているため、Iceberg テーブルでは、データが追加されるにつれて時間の経過とともにパーティションが進化する可能性があります。たとえば、Hive テーブルを再パーティション化するには、新しいテーブルを作成し、そこに古いデータを読み込む必要があります。また、すでに作成したすべてのクエリの PARTITION 値を変更する必要がありますが、これは面倒なことです。
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category; ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;
同じテーブルに対して 2 つのパーティション化スキームが存在します。 Hive では不可能だったことが、Iceberg では透過的に行われています。今後、クエリ プランは分割され、古いパーティション スキームを使用して古いデータをクエリし、新しいパーティション スキームを使用して新しいデータをクエリします。 Iceberg がこれを処理します。テーブルをクエリする人は、データが 2 つのパーティション スキームを使用して保存されていることを知る必要はありません。 Iceberg は、舞台裏の WHERE 句と、一致しないデータ ファイルを除外するパーティション フィルターを組み合わせてこれを実行します。
Iceberg テーブルに書き込むたびに、新しいスナップショットが作成されます。スナップショットはバージョンのようなもので、MinIO バージョン管理機能と同じようにタイムトラベルやロールバックに使用できます。スナップショットの管理方法は、システムが適切に維持されるように、 expireSnapshot
設定することによって行われます。タイム トラベルにより、まったく同じテーブル スナップショットを使用する再現可能なクエリが可能になったり、ユーザーが変更を簡単に調べたりできるようになります。バージョンのロールバックにより、ユーザーはテーブルを良好な状態にリセットすることで問題を迅速に修正できます。
テーブルが変更されると、Iceberg は各バージョンをスナップショットとして追跡し、テーブルのクエリ時に任意のスナップショットにタイムトラベルする機能を提供します。これは、履歴クエリを実行する場合や、以前のクエリの結果を再現する場合 (おそらくレポート作成など) に非常に役立ちます。タイム トラベルは、既知の結果のクエリを使用して新しいコードをテストできるため、新しいコードの変更をテストするときにも役立ちます。
テーブルに保存されているスナップショットを表示するには、次の手順を実行します。
spark-sql> SELECT * FROM my_catalog.my_table.snapshots; 2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"} Time taken: 7.236 seconds, Fetched 1 row(s)
いくつかの例:
-- time travel to October 26, 1986 at 01:21:00 spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00'; -- time travel to snapshot with id 10963874102873 spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
スナップショットを使用して増分読み取りを実行できますが、Spark-SQL ではなく Spark を使用する必要があります。例えば:
scala> spark.read() .format(“iceberg”) .option(“start-snapshot-id”, “10963874102873”) .option(“end-snapshot-id”, “10963874102994”) .load(“s3://iceberg/my_table”)
次の 2 つの例のように、テーブルを特定の時点または特定のスナップショットにロールバックすることもできます。
spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000'); spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);
Iceberg は、行レベルの削除、マージ、更新などの表現力豊かな SQL コマンドをすべてサポートしています。最も注目すべき点は、Iceberg が Eager 戦略と Lazy 戦略の両方をサポートしていることです。削除する必要があるもの (GDPR や CCPA など) をすべてエンコードできますが、それらのデータ ファイルすべてをすぐに書き換えることはできません。必要に応じてガベージを遅延収集でき、これは Iceberg がサポートする巨大なテーブルの効率に非常に役立ちます。
たとえば、特定の述語に一致するテーブル内のすべてのレコードを削除できます。以下は、ビデオ カテゴリからすべての行を削除します。
spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';
あるいは、CREATE TABLE AS SELECT または REPLACE TABLE AS SELECT を使用してこれを実現することもできます。
spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';
2 つのテーブルを非常に簡単にマージできます。
spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;
Iceberg はオープン分析テーブル標準の基盤であり、他のハイブ テーブル形式とは異なり、SQL の動作と実際のテーブル抽象化を使用し、データ ウェアハウスの基礎を適用して、問題が発生したことに気づく前に問題を解決します。宣言型データ エンジニアリングを使用すると、テーブルを構成できるため、データのニーズに合わせて各エンジンを変更することを心配する必要がなくなります。これにより、自動最適化と推奨が可能になります。安全なコミットにより、人間によるデータ ワークロードの子守りを回避できるデータ サービスが可能になります。
テーブルの履歴、スナップショット、その他のメタデータを検査するために、Iceberg はメタデータのクエリをサポートしています。メタデータ テーブルは、クエリ内の元のテーブル名の後にメタデータ テーブル名 (たとえば、history) を追加することで識別されます。
テーブルのデータ ファイルを表示するには:
spark-sql> SELECT * FROM my_catalog.my_table.files;
マニフェストを表示するには:
spark-sql> SELECT * FROM my_catalog.my_table.manifests;
テーブル履歴を表示するには:
spark-sql> SELECT * FROM my_catalog.my_table.history;
スナップショットを表示するには:
spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
スナップショットとテーブル履歴を結合して、各スナップショットを書き込んだアプリケーションを確認することもできます。
spark-sql> select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from my_catalog.my_table.history h join my_catalog.my_table.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at;
基本を学習したので、データの一部を Iceberg にロードしてから、 Spark および Iceberg のクイックスタートとIceberg のドキュメントで詳細を学習してください。
Apache Iceberg にはさまざまなクエリおよび実行エンジンが統合されており、これらのコネクタによって Apache Iceberg テーブルを作成および管理できます。 Iceberg をサポートするエンジンは、 Spark 、 Flink 、 Hive 、 Presto 、 Trino 、 Dremio 、 Snowflakeです。
Apache Iceberg は、データレイクのテーブル形式として大きな注目を集めています。オープンソース コミュニティが成長し、複数のクラウド プロバイダーやアプリケーション フレームワークからの統合が増加していることは、Iceberg を真剣に受け止め、実験、学習、および既存のデータ レイク アーキテクチャへの統合計画を開始する時期が来たことを意味します。マルチクラウドのデータレイクと分析のために、Iceberg と MinIO を組み合わせます。
Iceberg と MinIO を使い始めたら、 Slack チャネルを通じて連絡して経験を共有したり、質問したりしてください。
ここでも公開されています。