前回の投稿では、Apache Iceberg の概要を説明し、Apache Iceberg がストレージに MinIO を使用する方法を示しました。開発マシンのセットアップ方法も示しました。これを行うために、Docker Compose を使用して、処理エンジンとして Apache Spark コンテナー、REST カタログ、ストレージ用の MinIO をインストールしました。最後に、Apache Spark を使用してデータを取り込み、PyIceberg を使用してデータをクエリする非常に単純な例で締めくくりました。 Apache Iceberg を初めて使用する場合、または開発マシンに Apache Iceberg をセットアップする必要がある場合は、この入門記事をお読みください。
この投稿では、前回の投稿の続きから、一般的なビッグ データの問題、つまり、生データ、非構造化データ、および構造化データ (生のデータから厳選されたデータ) のためのストレージを提供する単一のソリューションの必要性について調査します。データ)。さらに、同じソリューションは、厳選されたデータに対する効率的なレポート作成を可能にする処理エンジンを提供する必要があります。これは、データ レイクハウスの約束です。構造化データに対するデータ ウェアハウスの機能と非構造化データに対するデータ レイクの機能をすべて一元化されたソリューションで提供します。
ビッグデータのシナリオをさらに詳しく見てみましょう。
以下の図は、一般的な問題と仮説的な解決策を示しています。データは複数の場所から複数の形式でデータセンターに流入します。必要なのは、処理エンジンがビジネス インテリジェンス、データ分析、機械学習を効率的にサポートできるように生データを変換できる一元化されたソリューションです。同時に、このソリューションは、データ探索と機械学習のために非構造化データ (テキスト、画像、オーディオ、ビデオ) を保存できる必要があります。また、変換を再実行する必要がある場合や、データの整合性の問題を調査する必要がある場合に備えて、変換されたデータを元の形式で保持する必要があります。
具体的な例として、顧客のために投資信託を管理している世界的な保管銀行を想像してください。各顧客の各ファンドの会計帳簿と投資記録簿を表すデータが、世界中の地域から絶えずデータ レイクハウスにストリーミングされます。そこから、安全な通過チェック (送信されたものはすべて受信されたかどうか) を行う必要があり、データ品質チェックを実行する必要があります。最後に、データを分割して、一日の始まりと終わりのレポートをサポートする別のストアにロードすることができます。
あるいは、この図は、気象観測所が気温やその他の気象関連データを送信する IOT シナリオを表している可能性があります。どのようなシナリオであっても、必要なのは、データを元の形式で安全に保存し、より構造化された方法で保存する必要があるデータをすべて 1 つの集中ソリューションで変換および処理する方法です。これがデータ レイクハウスの約束です。データ ウェアハウスとデータ レイクの長所を 1 つの集中ソリューションに組み合わせたものです。
上記の仮説的な解決策を現実にしてみましょう。これを下の図に示します。
データ レイクハウスには 2 つの論理コンポーネントがあります。 1 つ目は、データ ウェアハウスに相当する構造化データ用の Apache Iceberg の実装です。 (これは、前回の投稿で構築したものです。そのため、ここでは詳しく説明しません。) 2 番目の論理コンポーネントは、非構造化データ用の MinIO (データ レイクハウスのデータ レイク側) です。 Lakehouse に入るすべてのデータは、MinIO のこの論理インスタンスに配信されます。実際には、上に示した MinIO の 2 つの論理インスタンスは、データ センター内の MinIO の同じインスタンスである可能性があります。 MinIO を実行しているクラスターがすべての受信データの取り込みと Apache Iceberg の処理要件を処理できる場合、そのようなデプロイメントはコストを節約します。実際、これがこの投稿で行うことです。 Apache Iceberg の MinIO インスタンス内のバケットを使用して、すべての非構造化データと生データを保持します。
この演習で使用するデータセットを導入し、それを MinIO に取り込むことで、データの操作を開始しましょう。
この投稿で実験するデータセットは、Global Surface Summary of the Day (GSOD) として知られる公開データセットで、米国海洋大気局 (NOAA) によって管理されています。 NOAA は現在、世界中の 9000 以上の観測点からのデータを維持しており、GSOD データセットにはこれらの観測点からの 1 日あたりの概要情報が含まれています。データはここからダウンロードできます。 gzip ファイルは 1 年に 1 つあります。 1929 年に始まり、2022 年に終わります (この記事の執筆時点)。データ レイクハウスを構築するために、毎年ファイルをダウンロードし、データ レイクハウスに使用されている MinIO インスタンスに配置しました。すべてのファイルを「lake」という名前のバケットに置きます。MinIO のインスタンス内の 2 つのバケットを以下に示します。 「warehouse」バケットは、Apache Iceberg をインストールしたときに作成されました。
MinIO コンソールを使用して生データを手動で取り込みました。プロフェッショナルなパイプラインでは、これを自動化された方法で実行する必要があります。 Kafka と Kubernetes を使用してデータを MinIO に取得する方法については、「Kubernetes で Kafka をセットアップしてMinIO にデータをストリーミングする方法」を参照してください。
これらのファイルは、ダウンロードの利便性を考慮してパッケージ化されています。これらのファイルを直接使用してレポートやグラフを作成しようとすると、非常に IO を集中的に使用する操作 (CPU を集中的に使用する可能性があります) になります。指定した測点からの年間平均気温をグラフにしたいと想像してください。これを行うには、すべてのファイルを開いてすべての行を検索し、対象の日にステーションに一致するエントリを探す必要があります。より良いオプションは、データをキュレーションし、キュレーションされたデータに関するレポートを作成するために Data Lakehouse 機能を使用することです。最初のステップは、新しい Jupyter ノートブックをセットアップすることです。
まず、Apache Spark 処理エンジンにインストールされている Jupyter Notebook サーバーに移動します。これはhttp://localhost:8888にあります。新しいノートブックを作成し、最初のセルに以下に示すインポートを追加します。 (この投稿で作成した完成したノートブックはすべて、ここにあります。)
from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'
MinIO ライブラリをインポートしていることに注意してください。私たちが構築しているノートブックは、非構造化ストレージ (MinIO Data Lake) から構造化ストレージ (内部で MinIO を使用する Apache Iceberg) への ETL パイプラインです。ノートブックの開始部分は次のようになります。
これで、Iceberg データベースとデータ用のテーブルを作成できます。
GSOD データセットのデータベースとテーブルの作成は簡単です。以下のスクリプトは、「noaa」という名前のデータベースを作成します。これをインポート後のセルに追加します。
%%sql CREATE DATABASE IF NOT EXISTS noaa;
以下のスクリプトはgsod
テーブルを作成します。
%%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)
Apache Iceberg を使って遊んでいると、実験をやり直すためにテーブルを削除したくなることがよくあります。以下のスクリプトは、 gsod
テーブルの設定を変更したい場合にそのテーブルを削除します。
%%sql DROP TABLE IF EXISTS noaa.gsod;
これで、生の年ベースの zip ファイルが Lakehouse に保存されたので、それらを抽出、変換し、Data Lakehouse にロードできます。最初にいくつかのヘルパー関数を紹介しましょう。以下の関数は、プレフィックスに一致する指定されたバケット内の MinIO オブジェクトのリストを返します。
def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list
上記のコードでは、MinIO 認証情報ファイルが必要であることに注意してください。これは MinIO コンソールから取得できます。 MinIO 認証情報の取得方法がわからない場合は、この投稿のセクションで認証情報を生成してダウンロードする方法を説明します。
次に、MinIO からオブジェクトを取得する関数が必要です。オブジェクトは tar ファイルであるため、tar アーカイブからデータを抽出し、Pandas DataFrame に変換するためにこの関数も必要です。これは以下の関数を使用して行われます。
def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df
これらの関数は両方とも、MinIO で何をしているかに関係なく再利用できる汎用ユーティリティです。これらをコード スニペットの個人コレクションまたは組織の Github Gist に入れることを検討してください。
これで、Lakehouse の倉庫側にデータを送信する準備が整いました。これは、Spark セッションを開始し、すべての GSOD tar ファイルをループし、抽出、変換して、Iceberg テーブルに送信する以下のコードで実行できます。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')
このセクションのコードは、MinIO バケットからデータを手動でロードしました。運用環境では、このコードをサービスにデプロイし、自動取り込みにMinIO Bucket Eventsを使用することができます。
レポート用に新しいノートブックを始めましょう。下のセルは、必要なユーティリティをインポートします。具体的には、データの取得には PyIceberg、データのラングリングには Pandas、データの視覚化には Seaborn を使用します。
from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')
私たちがやりたいのは、特定の気象観測所の年間平均気温を計算することです。これにより、1 年ごとに 1 つの数値が得られ、1 年のすべての季節が考慮されます。最初のステップは、特定のステーションのすべてのデータについて Iceberg にクエリを実行することです。これは、以下で PyIceberg を使用して行われます。
tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)
上記のコードで使用されているステーション ID は、米国ニュージャージー州のニューアーク リバティー国際空港にあるステーションのものです。 1973 年から運用されています (約 50 年間のデータ)。コードを実行すると、以下の出力が得られます。 (サンプルを取得するために DataFrame head() 関数を使用しています。)
次に、年ごとにグループ化し、平均を計算する必要があります。 Pandas を使用すると、これは数行のコードになります。ループ処理は必要ありません。
df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year
このセルを実行すると、年ごとに 1 つの値が表示されます。上位数年を以下に示します。
最後に、年間平均を視覚化できます。 Seaborn を使用してライン プロットを作成します。これにはたった 1 行のコードが必要です。
sns.lineplot(data=df, x="year", y="temp", errorbar=None)
折れ線グラフを以下に示します。
レポートを初めて実行した後に必ず実行する必要があるもう 1 つのコマンドを以下に示します。
[task.file.file_path for task in sc.plan_files()]
これは、クエリに一致するデータを持つ Apache Iceberg 内のすべてのデータ ファイルのリストを提供するリスト内包表記です。 Iceberg のメタデータは多くのものを除外できますが、多くのものがあるでしょう。関連するすべてのファイルを見ると、高速オブジェクト ストレージが Lakehouse の重要な部分であるという事実がわかります。
この投稿では、MinIO と Apache Iceberg を使用してデータ レイクハウスを構築しました。これは GSOD データセットを使用して行いました。まず、生データが Data Lakehouse (MinIO) の Lake 側にアップロードされました。そこから、Apache Iceberg (データ レイクハウスのデータ ウェアハウス側) にデータベースとテーブルを作成しました。次に、データをレイクからデータ レイクハウス内のウェアハウスに移動するためのシンプルな ETL パイプラインを構築しました。
Apache Iceberg にデータを完全に入力すると、年間平均気温レポートを作成して視覚化できるようになりました。
運用環境でデータ レイクハウスを構築する場合は、MinIO のエンタープライズ機能が必要になることに注意してください。 オブジェクトのライフサイクル管理、セキュリティのベスト プラクティス、 Kafka ストリーミング、およびバケット イベントを検討することを検討してください。
ここでも公開されています。