paint-brush
Hudi と MinIO を使用したストリーミング データ レイクの開発@minio
7,246 測定値
7,246 測定値

Hudi と MinIO を使用したストリーミング データ レイクの開発

MinIO14m2023/08/29
Read on Terminal Reader

長すぎる; 読むには

Apache Hudi はデータ レイク用の最初のオープン テーブル形式であり、ストリーミング アーキテクチャで検討する価値があります。 Hudi ストレージに MinIO を使用すると、マルチクラウドのデータ レイクと分析への道が開かれます。
featured image - Hudi と MinIO を使用したストリーミング データ レイクの開発
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Hudi は、コア ウェアハウスとデータベースの機能をデータ レイクに直接提供するストリーミング データ レイク プラットフォームです。 DeltaApache Icebergのようなオープン ファイル形式を自称することに満足せず、Hudi はテーブル、トランザクション、更新/挿入/削除、高度なインデックス、ストリーミング インジェスト サービス、データ クラスタリング/圧縮の最適化、同時実行性を提供します。


2016 年に導入された Hudi は、Hadoop エコシステムにしっかりと根付いており、「Hadoop Upserts と D Incrementals」という名前の背後にある意味を説明しています。これは、HDFS 上の大規模な分析データセットのストレージを管理するために開発されました。 Hudi の主な目的は、ストリーミング データの取り込み中の遅延を短縮することです。


フーディテーブル


時間の経過とともに、Hudi はクラウド ストレージと MinIO を含むオブジェクト ストレージを使用するように進化してきました。 HDFS からの Hudi の移行は、パフォーマンス、スケーラブル、クラウドネイティブのオブジェクト ストレージのために従来の HDFS を残す世界のより大きなトレンドと連動しています。 Apache Spark、Flink、Presto、Trino などの分析ワークロードを高速化する最適化を提供するという Hudi の約束は、大規模なクラウドネイティブ アプリケーションのパフォーマンスという MinIO の約束とうまく一致します。


Hudi を運用環境で使用している企業には、 UberAmazonByteDanceRobinhood などがあります。これらは世界最大級のストリーミング データ レイクの一部です。このユースケースにおける Hudi の重要な点は、列形式データに対して低レイテンシの処理を実行する増分データ処理スタックを提供することです。通常、システムは、Apache Parquet や ORC などのオープン ファイル形式を使用してデータを一度書き出し、これを拡張性の高いオブジェクト ストレージまたは分散ファイル システム上に保存します。 Hudi は、このデータを取り込み、変換、管理するためのデータ プレーンとして機能します。 Hudi は、 Hadoop FileSystem APIを使用してストレージと対話します。これは、HDFS からオブジェクト ストレージ、インメモリ ファイル システムに至るまでの実装と互換性があります (ただし、必ずしも最適であるとは限りません)。

Hudi ファイル形式

Hudi は、ベース ファイルと、特定のベース ファイルに対する更新/変更を保存するデルタ ログ ファイルを使用します。ベース ファイルは、Parquet (列状) または HFile (インデックス付き) にすることができます。変更が発生したときにベース ファイルに変更を記録することが合理的であるため、デルタ ログはAvro (行) として保存されます。


Hudi は、特定のベース ファイルに対するすべての変更をブロックのシーケンスとしてエンコードします。ブロックには、データ ブロック、削除ブロック、またはロールバック ブロックを指定できます。これらのブロックは、新しいベース ファイルを派生するためにマージされます。このエンコーディングでは、自己完結型のログも作成されます。



Hudi ファイル形式

ソース

Hudi テーブルの形式

テーブル形式は、テーブルのファイル レイアウト、テーブルのスキーマ、およびテーブルへの変更を追跡するメタデータで構成されます。 Hudi は、ストリーム処理の重視と一致して、書き込み時のスキーマを強制し、下位互換性のない変更によってパイプラインが中断されないようにします。


