paint-brush
如何使用 bytewax 和 ydata-profiling 实时了解您的数据经过@ydata
792 讀數
792 讀數

如何使用 bytewax 和 ydata-profiling 实时了解您的数据

经过 YData9m2023/07/25
Read on Terminal Reader

太長; 讀書

这是一个关于如何对数据流执行数据分析的精彩分步教程🚀
featured image - 如何使用 bytewax 和 ydata-profiling 实时了解您的数据
YData HackerNoon profile picture

在这篇博文中,我们将介绍如何结合和利用开源流解决方案bytewaxydata-profiling来提高流质量。系好安全带!


流处理可以对传输中和存储之前的数据进行实时分析,并且可以是有状态的无状态的


有状态流处理用于实时推荐、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。


无状态流处理用于内联转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。


照片由 Unsplash 上的 Markus Spiske 拍摄


总体而言,数据流在行业中得到广泛应用,并且可以应用于欺诈检测患者监控事件预测维护等用例。

所有数据流都必须考虑的一个关键方面是数据的质量

与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同,流数据需要持续监控


在从收集到提供下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织而言,不良数据质量的成本可能很高:


“对于大多数公司来说,不良数据造成的成本占收入的 15% 到 25%,令人震惊。 (...) 三分之二的成本可以通过提高数据质量来消除。”


— Thomas C. Redman,《Getting in Front on Data Quality》一书的作者


在本文中,我们将向您展示如何将bytewaydata-profiling结合起来来分析和提高流媒体的质量!

使用 Bytewax 为数据专业人员提供流处理

字节蜡是专门为Python开发者设计的OSS流处理框架。


它允许用户构建具有类似于 Flink、Spark 和 Kafka Streams 功能的流数据管道和实时应用程序,同时提供友好熟悉的界面以及与 Python 生态系统 100% 兼容。


使用内置连接器或现有的 Python 库,您可以连接到实时流数据源(Kafka、RedPanda、WebSocket 等)并将转换后的数据写入各种下游系统(Kafka、parquet 文件、数据湖等)。


对于转换,Bytewax 通过映射窗口聚合方法促进有状态和无状态转换,并具有恢复和可扩展性等熟悉的功能。


字节蜡促进数据流的 Python 优先和以数据为中心的体验专为数据工程师和数据科学家而构建


它允许用户构建流数据管道和实时应用程序,并创建满足其需求所需的定制,而无需学习和维护基于 JVM 的流平台(如 Spark 或 Flink)。


Bytewax 非常适合许多用例,即生成人工智能的嵌入管道,处理数据流中的缺失值,在流环境中使用语言模型来理解金融市场, 和更多。


如需用例灵感以及文档、教程和指南等更多信息,请随时查看字节蜡网站

为什么要对数据流进行数据分析?

数据分析是任何机器学习任务成功开始的关键,是指以下步骤: 彻底了解我们的数据:它的结构、行为和质量。


简而言之, 数据剖析涉及分析与数据格式和基本描述符相关的方面(例如样本数量、特征数量/类型、重复值)、其内在特征(例如存在缺失数据或不平衡特征),以及数据收集或处理过程中可能出现的其他复杂因素(例如错误值或不一致特征)。


确保高数据质量标准对于所有领域和组织都至关重要,但对于输出连续数据的领域尤其重要,因为这些领域的情况可能会快速变化,并且可能需要立即采取行动(例如,医疗保健监控、股票价值、空气质量政策)。


对于许多领域,数据分析是从探索性数据分析角度使用的,考虑数据库中存储的历史数据。相反,对于数据流,数据分析对于沿流持续进行验证和质量控制变得至关重要,其中需要在过程的不同时间范围或阶段检查数据。


通过将自动分析嵌入到我们的数据流中,我们可以立即获得有关数据当前状态的反馈,并针对任何潜在的关键问题发出警报 - 无论它们是与数据一致性和完整性(例如,损坏的值或更改格式)有关,还是与短时间内发生的事件(例如,数据漂移、偏离业务规则和结果)有关。


在现实世界中,您只知道墨菲定律必然会发生,并且“一切肯定都可能出错” ,自动分析可能会帮助我们避免多个大脑难题和需要停止生产的系统!


在数据分析方面, ydata-profiling一直是人群最爱,或者对于表格的或者时间序列数据。这也难怪——一行代码就可以进行一系列广泛的分析和见解。


复杂且耗时的操作是在幕后完成的:ydata-profiling自动检测数据中包含的特征类型,并根据特征类型(数字或分类)调整分析报告中显示的摘要统计数据和可视化效果


