paint-brush
使用 Python 可视化实时异常检测经过@bytewax
1,209 讀數
1,209 讀數

使用 Python 可视化实时异常检测

经过 bytewax13m2023/04/21
Read on Terminal Reader

太長; 讀書

流数据的 Python 可视化一直具有挑战性,导致笔记本中基于复杂 JavaScript 的解决方案。 重新运行其 Python-Rust 性质大大简化了它。在 Bytewax 的支持下,我们将看到一个流式异常检测可视化示例。
featured image - 使用 Python 可视化实时异常检测
bytewax HackerNoon profile picture
0-item


Rerun 在 2 月份的开源标志着那些寻找可访问但强大的Python 可视化库的人迈出了重要的一步。


为什么可视化很重要?

可视化是必不可少的,因为像 Scale.ai、Weights & Biases 和 Hugging Face 这样的公司已经通过处理数据集标记、实验跟踪和预训练模型来简化深度学习。然而,在快速数据捕获和可视化方面仍然存在空白。


许多公司开发内部数据可视化解决方案,但由于高昂的开发成本,往往以次优工具告终。此外,数据的 Python 可视化也是一个没有很好解决的问题,导致笔记本中基于JavaScrip t 的解决方案。 Rerun 将 Python 接口利用到高性能Rust可视化引擎(很像 Bytewax!),这使得分析流数据变得非常容易。


在这篇博文中,我们将探索如何使用 Bytewax 和 Rerun 在 Python 中可视化实时流数据并创建实时异常检测可视化。


我们选择异常检测,也就是异常值检测,因为它是众多应用程序中的关键组件,例如网络安全、欺诈检测和工业流程监控。实时可视化这些异常有助于快速识别潜在问题并采取必要的措施来缓解这些问题。


对于那些渴望深入了解的人,请在我们的GitHub上查看我们的端到端 Python 解决方案。别忘了给 Bytewax 加注星标!

概述

以下是我们将介绍的内容:

  • 我们将导航代码并简要讨论顶级实体
  • 然后我们将更详细地讨论数据流的每个步骤:数据流的初始化、输入源、状态异常检测、数据可视化和输出,以及如何生成集群
  • 最后,我们将学习如何运行它并查看漂亮的可视化效果,全部在 Python <3 中
  • 作为奖励,我们会考虑其他用例

我们走吧!


设置你的环境

这篇博文基于以下版本的 Bytewax 和 Rerun:

 bytewax==0.15.1 rerun-sdk==0.4.0


Rerun 和 Bytewax 可安装为

pip install rerun-sdk pip install bytewax


关注 Bytewax 获取更新,因为我们正在烘焙一个新版本,它将进一步简化 Python 中数据流应用程序的开发。

代码

解决方案比较紧凑,所以我们将整个代码示例复制到这里。如果看起来势不可挡,请随意跳过这一大块;我们稍后将讨论每个功能。

 import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)


提供的代码演示了如何使用 Bytewax 和 Rerun 创建实时异常检测管道。


让我们分解这段代码的基本组成部分:

  • generate_random_metrics :此函数生成模拟真实世界数据流的随机指标。它生成的数据点极有可能出现异常(值加倍)。


  • ZTestDetector :此类使用 Z-score 方法实现异常检测器。它保留最后 10 个值的平均值和标准偏差,并在 Z 分数大于指定阈值时将值标记为异常。


  • output_builder :此函数用于定义数据管道的输出行为。在这种情况下,它会打印指标名称、值、平均值、标准差以及值是否异常。


  • 数据流:代码的主要部分使用 Bytewax 构建数据流,连接 RandomMetricInput、ZTestDetector 和输出构建器。


  • 重新运行可视化:重新运行可视化已集成到 ZTestDetector 类中。 rr.log_scalar 和 rr.log_point 函数用于绘制数据点及其相应的异常状态。


现在,了解代码的主要组成部分后,让我们讨论如何逐步创建可视化。

构建数据流

要创建数据流管道,您需要:


  1. 使用flow = Dataflow()初始化新数据流。
  2. 使用flow.input("input", ManualInputConfig(generate_random_metrics))定义输入源。
  3. 使用flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)应用状态异常检测器。
  4. 使用flow.capture(ManualOutputConfig(output_builder))配置输出行为。
  5. 最后,生成一个集群以使用spawn_cluster(flow, proc_count=3)执行数据流。


生成的数据流从input_builder读取随机生成的指标值,将它们传递给ZTestDetector进行异常检测,并使用output_builder函数输出结果。让我们澄清每个步骤的细节。

generate_random_metrics函数

generate_random_metrics函数用作数据流管道的替代输入源,以跨多个工作程序的分布式方式生成随机指标值。它接受三个参数: worker_indexworker_countresume_state


 def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)


  • worker_index :数据流管道中当前工作人员的索引。

  • worker_count :数据流管道中的工人总数。

  • resume_state :要恢复的输入源的状态。在这种情况下,它被断言为None ,表示输入源不支持从先前状态恢复。