Hudi は、特定のテーブル/パーティションのファイルをグループ化し、レコード キーとファイル グループをマッピングします。前述したように、すべての更新は特定のファイル グループの差分ログ ファイルに記録されます。この設計は、クエリを処理するためにすべてのベース ファイルに対してすべてのデータ レコードをマージする必要がある Hive ACID よりも効率的です。 Hudi の設計は、データセット全体ではなくファイル グループの差分ログを処理するため、キーベースの高速な更新/削除を想定しています。


Hudi は、特定のテーブル/パーティションのファイルをグループ化し、レコード キーとファイル グループをマッピングします。前述したように、すべての更新は特定のファイル グループの差分ログ ファイルに記録されます。この設計は、クエリを処理するためにすべてのベース ファイルに対してすべてのデータ レコードをマージする必要がある Hive ACID よりも効率的です。 Hudi の設計は、データセット全体ではなくファイル グループの差分ログを処理するため、キーベースの高速な更新/削除を想定しています。


Hudi テーブルの形式

ソース


タイムラインは、 Hudi のすべてのテーブル メタデータの信頼できるイベント ログのソースとして機能するため、理解することが重要です。タイムラインは.hoodieフォルダー、この場合はバケットに保存されます。イベントは削除されるまでタイムライン上に保持されます。タイムラインはファイル グループだけでなくテーブル全体にも存在し、差分ログを元のベース ファイルに適用することでファイル グループを再構築できます。頻繁な書き込み/コミットを最適化するために、Hudi の設計では、テーブル全体のサイズに比べてメタデータが小さく保たれます。


タイムライン上の新しいイベントは内部メタデータ テーブルに保存され、一連のマージ オン リード テーブルとして実装されるため、書き込み増幅が低くなります。その結果、Hudi はメタデータに対する急速な変更を迅速に吸収できます。さらに、メタデータ テーブルは HFile ベース ファイル形式を使用し、メタデータ テーブル全体を読み取る必要性を回避する一連のインデックス付きキー検索によりパフォーマンスをさらに最適化します。テーブルの一部であるすべての物理ファイル パスは、コストと時間のかかるクラウド ファイルの一覧表示を回避するためにメタデータに含まれています。

ヒューディライター

Hudi ライターは、Hudi が更新や削除などの非常に高速な増分変更を可能にする ACID トランザクション サポートを備えた高性能書き込みレイヤーとして機能するアーキテクチャを促進します。


一般的な Hudi アーキテクチャは、Spark または Flink パイプラインに依存してデータを Hudi テーブルに配信します。 Hudi 書き込みパスは、Parquet または Avro ファイルを単にディスクに書き込むよりも効率的になるように最適化されています。 Hudi は書き込み操作を分析し、増分操作 ( insertupsertdelete ) またはバッチ操作 ( insert_overwriteinsert_overwrite_tabledelete_partitionbulk_insert ) として分類し、必要な最適化を適用します。


Hudi ライターはメタデータの保守も担当します。各レコードには、コミット時刻とそのレコードに固有のシーケンス番号 (これは Kafka オフセットに似ています) が書き込まれ、レコード レベルの変更を導出できるようになります。ユーザーは、受信データ ストリーム内のイベント時間フィールドを指定し、メタデータと Hudi タイムラインを使用してそれらを追跡することもできます。 Hudi には各レコードの到着時刻とイベント時刻の両方が含まれており、複雑なストリーム処理パイプラインに強力なウォーターマークを構築できるため、ストリーム処理が大幅に改善される可能性があります。

Hudi リーダー

ライターとリーダーの間のスナップショットの分離により、Spark、Hive、Flink、Prest、Trino、Impala を含むすべての主要なデータ レイク クエリ エンジンからテーブル スナップショットを一貫してクエリできるようになります。 Parquet や Avro と同様に、Hudi テーブルはSnowflakeSQL Serverなどによって外部テーブルとして読み取ることができます。