为了促进以数据为中心的分析,该软件包还强调了特征之间的现有关系,重点关注它们的成对相互作用相关性,并提供了对数据质量警报的全面评估,从重复恒定值到倾斜不平衡的特征。


这实际上是我们数据质量的 360° 视图- 只需付出最少的努力。


分析报告:突出显示潜在的数据质量问题。



综合起来:bytewax 和 ydata-profiling

在开始项目之前,我们需要首先设置Python依赖项并配置数据源。


首先,让我们安装bytewaxydata-profiling软件包(您可能想为此使用虚拟环境 — 检查这些说明如果您需要一些额外的指导!)


 pip install bytewax==0.16.2 ydata-profiling==4.3.1


然后,我们将上传环境传感器遥测数据(许可证 — CC0:公共领域),其中包含来自不同物联网设备的温度、湿度、一氧化碳液化石油气、烟雾、光和运动的多种测量值:


 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000


在生产环境中,这些测量结果将由每个设备连续生成,并且输入将类似于我们在流媒体平台中所期望的那样比如卡夫卡。在本文中,为了模拟我们在流数据中发现的上下文,我们将从 CSV 文件中一次一行读取数据,并使用 bytewax 创建一个数据流。


(顺便说一句,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG)


首先,让我们进行一些必要的导入


 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main


然后,我们定义数据流对象。之后,我们将使用无状态映射方法,在该方法中传入一个函数,将字符串转换为 DateTime 对象,并将数据重新构造为 (device_id, data) 格式。


map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以便分别分析每个设备的数据,而不是同时分析所有设备的数据。


 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))


现在,我们将利用bytewax的有状态功能在我们定义的时间内收集每个设备的数据。 ydata-profiling需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。


ydata-profiling中,我们能够为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成涉及每个物联网设备或特定时间范围的数据快照:


 from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)


定义快照后,利用ydata-profiling就像为我们想要分析的每个数据帧调用ProfileReport一样简单:


 import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)


在此示例中,我们将图像作为映射方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到某个远程存储中。


配置文件完成后,数据流需要一些输出,因此我们可以使用内置的StdOutput来打印已配置文件的设备以及在映射步骤中从配置文件函数传递出的配置文件时间:


 flow.output("out", StdOutput())


有多种方法可以执行 Bytewax 数据流。在此示例中,我们使用同一台本地计算机,但 Bytewax 也可以在跨多个主机的多个 Python 进程上运行Docker容器, 用一个Kubernetes集群, 和更多的


在本文中,我们将继续进行本地设置,但我们鼓励您检查我们的帮助工具蜡控一旦您的管道准备好过渡到生产,它就会管理 Kubernetes 数据流部署。


假设我们与包含数据流定义的文件位于同一目录中,我们可以使用以下命令运行它:


 python -m bytewax.run ydata-profiling-streaming:flow


然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的变化,并比较不同设备或时间窗口之间的数据特征


事实上,我们可以利用比较报告功能它以简单的方式突出了两个数据配置文件之间的差异,使我们更容易发现需要调查的重要模式或必须解决的问题:


 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")


准备好探索您自己的数据流了吗?

验证数据流对于持续识别数据质量问题并比较不同时间段的数据状态至关重要。


对于医疗保健能源制造娱乐领域的组织(所有这些组织都使用连续的数据流),自动化分析是建立从质量评估到数据隐私的数据治理最佳实践的关键


这需要对数据快照进行分析,如本文所示,可以通过结合bytewaxydata-profiling以无缝方式实现。


Bytewax负责处理数据流并将其构建为快照所需的所有流程,然后可以通过数据特征的综合报告对快照进行汇总并与ydata-profiling进行比较。


能够适当地处理和分析传入数据,可以开辟大量跨不同领域的用例,从纠正数据模式和格式中的错误,到突出显示和缓解源自现实世界活动的其他问题,例如异常检测(例如,欺诈或入侵/威胁检测)、设备故障以及其他偏离预期的事件(例如,数据漂移或与业务规则不一致)。


现在,您已准备好开始探索数据流!让我们知道您发现了哪些其他用例,并且一如既往,请随时在评论中给我们留言,或者在以数据为中心的人工智能社区如有更多问题和建议!到时候那里见!

致谢

本文由 Fabiana Clemente(联合创始人兼 CDO @元数据)和 Miriam Santos(开发者关系@元数据)——开发ydata 分析——Zander Matheson(首席执行官兼创始人@字节蜡)和 Oli Makhasoeva(开发者关系@拜特蜡)——开发字节蜡


您可以在相应的文档中找到有关 OSS 包的更多信息: ydata 分析文档&字节蜡文档


也发布在这里