在这篇博文中,我们将介绍如何结合和利用开源流解决方案 与 来提高流质量。系好安全带! bytewax ydata-profiling 流处理可以对传输中和存储之前的数据进行实时分析,并且可以是 或 。 有状态的 无状态的 用于 推荐、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。 有状态流处理 实时 用于 转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。 无状态流处理 内联 总体而言,数据流在行业中得到广泛应用,并且可以应用于 、 或 等用例。 欺诈检测 患者监控 事件预测维护 所有数据流都必须考虑的一个关键方面是数据的质量 与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同, 。 流数据需要持续监控 在从收集到提供下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织而言,不良数据质量的成本可能很高: “对于大多数公司来说,不良数据造成的成本占收入的 15% 到 25%,令人震惊。 (...) 三分之二的成本可以通过提高数据质量来消除。” — Thomas C. Redman,《Getting in Front on Data Quality》一书的作者 在本文中,我们将向您展示如何将 与 结合起来来分析和提高流媒体的质量! bytewa ydata-profiling 使用 Bytewax 为数据专业人员提供流处理 是专门为Python开发者设计的OSS流处理框架。 字节蜡 它允许用户 ,同时提供友好熟悉的界面以及 构建具有类似于 Flink、Spark 和 Kafka Streams 功能的流数据管道和实时应用程序 与 Python 生态系统 100% 兼容。 使用内置 或现有的 Python 库, (Kafka、RedPanda、WebSocket 等)并将 写入各种下游系统(Kafka、parquet 文件、数据湖等)。 连接器 您可以连接到实时流数据源 转换后的数据 对于转换,Bytewax 通过 、 和 方法 ,并具有恢复和可扩展性等熟悉的功能。 映射 窗口 聚合 促进有状态和无状态转换 字节蜡 。 促进数据流的 Python 优先和以数据为中心的体验 专为数据工程师和数据科学家而构建 它允许用户 ,并创建满足其需求所需的定制,而无需学习和维护基于 JVM 的流平台(如 Spark 或 Flink)。 构建流数据管道和实时应用程序 Bytewax 非常适合许多用例,即 , , , 和更多。 生成人工智能的嵌入管道 处理数据流中的缺失值 在流环境中使用语言模型来理解金融市场 如需用例灵感以及文档、教程和指南等更多信息,请随时查看 。 字节蜡网站 为什么要对数据流进行数据分析? ,是指以下步骤: :它的结构、行为和质量。 数据分析是任何机器学习任务成功开始的关键 彻底了解我们的数据 简而言之, 涉及分析与数据格式和基本描述符相关的方面(例如样本数量、特征数量/类型、重复值)、其 (例如存在缺失数据或不平衡特征),以及数据收集或处理过程中可能出现的其他复杂因素(例如错误值或不一致特征)。 数据剖析 内在特征 情况可能会快速变化,并且可能需要立即采取行动(例如,医疗保健监控、股票价值、空气质量政策)。 确保高数据质量标准对于所有领域和组织都至关重要,但对于输出连续数据的领域尤其重要,因为这些领域的 对于许多领域,数据分析是从探索性数据分析角度使用的,考虑数据库中存储的历史数据。相反,对于数据流, ,其中需要在过程的不同时间范围或阶段检查数据。 数据分析对于沿流持续进行验证和质量控制变得至关重要 通过将 ,我们可以 ,并针对任何潜在的关键问题发出警报 - 无论它们是与 (例如,损坏的值或更改格式)有关,还是与 (例如,数据漂移、偏离业务规则和结果)有关。 自动分析嵌入到我们的数据流中 立即获得有关数据当前状态的反馈 数据一致性和完整性 短时间内发生的事件 在现实世界中, ,自动分析可能会帮助我们避免多个大脑难题和需要停止生产的系统! 您只知道墨菲定律必然会发生,并且“一切肯定都可能出错” 在数据分析方面, 一直是 ,或者对于 或者 数据。这也难怪—— ydata-profiling 人群最爱 表格的 时间序列 一行代码就可以进行一系列广泛的分析和见解。 复杂且耗时的操作是在幕后完成的:ydata-profiling 并根据特征类型(数字或分类) 。 自动检测数据中包含的特征类型, 调整分析报告中显示的摘要统计数据和可视化效果 为了促进 ,该软件包还 ,重点关注它们的成对 和 ,并提供了 ,从 或 值到 和 特征。 以数据为中心的分析 强调了特征之间的现有关系 相互作用 相关性 对数据质量警报的全面评估 重复 恒定 倾斜 不平衡的 这实际上是 - 只需付出最少的努力。 我们数据质量的 360° 视图 综合起来:bytewax 和 ydata-profiling 在开始项目之前,我们需要首先设置Python依赖项并配置数据源。 首先,让我们安装 和 软件包( bytewax ydata-profiling 您可能想为此使用虚拟环境 — 检查这些说明 如果您需要一些额外的指导!) pip install bytewax==0.16.2 ydata-profiling==4.3.1 然后,我们将上传 (许可证 — CC0:公共领域),其中包含来自不同物联网设备 的多种测量值: 环境传感器遥测数据 的温度、湿度、一氧化碳液化石油气、烟雾、光和运动 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 ,并且输入将类似于我们在流媒体平台中所期望的那样 。在本文中, ,并使用 bytewax 创建一个数据流。 在生产环境中,这些测量结果将由每个设备连续生成 比如卡夫卡 为了模拟我们在流数据中发现的上下文,我们将从 CSV 文件中一次一行读取数据 (顺便说一句,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG) 首先,让我们进行一些 : 必要的导入 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main 然后,我们定义数据流对象。之后,我们将使用无状态映射方法,在该方法中传入一个函数,将字符串转换为 DateTime 对象,并将数据重新构造为 (device_id, data) 格式。 map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以便分别分析每个设备的数据,而不是同时分析所有设备的数据。 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) 现在,我们将利用 的有状态功能在我们定义的时间内收集每个设备的数据。 需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。 bytewax ydata-profiling 在 中,我们能够为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成涉及每个物联网设备或特定时间范围的数据快照: ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) 定义快照后,利用 就像为我们想要分析的每个数据帧调用 一样简单: ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) 在此示例中,我们将图像作为映射方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到某个远程存储中。 配置文件完成后,数据流需要一些输出,因此我们可以使用内置的 来打印已配置文件的设备以及在映射步骤中从配置文件函数传递出的配置文件时间: StdOutput flow.output("out", StdOutput()) 有多种方法可以执行 Bytewax 数据流。在此示例中,我们使用同一台本地计算机,但 Bytewax 也可以在跨多个主机的多个 Python 进程上运行 , 用一个 , 和 。 Docker容器 Kubernetes集群 更多的 在本文中,我们将继续进行本地设置,但我们鼓励您检查我们的帮助工具 一旦您的管道准备好过渡到生产,它就会管理 Kubernetes 数据流部署。 蜡控 假设我们与包含数据流定义的文件位于同一目录中,我们可以使用以下命令运行它: python -m bytewax.run ydata-profiling-streaming:flow 然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的变化,并 。 比较不同设备或时间窗口之间的数据特征 事实上,我们可以利用 它以简单的方式突出了两个数据配置文件之间的差异,使我们更容易发现需要调查的重要模式或必须解决的问题: 比较报告功能 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") 准备好探索您自己的数据流了吗? 验证数据流对于持续识别数据质量问题并比较不同时间段的数据状态至关重要。 对于 、 、 和 领域的组织(所有这些组织都使用连续的数据流), 。 医疗保健 能源 制造 娱乐 自动化分析是建立从质量评估到数据隐私的数据治理最佳实践的关键 这需要对数据快照进行分析,如本文所示,可以通过结合 和 以无缝方式实现。 bytewax ydata-profiling 负责处理数据流并将其构建为快照所需的所有流程,然后可以通过数据特征的综合报告对快照进行汇总并与 进行比较。 Bytewax ydata-profiling 能够适当地处理和分析传入数据,可以开辟大量跨不同领域的用例,从 ,到突出显示和缓解源自现实世界活动的其他问题,例如 (例如,欺诈或入侵/威胁检测)、 以及其他偏离预期的事件(例如,数据漂移或与业务规则不一致)。 纠正数据模式和格式中的错误 异常检测 设备故障 现在,您已准备好开始探索数据流!让我们知道您发现了哪些其他用例,并且一如既往,请随时在评论中给我们留言,或者在 如有更多问题和建议! 以数据为中心的人工智能社区 到时候那里见! 致谢 )和 Oli Makhasoeva(开发者关系@ )——开发 。 本文由 Fabiana Clemente(联合创始人兼 CDO @ 元数据 )和 Miriam Santos(开发者关系@ 元数据 )——开发 ydata 分析 ——Zander Matheson(首席执行官兼创始人@ 字节蜡 拜特蜡 字节蜡 您可以在相应的文档中找到有关 OSS 包的更多信息: & 。 ydata 分析文档 字节蜡文档 也发布 在这里