脚本: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent 脚本: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent この記事では、データエンジニアが、軽量で強力な Python ライブラリである PyIceberg を使用する方法を説明します。 複雑なビジネス要件とより大きな情報量を分析する必要性により、データプラットフォームは過去数年間で大幅に変化し、企業が多様なデータソースからより多くの洞察と価値を抽出するのに役立ちました。 エンタープライズアナリティクス用例では、オープンデータレイクハウスプラットフォームはこのプラットフォームの進化の最前線にありました。オープンデータレイクハウスは、データチームがその生態系内で「コンポーシブル」アーキテクチャを作成することを可能にします。 Composable Data Platform アーキテクチャ Apache Icebergで構築されたデータプラットフォームには、通常、3つの層があります。 データ レイヤー: 物理データ ファイル (通常は Parquet、Avro、または ORC 形式) は、このレイヤーに保存されます。 メタデータ レイヤー:テーブルを個々のディレクトリに分類する代わりに、Iceberg はファイルのリストを保持します。 Iceberg Catalog: カタログは、テーブルの発見、作成、変更を容易にし、Icebergテーブルの管理のトランザクションの一貫性を確保する中央リポジトリです。 Apache Iceberg ドキュメンタリーからのこの一般的な図は、これらの層を示しています。 オリジナルタイトル: Apache Iceberg Layers ) 源泉 PyIcebergとは? PyIcebergは、アナリティクスとデータエンジニアが、さまざまなクラウド(AWS、Azure、Google Cloud)およびオンプレミスストレージ上で洗練されたオープンレイクハウスプラットフォームを構築することを可能にします。 PyIcebergは、IcebergのテーブルにアクセスするためのPython実装です。 PyIcebergを使用する開発者は、Java Virtual Machine(JVM)クラスターで高性能なクエリエンジンを実行する必要なく、Pythonicデータ変換を使用できます。 PyIcebergは、カタログを使用してIcebergテーブルをロードし、読み書き操作を実行します。それはIcebergのメタデータとテーブル形式の側面を処理し、データエンジニ PyIceberg は独立したプログラムとしてまたは Kubernetes クラスターで実行できます。REST、SQLCatalog、または AWS Glue などの Iceberg カタログプロトコルとのネイティブな統合により、JVM/py4j クラスターを必要とせずに Iceberg テーブルをクエリするのに人気のある簡単な選択肢です。 PyIcebergの生産展開は、しばしばデータストリーミングワークロードを統合します。 ストリーミングデータはApache Kafkaのトピックとして生成され、Icebergのテーブルとして現実化される。これは、オペレーティングデータシステムと分析データシステムの間のギャップを縮めるための強力なツールです。 テーブルフロー なんでPyCeberg? PyIceberg は、Iceberg テーブルでデータ操作言語 (DML) 操作を実行するための Python フレンドリーな方法を提供します。 Iceberg で 100 ギガバイトのデータを扱う小規模から中規模のデータプラットフォームでは、例えば、部門分析、内部レポート、または専門ツールを扱うプラットフォームのように、使いやすさは、複雑な機能よりも企業にとって重要です。データの量(歴史的にも増加的にも)巨大ではない場合、Iceberg 上でクエリを実行するための完全なブームクラスターの展開は圧倒的で過剰なもののように思えます。それは、これらのクエリエンジン(Spark と Flink のような)が、Java Virtual Machine (JVM) で動作する Scala または Java 次のセクションでは、Icebergの読み書きパターンがどのように機能するかを理解するためにPyIcebergとのデモを構築します。 PyIceberg、PyArrow、およびDuckDBを使用してデータレイクハウスを構築する方法 PyIceberg を使用して IoT センサー データ レイクハウスのデモを実践して構築しましょう. この例では、PyIceberg と PyArrow を使用して Iceberg データを挿入/upert し、削除し、Visual Studio Code (VS Code) で構築します。 まず、次のコマンドを実行することで、「pyiceberg_playground」という新しいPython仮想環境が作成されます。 $>python -m venv iceberg_playground 次に、このディレクトリ「iceberg_playground」が、PyIcebergプロジェクトがホストされるVS Codeで開きます。 PyIceberg やその他のライブラリは、次の 2 つのコマンドを実行することで、仮想環境にインストールされます。 $>source bin/activate (iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas この例では、PyIceberg は Iceberg テーブル情報をローカル SQLite データベースに格納する SqlCatalog を使用します。 .pyiceberg.yaml 構成ファイルは、プロジェクト ルートで次のコンテンツで作成されます。 catalog: pyarrowtest: uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1 Iceberg カタログは、pyiceberg_catalog ディレクトリの下に SQLite ファイルとして保存され、DW1 ディレクトリにすべてのデータとメタデータを格納するデータストアに保存されていることに注意してください。 この 2 つのディレクトリは現在、プロジェクト ルートレベルで作成されています. This catalog is named pyarrowtest. 次に、PyIcebergの設定は、次のスクリプトを使用してチェックされます。 import os from pyiceberg.catalog import load_catalog os.environ["PYICEBERG_HOME"] = os.getcwd() catalog = load_catalog(name='pyarrowtest') print(catalog.properties) PyIceberg が YAML ファイルからカタログ名を読み、 pyiceberg_catalog ディレクトリ内のローカル SQLite データベースを作成する方法に注意してください. Since SQLite is distributed with Python installers, it does not need to be installed separately. スクリプトが正しく実行されている場合は、「pyarrow_catalog」属性が端末に表示されます。 「PYICEBEG_HOME」環境変数がプロジェクトのルートとして指定されているため、スクリプトは .YAML ファイルからカタログをロードしました。 次に、PyIceberg のスケジュールクラスを使用してスケジュールが追加されます。この例は、IoT センサーのセットからデータを格納しているため、スケジュールは、3 つの列と必要なデータタイプで構築されます。 Namespace は、倉庫内のテーブルの論理的グループ化です(YAML ファイルを定義する際に既に倉庫がどのように作成されているかを思い出してください)。 最初のデータロードは PyArrow in-memory リストを使用して行われ、sensor_table は PyIceberg scan() メソッドで読み取られ、データを pandas データフレームに変換します。 import os from pyiceberg.catalog import load_catalog from pyiceberg.schema import Schema from pyiceberg.types import (NestedField, StringType, FloatType) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import IdentityTransform import pyarrow as pa os.environ["PYICEBERG_HOME"] = os.getcwd() catalog = load_catalog(name='pyarrowtest') print(catalog.properties) # Define the schema schema = Schema( NestedField(1, "device_id", StringType(), required=True), NestedField(2, "ampere_hour", FloatType(), required=True), NestedField(3, "device_make", StringType(), required=True), identifier_field_ids=[1] # 'device_id' is the primary key ) # Define the partition spec - device_id as partition key partition_spec = PartitionSpec( PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id") ) # Create a namespace and an iceberg table catalog.create_namespace_if_not_exists('sensor_ns') sensor_table = catalog.create_table_if_not_exists( identifier='sensor_ns.sensor_data', schema=schema, partition_spec=partition_spec ) # Insert initial data initial_data = pa.table([ pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']), pa.array([21.43, 17.86, 31.65]), pa.array(['ABB', 'Honeywell', 'Siemens']) ], schema=schema.as_arrow()) # Write initial data sensor_table.overwrite(initial_data) # Print a Pandas dataframe representation of the table data print("\nInsert operation completed") print(sensor_table.scan().to_pandas()) 上記のスクリプトが成功した場合、この結果がターミナルに表示されます。 挿入が成功すると、Icebergの3つの層が検証できます。 カタログ: これは、この記事で少し後で検証される SQLite ファイル pyarrow_catalog.db です。 メタデータ: 「メタデータ」ディレクトリ内では、メタデータファイルが作成され、CRUD(Create, Read, Update, Delete)オペレーションを有効にするために不可欠です。メタデータ JSON ファイルが 2 つ作成されます、テーブルが作成されたときに 1 つ、データの最初の挿入後に 2 つ。 データ: データファイルは .PARQUET 形式で書かれており、パーティションキーとして device_id が存在します。 3 つの異なるデバイスがあるので、3 つのディレクトリが作成されます。 Iceberg テーブル「sensor_data」は、倉庫「dw1」で「sensor_ns.db」の名スペースで作成されます。 これらのデータフィールドは、「sensor_data」テーブルの「data」ディレクトリで作成されます。 PyIceberg 表現は、レコードをフィルタリングするために使用できます. Some of the common expressions used for filtering are: StartsWith, EqualTo, GreaterThan, And, Or, etc. from pyiceberg.expressions import StartsWith, EqualTo # Filter columns print("\nFilter records with device_make == ABB ") print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas()) PyIceberg は UPSERT オペレーションもサポートしています. The following code sample updates the existing device_make for one of the sensors from ‘Siemens’ to ‘Kepware’. # Create an UPSERT batch of Arrow records where one fot he device_make is changed upsert_data = pa.table([ pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']), pa.array([21.43, 17.86, 31.65]), pa.array(['ABB', 'Honeywell', 'Kepware']) ], schema=schema.as_arrow()) # UPSERT changed data try: join_columns = ["device_id"] upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"])) except Exception as e: print(e) print("\nUpsert operation completed") print(sensor_table.scan().to_pandas()) 同様に、DELETE 操作もサポートされています: # Delete row sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d')) print("\n After Delete") print(sensor_table.scan().to_pandas()) すべての倉庫での削除は微妙な操作であり、Icebergは例外ではありません。Icebergにおけるローブレベルの操作は、Copy-on-Write(CoW)と Merge-on-Read(MoR)の2つの戦略によって定義されています。 PyIceberg は現在、MOR 削除をサポートしていますが、いくつかのニュアンスがあります。PyIceberg は行を削除する機能を提供していますが、主にこれをデフォルトで CoW 削除を使用して実装しています、つまりデータ ファイルは削除されたファイルを作成する代わりに書き換えられます。 最後のステップとして、Pyarrow_catalog.db ファイルに保存されている SQLite Iceberg カタログをクエリするために DuckDB を使用します。 duckdb -ui pyiceberg_catalog/pyarrow_catalog.db これにより、ポート 4213 (デフォルト) でブラウザウィンドウが開きますが、Iceberg カタログで SQL クエリを実行できます。 これは、SQL カタログから洞察を抽出する簡単で簡単な方法を提供します。 PyIceberg でデータの洞察を解除する テラバイト未満のデータ量を持つ企業の場合、PyIceberg と PyArrow は Python でインタラクティブなクエリを実行するための迅速なオプションです。 データエンジニアは始めることができます。 保存され、更新され、その他、 このページには、すべての PyIceberg API の素晴らしい例があります。 PyIcebergドキュメンタリー 花火 ハッピーコード!