Hudi リーダーは軽量になるように開発されています。可能な限り、Presto や Spark などのエンジン固有のベクトル化リーダーとキャッシュが使用されます。 Hudi がクエリのベース ファイルとログ ファイルをマージする必要がある場合、Hudi はスピル可能マップや遅延読み取りなどのメカニズムを使用してマージ パフォーマンスを向上させると同時に、読み取りに最適化されたクエリも提供します。


Hudi には、非常に強力な増分クエリ機能が多数含まれています。メタデータはその中核であり、大規模なコミットを小さなチャンクとして使用できるようにし、データの書き込みと増分クエリを完全に分離します。メタデータを効率的に使用することで、タイム トラベルは、開始点と終了点が定義された単なる増分クエリになります。 Hudi は、任意の時点でキーを単一のファイル グループにアトミックにマップし、Hudi テーブルで完全な CDC 機能をサポートします。 Hudi ライターのセクションで説明したように、各テーブルはファイル グループで構成され、各ファイル グループには独自の自己完結型メタデータがあります。

フーディ万歳!

Hudi の最大の強みは、ストリーミング データとバッチ データの両方を取り込む速度です。 upsert機能を提供することにより、Hudi はテーブル全体やパーティション全体を書き換えるよりもはるかに高速にタスクを実行します。


Hudi の取り込み速度を活用するには、データ レイクハウスには高い IOPS とスループットを備えたストレージ レイヤーが必要です。 MinIO のスケーラビリティと高性能の組み合わせは、まさに Hudi が必要とするものです。 MinIO は、リアルタイムのエンタープライズ データ レイクを強化するのに必要なパフォーマンスを十分に備えています。 最近のベンチマークは、 32 ノードの既製 NVMe SSD。


アクティブなエンタープライズ Hudi データ レイクには、大量の小さな Parquet ファイルと Avro ファイルが保存されています。 MinIO には、データ レイクの高速化を可能にする小さなファイルの最適化が多数含まれています。小さなオブジェクトはメタデータとともにインラインで保存されるため、Hudi メタデータやインデックスなどの小さなファイルの読み取りと書き込みの両方に必要な IOPS が削減されます。


スキーマは、すべての Hudi テーブルの重要なコンポーネントです。 Hudi はスキーマを強制したり、ストリーミング データ パイプラインが中断することなく適応できるようにスキーマの進化を許可したりできます。さらに、Hudi は変更によってパイプラインが中断されないようにスキーマオンライターを強制します。 Hudi は、Avro を利用してテーブルのスキーマを保存、管理、展開します。


Hudi は、データ レイクにACID トランザクション保証を提供します。 Hudi はアトミックな書き込みを保証します。コミットはタイムラインに対してアトミックに行われ、アクションが発生したとみなされる時刻を示すタイムスタンプが与えられます。 Hudi は、ライター、テーブル、リーダーのプロセス間でスナップショットを分離し、それぞれがテーブルの一貫したスナップショットで動作するようにします。 Hudi は、ライター間のオプティミスティック同時実行制御 (OCC) と、テーブル サービスとライターの間、および複数のテーブル サービス間のノンブロッキング MVCC ベースの同時実行制御によってこれを完成させます。

Hudi と MinIO のチュートリアル

このチュートリアルでは、Spark、Hudi、および MinIO のセットアップ手順を説明し、いくつかの基本的な Hudi 機能を紹介します。このチュートリアルは、 Apache Hudi Spark Guideに基づいており、クラウドネイティブの MinIO オブジェクト ストレージで動作するように調整されています。


バージョン管理されたバケットを操作すると、Hudi にメンテナンスのオーバーヘッドがいくらか追加されることに注意してください。オブジェクトが削除されると、削除マーカーが作成されます。 Hudi がCleaner ユーティリティを使用してファイルをクリーンアップすると、削除マーカーの数は時間の経過とともに増加します。削除マーカーの数が 1000 に達するとリスト操作が停止する可能性があるため、これらの削除マーカーをクリーンアップするようにライフサイクル管理を正しく構成することが重要です。Hudi プロジェクトの管理者は、ライフサイクル ルールを使用して 1 日後に削除マーカーをクリーンアップすることを推奨しています。

