このブログ投稿では、オープンソース ストリーミング ソリューション と を組み合わせて活用して、ストリーミング フローの品質を向上させる方法について説明します。シートベルトを締める! bytewax ydata-profiling ストリーム処理により、処理中および保存前のデータのリアルタイム分析が可能になり、 または することができます。 ステートフル ステートレスに 、 推奨事項、パターン検出、または複雑なイベント処理に使用されます。処理 (ウィンドウ、キーによる結合など) には何が起こったかの履歴が必要です。 ステートフル ストリーム処理は リアルタイムの 電子メールのマスクや型の変換など、ストリーム内の他のデータ ポイントの知識を必要としない 変換に使用されます。 ステートレス ストリーム処理は、 インライン 全体として、データ ストリームは業界で広く使用されており、 、 、 などのユースケースに適用されていることがわかります。 不正行為の検出 患者の監視 イベントの予測メンテナンス すべてのデータ ストリームで考慮すべき重要な側面の 1 つはデータの品質です 通常、データ ウェアハウスやダッシュボード ソリューションの作成中にデータ品質が評価される従来のモデルとは異なり、 。 ストリーミング データは継続的な監視が必要です 収集から下流アプリケーションへの供給まで、プロセス全体を通じてデータ品質を維持することが不可欠です。結局のところ、データ品質の低下によるコストは組織にとって高くつく可能性があります。 「不良データのコストは、ほとんどの企業の収益の 15% ~ 25% に上ります。 (…) これらのコストの 3 分の 2 は、データ品質を前面に押し出すことで削減できます。」 — トーマス C. レッドマン、『Getting in Front on Data Quality』の著者 この記事では、 と 組み合わせてストリーミング フローの品質をプロファイリングし、改善する方法を説明します。 bytewa ydata-profiling Bytewax を使用したデータ プロフェッショナル向けのストリーム処理 は、Python 開発者向けに特別に設計された OSS ストリーム処理フレームワークです。 バイトワックス これにより、ユーザーは、Flink、Spark、Kafka Streams と同様の機能を と同時に、フレンドリーで使い慣れたインターフェイスと 備えたストリーミング データ パイプラインとリアルタイム アプリケーションを構築できる Python エコシステムとの 100% 互換性を提供できます。 組み込みの使用 または既存の Python ライブラリを使用して、 、 ことができます。 コネクタ リアルタイムおよびストリーミング データ ソース (Kafka、RedPanda、WebSocket など) に接続し 変換されたデータをさまざまなダウンストリーム システム (Kafka、寄木細工ファイル、データ レイクなど) に書き出す 変換に関しては、Bytewax は、 、 、 メソッドを使用して 、回復やスケーラビリティなどの使い慣れた機能を備えています。 マップ ウィンドウ処理 集計 ステートフルおよびステートレス変換を容易にし バイトワックス ます。 データ ストリームに対する Python ファーストかつデータ中心のエクスペリエンスを促進します データ エンジニアやデータ サイエンティスト向けに特別に構築されてい これにより、ユーザーは、Spark や Flink などの JVM ベースのストリーミング プラットフォームを学習および維持することなく 、ニーズを満たすために必要なカスタマイズを作成できます。 、ストリーミング データ パイプラインとリアルタイム アプリケーションを構築し Bytewax は多くのユースケースに適しています。 、 、 、 もっと。 生成 AI 用のパイプラインの埋め込み データ ストリーム内の欠損値の処理 ストリーミング コンテキストで言語モデルを使用して金融市場を理解する ユースケースのインスピレーションや、ドキュメント、チュートリアル、ガイドなどの詳細については、お気軽にチェックしてください。 。 バイトワックスのウェブサイト データ ストリームのデータ プロファイリングを行う理由 、次のステップを指します。 : その構造、動作、品質。 データ プロファイリングは、あらゆる機械学習タスクを正常に開始するための鍵であり データを徹底的に理解する 一言で言えば、 データの形式と基本的な記述子 (サンプル数、特徴の数/種類、重複値など) に関連する側面の分析が含まれます。 (欠落データや不均衡な特徴の存在など)、およびデータ収集または処理中に発生する可能性のあるその他の複雑な要因(たとえば、誤った値や一貫性のない特徴)。 データプロファイリング 本質的な特性 。 高いデータ品質基準を確保することは、すべてのドメインと組織にとって重要ですが、状況が急速に変化する可能性があり、即時対応が必要になる可能性がある (医療モニタリング、株価、大気質ポリシーなど) 連続データを出力するドメインを運用しているドメインに特に関係があります 多くのドメインでは、データベースに保存されている履歴データを考慮した探索的データ分析の観点からデータ プロファイリングが使用されます。逆に、データ ストリームの場合、 、プロセスのさまざまな時間フレームや段階でデータをチェックする必要があります。 ストリームに沿った継続的な検証と品質管理にはデータ プロファイリングが不可欠となり 組み込むことで、データの現在の状態に関する でき、 (値の破損や形式の変更など) に関連するものであっても、 (データのドリフト、ビジネス ルールや結果からの逸脱など) に関連するものであっても、潜在的に重大な問題について警告を受けることができます。 自動化されたプロファイリングをデータ フローに フィードバックをすぐに取得 データの一貫性や整合性 短期間に発生するイベント 現実世界の領域では、自動プロファイリングが、複数の脳のパズルやシステムを運用停止にする必要から私たちを救ってくれるかもしれません。 マーフィーの法則が必ず起こり、「すべてが間違いなくうまくいかない可能性がある」という データプロファイリングに関しては、 一貫して重要な役割を果たしています。 、どちらかの場合 また データ。それもそのはずです。 ydata-profiling 群衆のお気に入り 表形式 時系列 これは 1 行のコードで広範な分析と洞察を得ることができます。 複雑で時間のかかる操作が内部で実行されます。ydata-profiling は 特徴タイプ (数値またはカテゴリ) に応じて、プロファイリング レポートに表示される 。 、データに含まれる特徴タイプを自動的に検出し、 概要統計と視覚化を調整します を促進するこのパッケージは、ペアごとの相互 と 関係に焦点を当てて、 、 または 値から 特徴に至るまで、 提供します。 データ中心の分析 作用 相関 特徴間の既存の関係も強調し 重複 定数 偏った 不均衡な データ品質アラートの徹底的な評価を これは、最小限の労力で ができます。 データの品質を 360 度見渡すこと すべてをまとめる: bytewax と ydata-profiling プロジェクトを開始する前に、まず Python の依存関係を設定し、データ ソースを構成する必要があります。 まず、 パッケージと パッケージをインストールしましょう ( bytewax ydata-profiling これには仮想環境を使用するとよいでしょう — これらの指示を確認してください 追加のガイダンスが必要な場合は!) pip install bytewax==0.16.2 ydata-profiling==4.3.1 次に、アップロードします (ライセンス — CC0: パブリック ドメイン)。これには、さまざまな IoT デバイスから いくつかの測定値が含まれています。 環境センサーテレメトリーデータセット の温度、湿度、一酸化炭素、液体石油ガス、煙、光、動きの wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 、入力はストリーミング プラットフォームで期待されるものと同じになります。 。この記事では、 bytewax を使用してデータフローを作成します。 実稼働環境では、これらの測定値は各デバイスによって継続的に生成され カフカのような ストリーミング データで見つかるコンテキストをシミュレートするために、CSV ファイルから一度に 1 行ずつデータを読み取り、 (補足として、データフローは本質的に、有向非巡回グラフ (DAG) として説明できるデータ パイプラインです。) まず、 をいくつか行います。 必要なインポート from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main 次に、データフロー オブジェクトを定義します。その後、ステートレス マップ メソッドを使用して、文字列を DateTime オブジェクトに変換し、データを形式 (device_id、data) に再構築する関数を渡します。 マップ メソッドは、ステートレスな方法で各データ ポイントに変更を加えます。データの形状を変更した理由は、次のステップでデータを簡単にグループ化し、すべてのデバイスのデータを同時にではなく、デバイスごとに個別にプロファイリングできるようにするためです。 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) 次に、 のステートフル機能を利用して、定義した期間にわたって各デバイスのデータを収集します。 時間の経過に伴うデータのスナップショットを期待しているため、ウィンドウ オペレーターはこれを行うのに最適な方法です。 bytewax ydata-profiling では、特定のコンテキストに対して指定されたデータフレームの要約統計を生成できます。たとえば、この例では、各 IoT デバイスまたは特定の時間枠を参照するデータのスナップショットを作成できます。 ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) スナップショットを定義した後、 利用するのは、分析したいデータフレームごとに を呼び出すのと同じくらい簡単です。 ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) この例では、map メソッドの関数の一部としてイメージをローカル ファイルに書き込みます。これらはメッセージング ツールを介して報告されるか、将来的にはリモート ストレージに保存される可能性があります。 プロファイルが完了すると、データフローは何らかの出力を期待するため、組み込み を使用して、プロファイルされたデバイスと、マップ ステップのプロファイル関数から渡されたプロファイルされた時刻を出力できます。 StdOutput flow.output("out", StdOutput()) Bytewax データフローを実行するには複数の方法があります。この例では、同じローカル マシンを使用しますが、Bytewax は、複数のホストにわたる複数の Python プロセス上で実行することもできます。 、 を使って 、 と 。 Dockerコンテナ Kubernetes クラスター もっと この記事ではローカル セットアップを続けますが、ヘルパー ツールを確認することをお勧めします。 パイプラインが本番環境に移行する準備ができたら、Kubernetes データフローのデプロイメントを管理します。 ワックスctl データフロー定義が含まれるファイルと同じディレクトリにあると仮定すると、次のコマンドを使用して実行できます。 python -m bytewax.run ydata-profiling-streaming:flow その後、プロファイリング レポートを使用して、データ品質を検証し、スキーマまたはデータ形式の変更を確認し、 。 さまざまなデバイスまたは時間枠間のデータ特性を比較できます 実際に、 これにより、2 つのデータ プロファイル間の違いがわかりやすくなり、調査が必要な重要なパターンや対処する必要がある問題を簡単に検出できるようになります。 比較レポート機能 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") 独自のデータ ストリームを探索する準備はできましたか? データ ストリームの検証は、データ品質の問題を継続的に特定し、異なる期間にわたるデータの状態を比較するために重要です。 、 、 、 の組織(すべてが継続的なデータ ストリームを扱う組織)に 。 ヘルスケア エネルギー 製造 エンターテイメント とって、自動化されたプロファイリングは、品質評価からデータ プライバシーに至るまで、データ ガバナンスのベスト プラクティスを確立するための鍵となります これには、この記事で紹介されているように、 と 組み合わせることでシームレスな方法で達成できるデータのスナップショットの分析が必要です。 bytewax ydata-profiling データ ストリームを処理してスナップショットに構造化するために必要なすべてのプロセスを処理します。その後、データ ストリームを要約して、データ特性の包括的なレポートを通じて と比較できます。 Bytewax は、 ydata-profiling 受信データを適切に処理およびプロファイリングできることにより、 から、 (例: 詐欺または侵入/脅威の検出)、 、および期待から逸脱するその他のイベント (例: データ ドリフトまたはビジネス ルールとの不整合) など、現実世界のアクティビティに由来する追加の問題の強調と軽減に至るまで、さまざまなドメインにわたる多数のユース ケースが開かれます。 データ スキーマおよび形式のエラーの修正 異常検出 機器の故障 これで、データ ストリームの探索を開始する準備がすべて整いました。他にどのような使用例を見つけたかをお知らせください。いつものように、お気軽にコメント欄に記入していただくか、次のアドレスまでお問い合わせください。 さらに質問や提案がありましたら! データ中心の AI コミュニティ それではまたお会いしましょう! 謝辞 ) と Oli Makhasoeva (開発者関係 @ ) -- 開発中 。 この記事は Fabiana Clemente (共同創設者兼 CDO @ ) によって書かれました。 Yデータ ) と Miriam Santos (開発者関係 @ Yデータ ) -- 開発中 ydata-プロファイリング -- およびザンダー・マセソン (CEO 兼創設者 @ バイトワックス バイトワックス バイトワックス OSS パッケージに関する追加情報は、それぞれのドキュメントに記載されています。 & 。 ydata-プロファイリングドキュメント バイトワックスのドキュメント でも公開されています ここ