2 月のRerun のオープン ソース化は、アクセス可能でありながら強力なPython 視覚化ライブラリを探している人々にとって重要な一歩となりました。
Scale.ai、Weights & Biases、Hugging Face などの企業は、データセットのラベル付け、実験の追跡、事前トレーニング済みのモデルに対処することでディープ ラーニングを合理化してきたため、視覚化は不可欠です。ただし、迅速なデータの取得と視覚化にはまだ空白が存在します。
多くの企業は、社内でデータ視覚化ソリューションを開発していますが、開発コストが高いために最適ではないツールで終わることがよくあります。さらに、ストリーミングデータでの Python の視覚化もうまく解決されていない問題であり、ノートブックでのJavaScrip t ベースのソリューションにつながっています。 Rerun は Python インターフェースを高パフォーマンスのRust可視化エンジン (Bytewax によく似ています!) に活用し、ストリーミング データの分析を非常に簡単にします。
このブログ投稿では、Bytewax と Rerun を使用して Python でリアルタイムのストリーミング データを視覚化し、リアルタイムの異常検出の視覚化を作成する方法について説明します。
異常検出、別名外れ値検出を選択したのは、サイバーセキュリティ、不正検出、産業プロセスの監視など、多数のアプリケーションで重要なコンポーネントであるためです。これらの異常をリアルタイムで視覚化すると、潜在的な問題を迅速に特定し、それらを軽減するために必要なアクションを実行するのに役立ちます。
詳しく知りたい方は、 GitHubでエンド ツー エンドの Python ソリューションをご覧ください。 Bytewax にスターを付けるのを忘れないでください!
ここで説明する内容は次のとおりです。
さあ行こう!
このブログ投稿は、次のバージョンの Bytewax と Rerun に基づいています。
bytewax==0.15.1 rerun-sdk==0.4.0
Rerun と Bytewax は次のようにインストールできます。
pip install rerun-sdk pip install bytewax
Python でのデータ ストリーミング アプリの開発をさらに容易にする新しいバージョンを作成しているので、Bytewax をフォローして最新情報を入手してください。
ソリューションは比較的コンパクトなので、ここにコード例全体をコピーします。圧倒的に見える場合は、この大きなチャンクをスキップしてください。各機能については後で説明します。
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
提供されているコードは、Bytewax と Rerun を使用してリアルタイムの異常検出パイプラインを作成する方法を示しています。
このコードの重要なコンポーネントを分解してみましょう。
generate_random_metrics : この関数は、実際のデータ ストリームをシミュレートするランダム メトリックを生成します。異常 (値が 2 倍) の可能性が低いデータ ポイントが生成されます。
ZTestDetector : このクラスは、Z スコア メソッドを使用して異常検出器を実装します。最後の 10 個の値の平均と標準偏差を維持し、Z スコアが指定されたしきい値よりも大きい場合、その値を異常としてマークします。
output_builder : この関数は、データ パイプラインの出力動作を定義するために使用されます。この場合、メトリック名、値、平均、標準偏差、および値が異常であるかどうかが出力されます。
データフロー: コードの主要部分は、Bytewax を使用してデータフローを構築し、RandomMetricInput、ZTestDetector、および出力ビルダーを接続します。
再実行の視覚化: 再実行の視覚化は、ZTestDetector クラスに統合されています。 rr.log_scalar および rr.log_point 関数は、データ ポイントとそれに対応する異常ステータスをプロットするために使用されます。
コードの主なコンポーネントを理解したところで、ビジュアライゼーションがどのように作成されるかを順を追って説明しましょう。
データフロー パイプラインを作成するには、次のことを行う必要があります。
flow = Dataflow()
で新しいデータフローを初期化します。flow.input("input", ManualInputConfig(generate_random_metrics))
を使用して入力ソースを定義します。flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
を使用して、ステートフルな異常検出器を適用します。flow.capture(ManualOutputConfig(output_builder))
で出力動作を構成します。spawn_cluster(flow, proc_count=3)
でクラスターを生成してデータフローを実行します。
結果のデータフローは、 input_builder
からランダムに生成されたメトリック値を読み取り、それらを異常検出のためにZTestDetector
に渡し、 output_builder
関数を使用して結果を出力します。各ステップの詳細を明らかにしましょう。
generate_random_metrics
関数generate_random_metrics
関数は、データフロー パイプラインの代替入力ソースとして機能し、ランダムなメトリック値を複数のワーカーに分散して生成します。 worker_index
、 worker_count
、およびresume_state
の 3 つのパラメーターを受け入れます。
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: データフロー パイプライン内の現在のワーカーのインデックス。
worker_count
: データフロー パイプライン内のワーカーの総数。
resume_state
: 再開する入力ソースの状態。この場合、それはNone
であるとアサートされ、入力ソースが前の状態からの再開をサポートしていないことを示します。
以下は、 generate_random_metrics
関数の段階的な説明です。
resume_state
がNone
であることをアサートします。0 から 10 の間のランダムな値を生成します。
10% の確率で、値を 2 倍にして異常をシミュレートします。
None (特定のパーティション キーがないことを示すため)、キー、生成された値、および開始時刻からの経過時間 (コード スニペットでは提供されていません) を含むタプルを生成します。
生成された各値の間にスリープ時間を導入して、リアルタイムのデータ生成をシミュレートします。
generate_random_metrics
関数は、次のコード行で入力ソースとしてデータフローで使用されます。
flow.input("input", ManualInputConfig(generate_random_metrics))
この行は、 RandomMetricInput
クラスを使用してパイプラインの入力データを生成するようデータフローに指示します。
ZTestDetector
クラスZTestDetector
クラスは、Z スコア メソッドを使用してデータ ポイントが異常かどうかを識別する異常検出器です。 Z スコアは、データ ポイントのデータセットの平均からの標準偏差の数です。データ ポイントの Z スコアが指定されたしきい値よりも高い場合、異常と見なされます。
このクラスには次のメソッドがあります。
__init__(self, threshold_z)
: コンストラクターは、しきい値 Z スコア値で ZTestDetector を初期化します。また、最後の 10 個の値のリスト (self.last_10)、平均 (self.mu)、および標準偏差 (self.sigma) も初期化します。
_push(self, value)
: このプライベート メソッドは、最後の 10 個の値のリストを新しい値で更新するために使用されます。リストの長さを 10 に維持しながら、リストの先頭に新しい値を挿入し、最も古い値を削除します。
_recalc_stats(self)
: このプライベート メソッドは、self.last_10 リストの現在の値に基づいて平均と標準偏差を再計算します。
push(self, key__value__t)
: この public メソッドは、キー、値、およびタイムスタンプを含むタプルを入力として受け取ります。値の Z スコアを計算し、最後の 10 個の値リストを更新して、平均と標準偏差を再計算します。また、Rerun の視覚化機能を使用して、データ ポイントとその異常ステータスを記録します。最後に、ZTestDetector クラスの更新されたインスタンスと、値、平均、標準偏差、異常ステータスを含むタプルを返します。
ZTestDetector クラスは、データフロー パイプラインでステートフル マップとして次のコードで使用されます。
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
この行は、 2.0
の Z スコアしきい値でZTestDetector
適用し、 push
メソッドを使用してデータ ポイントを処理するようにデータフローに指示します。
異常を視覚化するために、 ZTestDetector
クラスは、Rerun の視覚化関数を使用して、データ ポイントとそれに対応する異常ステータスをログに記録します。具体的には、 rr.log_scalar
スカラー値をプロットするために使用され、 rr.log_point
3D ポイントをプロットするために使用されます。
次のコード スニペットは、ビジュアライゼーションの作成方法を示しています。
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
ここでは、最初にメトリックを表すスカラー値をログに記録します。次に、値が異常かどうかに応じて、異なる半径と色で 3D ポイントをログに記録します。異常なポイントは大きな半径で赤く記録され、異常でないポイントは小さな半径で記録されます。
output_builder
関数output_builder
関数は、データ パイプラインの出力動作を定義するために使用されます。この特定の例では、メトリック名、値、平均、標準偏差、および値が異常であるかどうかを出力する責任があります。
この関数は、 worker_index
とworker_count
の 2 つの引数を取ります。これらの引数は、関数がワーカーのインデックスとデータフロー パイプライン内のワーカーの総数を理解するのに役立ちます。
output_builder
関数の定義は次のとおりです。
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
この関数は高階関数です。つまり、 inspector
という別の関数を返します。 inspector
関数は、入力データのタプルを処理し、目的の出力を出力します。
出力ビルダー関数は、後で出力動作を構成するときにデータフロー パイプラインで使用されます。
flow.capture(ManualOutputConfig(output_builder)).
Bytewax は、単一プロセスまたは複数プロセスで実行できます。このデータフローは、複数のプロセスにわたってスケーリングするように作成されていますが、 spawn_cluster
実行モジュールを使用して単一のプロセスとして実行することから始めます。
spawn_cluster(flow)
並列処理を増やしたい場合は、引数としてプロセスを追加するだけです。
例 - spawn_cluster(flow, proc_count=3)
。
提供されたコードを実行するには、単純に Python スクリプトとして実行できますが、最初に依存関係をインストールする必要があります。
dataflow.py と同じディレクトリに新しいファイルを作成し、requirements.txt という名前を付けます。
次のコンテンツを requirements.txt ファイルに追加します。
bytewax==0.15.1 rerun-sdk==0.4.0
requirements.txt および dataflow.py ファイルを含むディレクトリでターミナルを開きます。
次のコマンドを使用して依存関係をインストールします。
pip install -r requirements.txt
そして、データフローを実行してください!
python dataflow.py
提供されているコードは、リアルタイムの異常検出の基本的な例として機能しますが、このパイプラインを拡張して、より複雑なシナリオに対応できます。
例えば:
実世界のデータ ソースを組み込む: RandomMetricInput クラスを、IoT センサー、ログ ファイル、ストリーミング API などの実世界のソースからデータを読み取るカスタム クラスに置き換えます。
より高度な異常検出手法の実装: ZTestDetector クラスを、移動平均、指数平滑法、機械学習ベースのアプローチなど、他のステートフルな異常検出方法に置き換えることができます。
ビジュアライゼーションのカスタマイズ: データ ディメンションを追加したり、配色を調整したり、必要に応じてプロット スタイルを変更したりして、再実行ビジュアライゼーションを強化します。
アラートおよび監視システムとの統合: 異常結果を単に出力する代わりに、パイプラインをアラートまたは監視システムと統合して、異常が検出されたときに適切な利害関係者に通知できます。
データフロー パイプラインをカスタマイズおよび拡張することで、特定のユース ケースに合わせて調整された強力なリアルタイムの異常検出および視覚化ソリューションを作成できます。 Bytewax と Rerun の組み合わせは、リアルタイムのデータ処理および視覚化システムを構築するための汎用性と拡張性に優れた基盤を提供します。
このブログ投稿では、Bytewax と Rerun を使用してリアルタイムの異常検出の視覚化を作成する方法を示しました。 Bytewax を使用してデータフロー パイプラインを構築し、Rerun の強力な視覚化機能を統合することで、データの異常を監視して特定することができます。
Zander Matheson によって最初に書かれました。