前提条件

Apache Sparkをダウンロードしてインストールします


MinIOをダウンロードしてインストールします。 IP アドレス、コンソールの TCP ポート、アクセス キー、および秘密キーを記録します。


MinIO クライアントをダウンロードしてインストールします


S3A を使用してオブジェクト ストレージを操作するには、AWS および AWS Hadoop ライブラリをダウンロードし、クラスパスに追加します。

  • AWS: aws-java-sdk:1.10.34 (またはそれ以降)

  • Hadoop: hadoop-aws:2.7.3 (またはそれ以降)


Jar ファイルをダウンロードして解凍し、 /opt/spark/jarsにコピーします。

MinIO バケットを作成する

MinIO クライアントを使用して、Hudi データを格納するバケットを作成します。

 mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Hudi で Spark を起動する

ストレージに MinIO を使用するように構成された Hudi で Spark シェルを起動します。 MinIO 設定を使用して S3A のエントリを構成してください。


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


次に、Spark 内で Hudi を初期化します。

 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


これにより、Hudi を繰り返し使用して外部構成ファイルを作成することが簡単になることに注意してください。

テーブルを作成する

試してみて、Scala を使用してシンプルな小さな Hudi テーブルを作成してください。 Hudi DataGenerator は、 サンプル旅行スキーマに基づいてサンプルの挿入と更新を生成する迅速かつ簡単な方法です。


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Hudi にデータを挿入し、MinIO にテーブルを書き込みます

以下は、新しい旅行データを生成し、DataFrame にロードし、作成した DataFrame を Hudi テーブルとして MinIO に書き込みます。 mode(Overwrite)テーブルがすでに存在する場合、テーブルを上書きして再作成します。旅行データは、レコード キー ( uuid )、パーティション フィールド ( region/country/city )、およびロジック ( ts ) に依存して、旅行レコードが各パーティションで一意であることを保証します。デフォルトの書き込み操作upsertを使用します。更新のないワークロードがある場合は、 insertまたはbulk_insert使用すると、より高速になる可能性があります。


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


ブラウザを開き、アクセス キーと秘密キーを使用してhttp://<your-MinIO-IP>:<port>にある MinIO にログインします。バケット内に Hudi テーブルが表示されます。


MinIO コンソール


このバケットには、メタデータを含む.hoodieパスと、データを含むamericasおよびasiaパスも含まれています。


メタデータ


メタデータを見てみましょう。チュートリアル全体を完了した後の.hoodieパスは次のようになります。 2022 年 9 月 13 日火曜日の 9:02、10:37、10:48、10:52、10:56 にテーブルを変更したことがわかります。


チュートリアル完了後の .hoodie パス

クエリデータ

Hudi データを DataFrame にロードし、サンプル クエリを実行してみましょう。

 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Hudi とのタイムトラベル

いいえ、1988 年にフーティ アンド ザ ブローフィッシュのコンサートを観に行ったという話ではありません。


Hudi テーブルに書き込むたびに、新しいスナップショットが作成されます。スナップショットは、タイム トラベル クエリで参照できるテーブルのバージョンと考えてください。


いくつかのタイム トラベル クエリを試してください (タイムスタンプを適切なものに変更する必要があります)。


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

データを更新する

このプロセスは、以前に新しいデータを挿入したときと似ています。 Hudi のデータ更新機能を紹介するために、既存の旅行レコードの更新を生成し、それらを DataFrame にロードして、その DataFrame を MinIO に既に保存されている Hudi テーブルに書き込みます。


