A Lead Data Engineer and a Tech Enthusiast works on writing Articles on Data Engineering, Data Science and AI
Walkthroughs, tutorials, guides, and tips. This story will teach you how to do something new or how to do something better.
データ エンジニアは、データが不適切な形式、特にジャンク文字やデータ、null 値や空の値になっていること、そして最も重要なのは、レポートやデータ サイエンス モデルを含むすべての下流アプリケーションに影響を与える重複データの処理といった課題に直面することがよくあります。これはエンジニアやサポート チームにとって大変な毎日のタスクとなり、生産性を損ねることなくリソースを急速に消耗します。フレームワークの設計が不十分な場合、開発者が後でこれらのデータ修正を緩和するのに苦労することがよくあります。多くの組織では、非効率的なデータ パイプライン アーキテクチャが原因で冗長なデータがあり、ストレージ コストで数百万ドルのコストがかかり、データが何度も再処理され、リソースの使用率が低下しています。
では、本題に入りましょう。現在の役割で、ストリーミングまたはバッチ データ パイプラインで重複を処理する際に課題に直面したことはありますか? データ エンジニア、データ サイエンティスト、データ アナリストのほとんどは「はい」と答えるでしょう。データ レイク内の重複データを修正するために、現在ではさまざまなツールがありますが、コストはどのくらいでしょうか? アーキテクチャ設計フェーズ自体でこれらを処理できますか? 頭の中には多くの疑問が浮かぶかもしれません。
ストリーミング データの重複排除に役立つツール、その長所と短所、セットアップ、メンテナンスについて詳しく説明します。次に、ストリーミング パイプラインで重複を処理するためのベスト プラクティスと標準について詳しく説明します。
ストリーミング データ パイプラインにおける重複排除の 3 つの主なアプローチを確認しましょう。
すべてのストリーミング パイプラインは、IoT デバイス、センサー、ゲーム統計、交通カメラ、速度検出デバイス、自律走行車から車両使用データをストリーミングするスマート システムなど、さまざまなアプリケーションからデータを抽出します。これらのシステムのほとんどは通常、イベントをストリーミングするパターンに従っており、各イベントには通常、一意の識別子 (たとえば、販売トランザクションの小売店トランザクション ID とそのイベント タイムスタンプ) があります。一部のシステムには一般に一意の識別子がありません。たとえば、速度センサー デバイスには通常センサー ID がありますが、すべてのストリーム イベントにはイベント タイムスタンプ以外の一意の識別子はありません。このような場合、同じセンサー デバイスのストリーミング イベントが重複する可能性が高くなります。
州間高速道路上のデバイスからストリーミングされる車両の速度データが、混雑した日には通常 1 分あたり大量のデータになるユースケースを考えてみましょう。別の例としては、ホリデー セールの期間中、小売業は 1 日あたり数十億件のトランザクションを処理する必要があります。このような大量のイベントをリアルタイムで処理し、データの重複を排除することは、外れ値や重複を排除して正確なレポートとデータ サイエンス モデルを効率的に動作させるために非常に重要です。
技術的な観点からお話しすると、Google Cloud はPub/Sub を提供しています。これは、メッセージを生成するサービスとそれらのメッセージを処理するサービスを分離する、非同期でスケーラブルなメッセージング サービスです。これは、ストリーミング分析やデータ統合パイプラインでデータの読み込みと配信によく使用されます。ユーザー インタラクション イベント、サーバー イベント、リアルタイム イベントの取り込み、データベース間でのデータの複製、組織全体でのビジネス イベントの共有のためのエンタープライズ イベント バスとしての機能、およびデータ パイプラインを介して他の Google Cloud 製品と組み合わせて使用されるセンサーやアプリケーション イベントなどのアプリケーションからのデータ ストリーミングによく使用されます。
Pub/Sub は、属性を使用して重複データを処理するシンプルかつ強力な方法を提供します。Pub/Sub トピック内のすべてのメッセージには、メタデータにキーと値のペアを含めることができます。このデータを使用して重複イベントを識別し、データ パイプラインで重複排除を有効にすることができます。データ処理サービスに負荷をかけると、通常はリソース コストが高くなり、データ パイプラインが大幅に遅くなります。
transaction_idのような一意のフィールドを含むメッセージの場合、メッセージを公開するときにこの値を属性として設定できます。Dataflow で Pub/Sub からメッセージを読み取るときに、この属性を使用して重複を排除するようにパイプラインを構成できます。
このソリューションは、重複が Pub/Sub トピック内の一意の識別子を使用してソース アプリケーションまたはデバイスからストリーミングされる場合に有効です。このソリューションの制限は、重複メッセージが互いに 10 分以内に公開される場合にのみ有効であることです。実装は簡単ですが、Pub/Sub の時間枠制限によりスケーラビリティが不足しています。これは、各メッセージの 10 分以内に重複イベントを生成する高速カメラやセンサー デバイスなどのインスタンスで非常に役立ち、非常にうまく機能します。
ダウンストリームによるメッセージの消費が遅れたり、Pub/Sub が配信されたメッセージの確認応答を受信しなかったりして、Pub/Sub などのパブリッシャー自体内で重複が生成され、Pub/Sub が同じ Message_id を使用して同じメッセージの送信を再試行し、パブリッシャーで重複イベントが作成される場合があります。これに対処するには、Pub/Sub を使用してペイロードの message_id を特定し、これを識別子として使用できます。Google Cloud Platform (GCP) でデータをストリーム処理するためのフルマネージド サービスであるCloud DataFlow は、すべてのレコードを正確に 1 回処理します。これは私たちにとって何を意味しますか? - message_id に基づいて重複イベントを識別し、データ パイプラインで処理するときにそれらを排除しますが、まれに、データフロー内の異なるワーカー ノードで処理されたときにこれらの重複イベントがダウンストリームに効率的に到達しないことがあります。データレイクに重複が残ることになります。
このようなケースの処理方法については、この記事の最後でさらに詳しく説明します。ストリーミング データを重複排除するための残りのオプションに焦点を当てましょう。
これで、Pub/Sub が重複イベントを処理する方法がわかりました。次は、Pub/Sub サブスクライバーがソース アプリケーションからストリーミング メッセージを読み取るCloud DataFlow を使用してこれらのメッセージを処理する方法について説明します。Dataflow は、オープンソースのApache Beam SDK を使用して高度なストリーミング機能を有効にする、フルマネージド サービスです。Dataflow はジョブごとに 4,000 ワーカー ノードまで拡張でき、自動スケーリング機能を使用してペタバイト単位のデータを処理できるため、バッチ パイプラインとストリーミング パイプラインの両方でリソースをより効率的に利用できます。
Apache Beam には、重複を削除するためのより構成可能で堅牢な方法を提供する組み込みのDeduplicate PTransform が用意されています。この方法では、Beam のステートフル API を使用して、監視された各キーの状態を維持し、ユーザーが定義した時間枠内で重複を削除します。このアプローチにより、データ内の特定のフィールドまたはメッセージ コンテンツ全体に基づいて重複削除ロジックを定義でき、イベント時間または処理時間に基づいて重複削除を構成することができます。
この機能を試すには、 GitHubからサンプル データ パイプライン コードを確認してください。
ここで注意すべき点は、バッチ パイプラインでは常に 1 回限りの処理が使用されるのに対し、ストリーミング パイプラインではデフォルトで 1 回限りの処理が使用されますが、少なくとも 1 回の処理を使用するように構成することもできるということです。ここでの注意点は、データフローが現在処理しているウィンドウが、重複メッセージを処理しているウィンドウと交差すると、データフローはレコード ID をメモリに保存しないため、すでに処理されているものと比較されないことです。データフローは、遅れて到着するデータに基づいて、またはデータ パイプラインに未処理のメッセージをキャプチャしてCloud Bigquery (GCP 上の完全に管理されたクラウドネイティブ データ ウェアハウス) のテーブルに書き込む別のレッグがあるかどうかに基づいて、このメッセージを破棄する場合があります。または、クラウド ストレージ(非構造化データをファイルとして保存し、さらに再処理やトラブルシューティングを行うためのマネージド サービス) に書き込みます。
このソリューションは、複雑な重複排除ログインを処理するための柔軟なオプションを提供し、重複排除ウィンドウが Pub/Sub が提供するものよりも大きく複雑なシナリオに適しています。トレードオフには、レコードの一意性を決定するために各状態を維持するためのリソース使用量の増加が含まれます。
これまで、Pub/Sub や統合サービス Cloud DataFlow などのパブリッシャーが重複をリアルタイムで処理する仕組みを見てきました。これらのソリューションは、処理のオーバーヘッドやボリュームの問題により、ウィンドウ処理に関しては 100% 効果的ではないと思います。このようなシナリオでは、重複メッセージが遅れて到着し、メッセージの一意性をクロスチェックするためのレコード ID を保持していないためデータフローがそれを一意のレコードであると認識するなどのエッジ ケースを処理します。また、別のシナリオでは、ネットワーク障害やワーカー ノード障害によりデータフローがこれらのメッセージを異なるワーカー ノードで処理するため、データフローで処理中に一意のレコードであると認識し、Google Cloud BigQuery テーブルなどのダウンストリーム システムに入ります。
このような事態を軽減し、重複排除の最終チェックを BigQuery やその他のデータ ウェアハウスなどのシンク レベルで実行できます。このアプローチは、リアルタイムの重複排除が重要ではなく、定期的な重複排除で十分な場合に役立ちます。これにより、高度な SQL クエリを使用して、重複メッセージがすべて効果的にフィルタリングされ、排除されます。
ユースケースに基づいて、重複を修正するための 2 種類のソリューションが利用できます。
まず、Composer DAG または BigQuery コンソール内でスケジュールされたクエリを使用して、パーティション (毎日または毎時) を使用して重複除去テーブルを定期的に作成します。これにより、誰でも簡単にプロセスを作成し、重複除去データをステージング テーブルに保存して、個別のデータを最終テーブルに読み込むことができます。
次に、マテリアライズド ビューを使用してリアルタイム データを取得し、ビジネス インサイトを迅速に得るための理想的なソリューションを実現できます。
Bigquery SQL クエリは、私のGithub dedup_sqlリンクに掲載されています。
以下の BigQuery SQL コードは、これまでに説明した 2 つのオプションについて説明しています。
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
各重複排除戦略には、独自のトレードオフが伴います。適切なアプローチを選択する際に役立つ概要を以下に示します。
方法 | 利点 | デメリット |
---|---|---|
Pub/Sub メッセージ属性 | 低レイテンシ、Pub/Sub ネイティブ | 重複排除ウィンドウは10分に制限されます |
Apache Beam 重複排除 | 柔軟性が高く、複雑な重複排除ロジックをサポート | 状態管理によるリソース消費の増加 |
シンクベースの重複排除 | バッチまたは定期的な更新に適しており、ロジックは最小限です | 遅延が発生する可能性があり、オーケストレーションツールが必要になる場合があります |
重複排除は、ストリーミング パイプラインでの効率的なデータ処理の基礎です。戦略の選択は、パイプラインのリアルタイムのニーズ、複雑さ、リソースの制約によって異なります。Pub/Sub 属性、Apache Beam Deduplicate PTransform、またはシンクベースの重複排除の長所を活用することで、下流のシステムにクリーンで信頼性の高いデータを確保できます。これらのアプローチを調べ、提供されている例を実装し、ユースケースに適応させて最適な結果を得てください。
データ分析と機械学習に関するより詳細なガイドに興味がありますか?フォローしてください
データパイプラインが混乱しています—重複データによる数百万ドルの無駄を防ぐ方法 | HackerNoon