以下是generate_random_metrics函数的分步说明


  1. 断言resume_stateNone
  2. 定义表示指标的键列表。
  3. 使用分发函数(代码片段中未提供)在工作人员之间分发密钥。当前工作人员的分布式密钥分配给 this_workers_keys。
  4. 迭代 1,000 次,并且对于每次迭代,迭代键列表:
    • 生成 0 到 10 之间的随机值。

    • 以 10% 的概率,将值加倍以模拟异常。

    • 生成一个元组,其中包含 None(表示没有特定的分区键)、键、生成的值和自开始时间以来经过的时间(代码片段中未提供)。

    • 在每个生成值之间引入休眠时间来模拟实时数据生成。


generate_random_metrics函数在数据流中用作输入源,代码行如下:


 flow.input("input", ManualInputConfig(generate_random_metrics))


此行告诉数据流使用RandomMetricInput类为管道生成输入数据。

ZTestDetector

ZTestDetector类是一个异常检测器,它使用 Z-score 方法来识别数据点是否异常。 Z 分数是数据点与数据集平均值的标准差数。如果数据点的 Z 分数高于指定阈值,则认为它是异常的。


该类具有以下方法:

  • __init__(self, threshold_z) :构造函数使用阈值 Z 分数值初始化 ZTestDetector。它还会初始化最后 10 个值列表 (self.last_10)、平均值 (self.mu) 和标准差 (self.sigma)。


  • _push(self, value) :此私有方法用于使用新值更新最后 10 个值的列表。它在列表的开头插入新值并删除最旧的值,将列表长度保持在 10。


  • _recalc_stats(self) :此私有方法根据 self.last_10 列表中的当前值重新计算平均值和标准差。


  • push(self, key__value__t) :此公共方法采用包含键、值和时间戳的元组作为输入。它计算值的 Z 分数,更新最后 10 个值列表,并重新计算平均值和标准偏差。它还使用 Rerun 的可视化功能记录数据点及其异常状态。最后,它返回 ZTestDetector 类的更新实例和一个包含值、平均值、标准差和异常状态的元组。


ZTestDetector 类在数据流管道中用作有状态映射,代码如下:

 flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)


此行告诉数据流应用 Z 分数阈值为2.0ZTestDetector ,并使用push方法处理数据点。

可视化异常

为了可视化异常, ZTestDetector类使用 Rerun 的可视化功能记录数据点及其相应的异常状态。具体来说, rr.log_scalar用于绘制标量值,而rr.log_point用于绘制 3D 点。


以下代码片段显示了可视化的创建方式:

 rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)


在这里,我们首先记录一个表示指标的标量值。然后,根据该值是否异常,我们记录一个具有不同半径和颜色的 3D 点。异常点以较大半径记录为红色,而非异常点以较小半径记录。

output_builder函数

output_builder函数用于定义数据管道的输出行为。在这个具体的例子中,它负责打印指标名称、值、均值、标准差以及值是否异常。


该函数有两个参数: worker_indexworker_count 。这些参数有助于函数了解工作人员的索引和数据流管道中的工作人员总数。


这是output_builder函数的定义

 def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector


这个函数是一个高阶函数,这意味着它返回另一个名为inspector的函数。 inspector功能负责处理输入数据元组并打印所需的输出。


输出生成器函数稍后在配置输出行为时在数据流管道中使用

flow.capture(ManualOutputConfig(output_builder)).

运行数据流

Bytewax 可以作为单个进程或以多进程方式运行。此数据流已创建为跨多个进程扩展,但我们将开始使用spawn_cluster执行模块将其作为单个进程运行。


 spawn_cluster(flow)


如果我们想增加并行性,我们只需添加更多进程作为参数。


例如 - spawn_cluster(flow, proc_count=3)

要运行提供的代码,我们可以简单地将其作为 Python 脚本运行,但首先我们需要安装依赖项。


在与 dataflow.py 相同的目录中创建一个新文件并将其命名为 requirements.txt。


在 requirements.txt 文件中添加以下内容:

 bytewax==0.15.1 rerun-sdk==0.4.0


在包含 requirements.txt 和 dataflow.py 文件的目录中打开一个终端。

使用以下命令安装依赖项:

 pip install -r requirements.txt


并运行数据流!

 python dataflow.py

扩展用例

虽然提供的代码用作实时异常检测的基本示例,但您可以扩展此管道以适应更复杂的场景。


例如:

  • 合并真实世界的数据源:将 RandomMetricInput 类替换为自定义类,该自定义类从真实世界的来源读取数据,例如 IoT 传感器、日志文件或流式 API。


  • 实施更复杂的异常检测技术:您可以将 ZTestDetector 类替换为其他有状态异常检测方法,例如移动平均、指数平滑或基于机器学习的方法。


  • 自定义可视化:通过添加更多数据维度、调整配色方案或修改绘图样式来增强 Rerun 可视化,以更好地满足您的需求。


  • 与警报和监控系统集成:您可以将管道与警报或监控系统集成,以在检测到异常时通知适当的利益相关者,而不是简单地打印异常结果。


通过自定义和扩展数据流管道,您可以创建针对您的特定用例量身定制的强大的实时异常检测和可视化解决方案。 Bytewax 和 Rerun 的结合为构建实时数据处理和可视化系统提供了一个通用且可扩展的基础。

结论

这篇博文演示了如何使用 Bytewax 和 Rerun 创建实时异常检测可视化。通过使用 Bytewax 构建数据流管道并集成 Rerun 强大的可视化功能,我们可以在数据异常发生时对其进行监控和识别。




最初由 Zander Matheson在这里撰写。

加入我们的社区: Slack Github