append保存モードを使用していることに注意してください。一般的なガイドラインは、新しいテーブルを作成する場合を除き、レコードが上書きされないようにappendモードを使用することです。 Hudi を使用する一般的な方法は、ストリーミング データをリアルタイムで取り込んでテーブルに追加し、追加された内容に基づいて既存のレコードをマージおよび更新するロジックを作成することです。あるいは、 overwriteモードを使用して書き込むと、テーブルがすでに存在する場合は削除して再作成されます。


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


データをクエリすると、更新された旅行記録が表示されます。

インクリメンタルクエリ

Hudi は、増分クエリを使用して、特定のタイムスタンプ以降に変更されたレコードのストリームを提供できます。必要なのは、現在のコミットまでの変更を確認するために変更がストリーミングされる開始時刻を指定することだけです。また、終了時刻を使用してストリームを制限することもできます。


増分クエリは、バッチ データにストリーミング パイプラインを構築できるため、Hudi にとって非常に重要です。


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

ポイントインタイムクエリ

Hudi は、特定の日時のデータをクエリできます。


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

論理的な削除によるデータの削除

Hudi は、レコードを削除する 2 つの異なる方法をサポートしています。論理的な削除では、レコード キーが保持され、他のすべてのフィールドの値が null になります。論理的な削除は MinIO に保持され、物理的な削除を使用してのみデータ レイクから削除されます。


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

物理削除によるデータの削除

対照的に、ハード削除は、私たちが削除として認識しているものです。レコード キーと関連フィールドがテーブルから削除されます。


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

上書き挿入

データ レイクは、既存のデータを更新できるようになると、データ レイクハウスになります。新しい旅行データを生成し、既存のデータを上書きします。この操作は、Hudi がターゲット パーティション全体を一度に計算するupsertよりも高速です。ここでは、 upsertによって行われる自動インデックス作成、事前結合、および再パーティション化をバイパスするための構成を指定します。


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

テーブルスキーマとパーティショニングを進化させる

スキーマの進化により、Hudi テーブルのスキーマを変更して、時間の経過とともにデータ内で発生する変更に適応できます。


以下に、スキーマとパーティショニングをクエリして展開する方法の例をいくつか示します。より詳細な説明については、 「スキーマの進化 | スキーマの進化」を参照してください。アパッチ・ヒューディ。これらのコマンドを実行すると、Hudi テーブル スキーマがこのチュートリアルとは異なるものに変更されることに注意してください。


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


現在、 SHOW partitionsファイル システム テーブル パスに基づいているため、ファイル システム上でのみ機能します。


このチュートリアルでは、Spark を使用して Hudi の機能を紹介しました。ただし、Hudi は複数のテーブル タイプ/クエリ タイプをサポートでき、Hudi テーブルは Hive、Spark、Presto などのクエリ エンジンからクエリできます。 Hudi プロジェクトには、すべての依存システムがローカルで実行されている Docker ベースのセットアップでこれらすべてを紹介するデモ ビデオがあります。

やあ!やあ! MinIO で Hudi データレイクを構築しましょう!

Apache Hudi はデータ レイク用の最初のオープン テーブル形式であり、ストリーミング アーキテクチャで検討する価値があります。 Hudi コミュニティとエコシステムは活発であり、クラウドネイティブのストリーミング データ レイクのために Hadoop/HDFS を Hudi/オブジェクト ストレージに置き換えることに重点が置かれています。 Hudi ストレージに MinIO を使用すると、マルチクラウドのデータ レイクと分析への道が開かれます。 MinIO には、オンプレミス、パブリック/プライベート クラウド、エッジなどの場所間でデータを同期するためのアクティブ/アクティブ レプリケーションが含まれており、地理的な負荷分散や高速ホットホット フェイルオーバーなど、企業が必要とする優れた機能を実現します。


今すぐ MinIO で Hudi を試してください。ご質問がある場合、またはヒントを共有したい場合は、 Slack チャネルを通じてご連絡ください。


ここでも公開されています。