このブログ投稿では、オープンソース ストリーミング ソリューションbytewaxとydata-profilingを組み合わせて活用して、ストリーミング フローの品質を向上させる方法について説明します。シートベルトを締める!
ストリーム処理により、処理中および保存前のデータのリアルタイム分析が可能になり、ステートフルまたはステートレスにすることができます。
ステートフル ストリーム処理は、リアルタイムの推奨事項、パターン検出、または複雑なイベント処理に使用されます。処理 (ウィンドウ、キーによる結合など) には何が起こったかの履歴が必要です。
ステートレス ストリーム処理は、電子メールのマスクや型の変換など、ストリーム内の他のデータ ポイントの知識を必要としないインライン変換に使用されます。
全体として、データ ストリームは業界で広く使用されており、不正行為の検出、患者の監視、イベントの予測メンテナンスなどのユースケースに適用されていることがわかります。
通常、データ ウェアハウスやダッシュボード ソリューションの作成中にデータ品質が評価される従来のモデルとは異なり、ストリーミング データは継続的な監視が必要です。
収集から下流アプリケーションへの供給まで、プロセス全体を通じてデータ品質を維持することが不可欠です。結局のところ、データ品質の低下によるコストは組織にとって高くつく可能性があります。
「不良データのコストは、ほとんどの企業の収益の 15% ~ 25% に上ります。 (…) これらのコストの 3 分の 2 は、データ品質を前面に押し出すことで削減できます。」
— トーマス C. レッドマン、『Getting in Front on Data Quality』の著者
この記事では、 bytewa
とydata-profiling
組み合わせてストリーミング フローの品質をプロファイリングし、改善する方法を説明します。
これにより、ユーザーは、Flink、Spark、Kafka Streams と同様の機能を備えたストリーミング データ パイプラインとリアルタイム アプリケーションを構築できると同時に、フレンドリーで使い慣れたインターフェイスとPython エコシステムとの 100% 互換性を提供できます。
組み込みの使用
変換に関しては、Bytewax は、マップ、ウィンドウ処理、集計メソッドを使用してステートフルおよびステートレス変換を容易にし、回復やスケーラビリティなどの使い慣れた機能を備えています。
バイトワックス
これにより、ユーザーは、Spark や Flink などの JVM ベースのストリーミング プラットフォームを学習および維持することなく、ストリーミング データ パイプラインとリアルタイム アプリケーションを構築し、ニーズを満たすために必要なカスタマイズを作成できます。
Bytewax は多くのユースケースに適しています。
ユースケースのインスピレーションや、ドキュメント、チュートリアル、ガイドなどの詳細については、お気軽にチェックしてください。
データ プロファイリングは、あらゆる機械学習タスクを正常に開始するための鍵であり、次のステップを指します。
一言で言えば、
高いデータ品質基準を確保することは、すべてのドメインと組織にとって重要ですが、状況が急速に変化する可能性があり、即時対応が必要になる可能性がある (医療モニタリング、株価、大気質ポリシーなど) 連続データを出力するドメインを運用しているドメインに特に関係があります。
多くのドメインでは、データベースに保存されている履歴データを考慮した探索的データ分析の観点からデータ プロファイリングが使用されます。逆に、データ ストリームの場合、ストリームに沿った継続的な検証と品質管理にはデータ プロファイリングが不可欠となり、プロセスのさまざまな時間フレームや段階でデータをチェックする必要があります。
自動化されたプロファイリングをデータ フローに組み込むことで、データの現在の状態に関するフィードバックをすぐに取得でき、データの一貫性や整合性(値の破損や形式の変更など) に関連するものであっても、短期間に発生するイベント(データのドリフト、ビジネス ルールや結果からの逸脱など) に関連するものであっても、潜在的に重大な問題について警告を受けることができます。
マーフィーの法則が必ず起こり、「すべてが間違いなくうまくいかない可能性がある」という現実世界の領域では、自動プロファイリングが、複数の脳のパズルやシステムを運用停止にする必要から私たちを救ってくれるかもしれません。
データプロファイリングに関しては、 ydata-profiling
一貫して重要な役割を果たしています。
複雑で時間のかかる操作が内部で実行されます。ydata-profiling は、データに含まれる特徴タイプを自動的に検出し、特徴タイプ (数値またはカテゴリ) に応じて、プロファイリング レポートに表示される概要統計と視覚化を調整します。
データ中心の分析を促進するこのパッケージは、ペアごとの相互作用と相関関係に焦点を当てて、特徴間の既存の関係も強調し、重複または定数値から偏った不均衡な特徴に至るまで、データ品質アラートの徹底的な評価を提供します。
これは、最小限の労力でデータの品質を 360 度見渡すことができます。
プロジェクトを開始する前に、まず Python の依存関係を設定し、データ ソースを構成する必要があります。
まず、 bytewax
パッケージとydata-profiling
パッケージをインストールしましょう (これには仮想環境を使用するとよいでしょう —
pip install bytewax==0.16.2 ydata-profiling==4.3.1
次に、アップロードします
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
実稼働環境では、これらの測定値は各デバイスによって継続的に生成され、入力はストリーミング プラットフォームで期待されるものと同じになります。
(補足として、データフローは本質的に、有向非巡回グラフ (DAG) として説明できるデータ パイプラインです。)
まず、必要なインポートをいくつか行います。
from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main
次に、データフロー オブジェクトを定義します。その後、ステートレス マップ メソッドを使用して、文字列を DateTime オブジェクトに変換し、データを形式 (device_id、data) に再構築する関数を渡します。
マップ メソッドは、ステートレスな方法で各データ ポイントに変更を加えます。データの形状を変更した理由は、次のステップでデータを簡単にグループ化し、すべてのデバイスのデータを同時にではなく、デバイスごとに個別にプロファイリングできるようにするためです。
flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))
次に、 bytewax
のステートフル機能を利用して、定義した期間にわたって各デバイスのデータを収集します。 ydata-profiling
時間の経過に伴うデータのスナップショットを期待しているため、ウィンドウ オペレーターはこれを行うのに最適な方法です。
ydata-profiling
では、特定のコンテキストに対して指定されたデータフレームの要約統計を生成できます。たとえば、この例では、各 IoT デバイスまたは特定の時間枠を参照するデータのスナップショットを作成できます。
from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)
スナップショットを定義した後、 ydata-profiling
利用するのは、分析したいデータフレームごとにProfileReport
を呼び出すのと同じくらい簡単です。
import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)
この例では、map メソッドの関数の一部としてイメージをローカル ファイルに書き込みます。これらはメッセージング ツールを介して報告されるか、将来的にはリモート ストレージに保存される可能性があります。
プロファイルが完了すると、データフローは何らかの出力を期待するため、組み込みStdOutput
を使用して、プロファイルされたデバイスと、マップ ステップのプロファイル関数から渡されたプロファイルされた時刻を出力できます。
flow.output("out", StdOutput())
Bytewax データフローを実行するには複数の方法があります。この例では、同じローカル マシンを使用しますが、Bytewax は、複数のホストにわたる複数の Python プロセス上で実行することもできます。
この記事ではローカル セットアップを続けますが、ヘルパー ツールを確認することをお勧めします。
データフロー定義が含まれるファイルと同じディレクトリにあると仮定すると、次のコマンドを使用して実行できます。
python -m bytewax.run ydata-profiling-streaming:flow
その後、プロファイリング レポートを使用して、データ品質を検証し、スキーマまたはデータ形式の変更を確認し、さまざまなデバイスまたは時間枠間のデータ特性を比較できます。
実際に、
snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")
データ ストリームの検証は、データ品質の問題を継続的に特定し、異なる期間にわたるデータの状態を比較するために重要です。
ヘルスケア、エネルギー、製造、エンターテイメントの組織(すべてが継続的なデータ ストリームを扱う組織)にとって、自動化されたプロファイリングは、品質評価からデータ プライバシーに至るまで、データ ガバナンスのベスト プラクティスを確立するための鍵となります。
これには、この記事で紹介されているように、 bytewax
とydata-profiling
組み合わせることでシームレスな方法で達成できるデータのスナップショットの分析が必要です。
Bytewax は、データ ストリームを処理してスナップショットに構造化するために必要なすべてのプロセスを処理します。その後、データ ストリームを要約して、データ特性の包括的なレポートを通じてydata-profilingと比較できます。
受信データを適切に処理およびプロファイリングできることにより、データ スキーマおよび形式のエラーの修正から、異常検出(例: 詐欺または侵入/脅威の検出)、機器の故障、および期待から逸脱するその他のイベント (例: データ ドリフトまたはビジネス ルールとの不整合) など、現実世界のアクティビティに由来する追加の問題の強調と軽減に至るまで、さまざまなドメインにわたる多数のユース ケースが開かれます。
これで、データ ストリームの探索を開始する準備がすべて整いました。他にどのような使用例を見つけたかをお知らせください。いつものように、お気軽にコメント欄に記入していただくか、次のアドレスまでお問い合わせください。
この記事は Fabiana Clemente (共同創設者兼 CDO @ ) によって書かれました。
OSS パッケージに関する追加情報は、それぞれのドキュメントに記載されています。
ここでも公開されています