2 月の オープン ソース化は、アクセス可能でありながら強力な ライブラリを探している人々にとって重要な一歩となりました。 Rerun の Python 視覚化 なぜ視覚化が重要なのですか? Scale.ai、Weights & Biases、Hugging Face などの企業は、データセットのラベル付け、実験の追跡、事前トレーニング済みのモデルに対処することでディープ ラーニングを合理化してきたため、視覚化は不可欠です。ただし、迅速なデータの取得と視覚化にはまだ空白が存在します。 多くの企業は、社内でデータ視覚化ソリューションを開発していますが、開発コストが高いために最適ではないツールで終わることがよくあります。さらに、 データでの Python の視覚化もうまく解決されていない問題であり、ノートブックでの t ベースのソリューションにつながっています。 Rerun は Python インターフェースを高パフォーマンスの 可視化エンジン (Bytewax によく似ています!) に活用し、ストリーミング データの分析を非常に簡単にします。 ストリーミング JavaScrip Rust このブログ投稿では、Bytewax と Rerun を使用して Python でリアルタイムのストリーミング データを視覚化し、リアルタイムの異常検出の視覚化を作成する方法について説明します。 異常検出、別名外れ値検出を選択したのは、サイバーセキュリティ、不正検出、産業プロセスの監視など、多数のアプリケーションで重要なコンポーネントであるためです。これらの異常をリアルタイムで視覚化すると、潜在的な問題を迅速に特定し、それらを軽減するために必要なアクションを実行するのに役立ちます。 詳しく知りたい方は、 でエンド ツー エンドの Python ソリューションをご覧ください。 Bytewax にスターを付けるのを忘れないでください! GitHub 概要 ここで説明する内容は次のとおりです。 コードをナビゲートし、トップレベルのエンティティについて簡単に説明します 次に、データフローの各ステップ (データフローの初期化、入力ソース、ステートフルな異常検出、データの視覚化と出力、クラスターの生成方法) について詳しく説明します。 最後に、それを実行して美しいビジュアライゼーションを確認する方法を学びます。すべて Python <3 で行います。 おまけとして、他のユースケースについて考えます さあ行こう! 環境をセットアップする このブログ投稿は、次のバージョンの Bytewax と Rerun に基づいています。 bytewax==0.15.1 rerun-sdk==0.4.0 Rerun と Bytewax は次のようにインストールできます。 pip install rerun-sdk pip install bytewax Python でのデータ ストリーミング アプリの開発をさらに容易にする新しいバージョンを作成しているので、Bytewax をフォローして最新情報を入手してください。 コード ソリューションは比較的コンパクトなので、ここにコード例全体をコピーします。圧倒的に見える場合は、この大きなチャンクをスキップしてください。各機能については後で説明します。 import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow) 提供されているコードは、Bytewax と Rerun を使用してリアルタイムの異常検出パイプラインを作成する方法を示しています。 このコードの重要なコンポーネントを分解してみましょう。 : この関数は、実際のデータ ストリームをシミュレートするランダム メトリックを生成します。異常 (値が 2 倍) の可能性が低いデータ ポイントが生成されます。 generate_random_metrics : このクラスは、Z スコア メソッドを使用して異常検出器を実装します。最後の 10 個の値の平均と標準偏差を維持し、Z スコアが指定されたしきい値よりも大きい場合、その値を異常としてマークします。 ZTestDetector : この関数は、データ パイプラインの出力動作を定義するために使用されます。この場合、メトリック名、値、平均、標準偏差、および値が異常であるかどうかが出力されます。 output_builder : コードの主要部分は、Bytewax を使用してデータフローを構築し、RandomMetricInput、ZTestDetector、および出力ビルダーを接続します。 データフロー : 再実行の視覚化は、ZTestDetector クラスに統合されています。 rr.log_scalar および rr.log_point 関数は、データ ポイントとそれに対応する異常ステータスをプロットするために使用されます。 再実行の視覚化 コードの主なコンポーネントを理解したところで、ビジュアライゼーションがどのように作成されるかを順を追って説明しましょう。 データフローの構築 データフロー パイプラインを作成するには、次のことを行う必要があります。 で新しいデータフローを初期化します。 flow = Dataflow() を使用して入力ソースを定義します。 flow.input("input", ManualInputConfig(generate_random_metrics)) を使用して、ステートフルな異常検出器を適用します。 flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) で出力動作を構成します。 flow.capture(ManualOutputConfig(output_builder)) 最後に、 でクラスターを生成してデータフローを実行します。 spawn_cluster(flow, proc_count=3) 結果のデータフローは、 からランダムに生成されたメトリック値を読み取り、それらを異常検出のために に渡し、 関数を使用して結果を出力します。各ステップの詳細を明らかにしましょう。 input_builder ZTestDetector output_builder 関数 generate_random_metrics 関数は、データフロー パイプラインの代替入力ソースとして機能し、ランダムなメトリック値を複数のワーカーに分散して生成します。 、 、および の 3 つのパラメーターを受け入れます。 generate_random_metrics worker_index worker_count resume_state def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) : データフロー パイプライン内の現在のワーカーのインデックス。 worker_index : データフロー パイプライン内のワーカーの総数。 worker_count : 再開する入力ソースの状態。この場合、それは であるとアサートされ、入力ソースが前の状態からの再開をサポートしていないことを示します。 resume_state None 以下は、 関数 generate_random_metrics の段階的な説明です 。 が であることをアサートします。 resume_state None メトリックを表すキーのリストを定義します。 配布機能を使用してワーカー間でキーを配布します (コード スニペットでは提供されていません)。現在のワーカーの分散キーは、this_workers_keys に割り当てられます。 1,000 回反復し、反復ごとにキーのリストを反復処理します。 0 から 10 の間のランダムな値を生成します。 10% の確率で、値を 2 倍にして異常をシミュレートします。 None (特定のパーティション キーがないことを示すため)、キー、生成された値、および開始時刻からの経過時間 (コード スニペットでは提供されていません) を含むタプルを生成します。 生成された各値の間にスリープ時間を導入して、リアルタイムのデータ生成をシミュレートします。 関数は、次のコード行で入力ソースとしてデータフローで使用されます。 generate_random_metrics flow.input("input", ManualInputConfig(generate_random_metrics)) この行は、 クラスを使用してパイプラインの入力データを生成するようデータフローに指示します。 RandomMetricInput クラス ZTestDetector クラスは、Z スコア メソッドを使用してデータ ポイントが異常かどうかを識別する異常検出器です。 Z スコアは、データ ポイントのデータセットの平均からの標準偏差の数です。データ ポイントの Z スコアが指定されたしきい値よりも高い場合、異常と見なされます。 ZTestDetector このクラスには次のメソッドがあります。 : コンストラクターは、しきい値 Z スコア値で ZTestDetector を初期化します。また、最後の 10 個の値のリスト (self.last_10)、平均 (self.mu)、および標準偏差 (self.sigma) も初期化します。 __init__(self, threshold_z) : このプライベート メソッドは、最後の 10 個の値のリストを新しい値で更新するために使用されます。リストの長さを 10 に維持しながら、リストの先頭に新しい値を挿入し、最も古い値を削除します。 _push(self, value) : このプライベート メソッドは、self.last_10 リストの現在の値に基づいて平均と標準偏差を再計算します。 _recalc_stats(self) : この public メソッドは、キー、値、およびタイムスタンプを含むタプルを入力として受け取ります。値の Z スコアを計算し、最後の 10 個の値リストを更新して、平均と標準偏差を再計算します。また、Rerun の視覚化機能を使用して、データ ポイントとその異常ステータスを記録します。最後に、ZTestDetector クラスの更新されたインスタンスと、値、平均、標準偏差、異常ステータスを含むタプルを返します。 push(self, key__value__t) ZTestDetector クラスは、データフロー パイプラインでステートフル マップとして次のコードで使用されます。 flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) この行は、 の Z スコアしきい値で 適用し、 メソッドを使用してデータ ポイントを処理するようにデータフローに指示します。 2.0 ZTestDetector push 異常の視覚化 異常を視覚化するために、 クラスは、Rerun の視覚化関数を使用して、データ ポイントとそれに対応する異常ステータスをログに記録します。具体的には、 スカラー値をプロットするために使用され、 3D ポイントをプロットするために使用されます。 ZTestDetector rr.log_scalar rr.log_point 次のコード スニペットは、ビジュアライゼーションの作成方法を示しています。 rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) ここでは、最初にメトリックを表すスカラー値をログに記録します。次に、値が異常かどうかに応じて、異なる半径と色で 3D ポイントをログに記録します。異常なポイントは大きな半径で赤く記録され、異常でないポイントは小さな半径で記録されます。 関数 output_builder 関数は、データ パイプラインの出力動作を定義するために使用されます。この特定の例では、メトリック名、値、平均、標準偏差、および値が異常であるかどうかを出力する責任があります。 output_builder この関数は、 と の 2 つの引数を取ります。これらの引数は、関数がワーカーのインデックスとデータフロー パイプライン内のワーカーの総数を理解するのに役立ちます。 worker_index worker_count 関数 output_builder の定義は次のとおりです 。 def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector この関数は高階関数です。つまり、 という別の関数を返します。 関数は、入力データのタプルを処理し、目的の出力を出力します。 inspector inspector 出力ビルダー関数は、後で出力動作を構成するときにデータフロー パイプラインで使用されます。 flow.capture(ManualOutputConfig(output_builder)). データフローの実行 Bytewax は、単一プロセスまたは複数プロセスで実行できます。このデータフローは、複数のプロセスにわたってスケーリングするように作成されていますが、 実行モジュールを使用して単一のプロセスとして実行することから始めます。 spawn_cluster spawn_cluster(flow) 並列処理を増やしたい場合は、引数としてプロセスを追加するだけです。 例 - 。 spawn_cluster(flow, proc_count=3) 提供されたコードを実行するには、単純に Python スクリプトとして実行できますが、最初に依存関係をインストールする必要があります。 dataflow.py と同じディレクトリに新しいファイルを作成し、requirements.txt という名前を付けます。 次のコンテンツを requirements.txt ファイルに追加します。 bytewax==0.15.1 rerun-sdk==0.4.0 requirements.txt および dataflow.py ファイルを含むディレクトリでターミナルを開きます。 次のコマンドを使用して依存関係をインストールします。 pip install -r requirements.txt そして、データフローを実行してください! python dataflow.py ユースケースの拡大 提供されているコードは、リアルタイムの異常検出の基本的な例として機能しますが、このパイプラインを拡張して、より複雑なシナリオに対応できます。 例えば: : RandomMetricInput クラスを、IoT センサー、ログ ファイル、ストリーミング API などの実世界のソースからデータを読み取るカスタム クラスに置き換えます。 実世界のデータ ソースを組み込む : ZTestDetector クラスを、移動平均、指数平滑法、機械学習ベースのアプローチなど、他のステートフルな異常検出方法に置き換えることができます。 より高度な異常検出手法の実装 : データ ディメンションを追加したり、配色を調整したり、必要に応じてプロット スタイルを変更したりして、再実行ビジュアライゼーションを強化します。 ビジュアライゼーションのカスタマイズ : 異常結果を単に出力する代わりに、パイプラインをアラートまたは監視システムと統合して、異常が検出されたときに適切な利害関係者に通知できます。 アラートおよび監視システムとの統合 データフロー パイプラインをカスタマイズおよび拡張することで、特定のユース ケースに合わせて調整された強力なリアルタイムの異常検出および視覚化ソリューションを作成できます。 Bytewax と Rerun の組み合わせは、リアルタイムのデータ処理および視覚化システムを構築するための汎用性と拡張性に優れた基盤を提供します。 結論 このブログ投稿では、Bytewax と Rerun を使用してリアルタイムの異常検出の視覚化を作成する方法を示しました。 Bytewax を使用してデータフロー パイプラインを構築し、Rerun の強力な視覚化機能を統合することで、データの異常を監視して特定することができます。 Zander Matheson によって最初に書かれまし た。 私たちのコミュニティに参加してください: Slack Github