En esta publicación de blog, cubriremos cómo puede combinar y aprovechar la solución de transmisión de código abierto, bytewax , con ydata-profiling , para mejorar la calidad de sus flujos de transmisión. ¡Cinturón de seguridad!
El procesamiento de flujo permite el análisis en tiempo real de los datos en curso y antes del almacenamiento y puede ser con o sin estado .
El procesamiento de flujo con estado se utiliza para recomendaciones en tiempo real , detección de patrones o procesamiento de eventos complejos, donde se requiere el historial de lo que sucedió para el procesamiento (ventanas, unión por clave, etc.).
El procesamiento de flujo sin estado se usa para la transformación en línea que no requiere el conocimiento de otros puntos de datos en el flujo, como enmascarar un correo electrónico o convertir un tipo.
En general, los flujos de datos se usan ampliamente en la industria y se pueden encontrar aplicados a casos de uso como la detección de fraudes , el monitoreo de pacientes o el mantenimiento predictivo de eventos .
A diferencia de los modelos tradicionales en los que la calidad de los datos suele evaluarse durante la creación del almacén de datos o la solución del tablero, la transmisión de datos requiere una supervisión continua .
Es esencial mantener la calidad de los datos durante todo el proceso, desde la recopilación hasta la alimentación de las aplicaciones posteriores. Después de todo, el costo de la mala calidad de los datos puede ser alto para las organizaciones:
“El costo de los datos erróneos es un asombroso 15% a 25% de los ingresos para la mayoría de las empresas. (…) Dos tercios de estos costos pueden eliminarse al adelantarse a la calidad de los datos”.
— Thomas C. Redman, autor de “Poner al frente la calidad de los datos”
A lo largo de este artículo, le mostraremos cómo puede combinar bytewa
con ydata-profiling
para perfilar y mejorar la calidad de sus flujos de transmisión.
Permite a los usuarios crear canalizaciones de transmisión de datos y aplicaciones en tiempo real con capacidades similares a Flink, Spark y Kafka Streams, a la vez que proporciona una interfaz amigable y familiar y 100 % de compatibilidad con el ecosistema de Python.
Usando incorporado
Para las transformaciones, Bytewax facilita las transformaciones con estado y sin estado con mapas , ventanas y métodos de agregación y viene con características familiares como recuperación y escalabilidad.
cera de bytes
Permite a los usuarios crear canalizaciones de transmisión de datos y aplicaciones en tiempo real y crear las personalizaciones necesarias para satisfacer sus necesidades sin tener que aprender y mantener plataformas de transmisión basadas en JVM como Spark o Flink.
Bytewax es adecuado para muchos casos de uso, a saber,
Para inspiración de casos de uso y más información como documentación, tutoriales y guías, no dude en consultar
La creación de perfiles de datos es clave para un inicio exitoso de cualquier tarea de aprendizaje automático y se refiere al paso de
En una palabra,
Garantizar altos estándares de calidad de datos es crucial para todos los dominios y organizaciones, pero es especialmente relevante para los dominios que operan con dominios que generan datos continuos , donde las circunstancias pueden cambiar rápidamente y pueden requerir una acción inmediata (por ejemplo, monitoreo de atención médica, valores de existencias, políticas de calidad del aire).
Para muchos dominios, la creación de perfiles de datos se utiliza desde una perspectiva de análisis de datos exploratorios, teniendo en cuenta los datos históricos almacenados en las bases de datos. Por el contrario, para los flujos de datos, la creación de perfiles de datos se vuelve esencial para la validación y el control de calidad continuos a lo largo del flujo , donde los datos deben verificarse en diferentes períodos o etapas del proceso.
Al incorporar un perfilado automatizado en nuestros flujos de datos , podemos obtener comentarios de inmediato sobre el estado actual de nuestros datos y recibir alertas sobre cualquier problema potencialmente crítico, ya sea que esté relacionado con la consistencia e integridad de los datos (p. ej., valores corruptos o cambios de formato), o con eventos que ocurren en períodos cortos de tiempo (p. ej., desvíos de datos, desviación de las reglas comerciales y los resultados).
En los dominios del mundo real, donde simplemente sabe que la ley de Murphy está destinada a golpear y "definitivamente todo puede salir mal", ¡ la creación de perfiles automatizados podría salvarnos de múltiples acertijos cerebrales y sistemas que necesitan ser sacados de producción!
En lo que respecta a la creación de perfiles de datos, ydata-profiling
ha sido consistentemente una
Las operaciones complejas y que consumen mucho tiempo se realizan bajo el capó: ydata-profiling detecta automáticamente los tipos de características incluidos en los datos y, según los tipos de características (numéricos o categóricos), ajusta las estadísticas de resumen y las visualizaciones que se muestran en el informe de creación de perfiles.
Al fomentar un análisis centrado en los datos , el paquete también destaca las relaciones existentes entre las características , centrándose en sus interacciones y correlaciones por pares, y proporciona una evaluación exhaustiva de las alertas de calidad de los datos , desde valores duplicados o constantes hasta características sesgadas y desequilibradas .
Es realmente una vista de 360º de la calidad de nuestros datos , con un esfuerzo mínimo.
Antes de comenzar el proyecto, primero debemos establecer nuestras dependencias de Python y configurar nuestra fuente de datos.
Primero, instalemos los paquetes bytewax
y ydata-profiling
( es posible que desee utilizar un entorno virtual para esto:
pip install bytewax==0.16.2 ydata-profiling==4.3.1
Luego, subiremos el
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
En un entorno de producción, cada dispositivo generaría continuamente estas mediciones y la entrada se vería como lo que esperamos en una plataforma de transmisión.
(Como nota al margen rápida, un flujo de datos es esencialmente una canalización de datos que se puede describir como un gráfico acíclico dirigido: DAG)
Primero, hagamos algunas importaciones necesarias :
from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main
Luego, definimos nuestro objeto de flujo de datos. Luego, usaremos un método de mapa sin estado donde pasamos una función para convertir la cadena en un objeto DateTime y reestructurar los datos al formato (device_id, data).
El método del mapa hará el cambio a cada punto de datos sin estado. La razón por la que hemos modificado la forma de nuestros datos es que podemos agruparlos fácilmente en los siguientes pasos para perfilar los datos de cada dispositivo por separado en lugar de todos los dispositivos simultáneamente.
flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))
Ahora, aprovecharemos las capacidades de estado de bytewax
para recopilar datos para cada dispositivo durante el tiempo que hemos definido. ydata-profiling
espera una instantánea de los datos a lo largo del tiempo, lo que hace que el operador de ventana sea el método perfecto para hacerlo.
En ydata-profiling
, podemos producir estadísticas de resumen para un marco de datos que se especifica para un contexto particular. Por ejemplo, en nuestro ejemplo, podemos producir instantáneas de datos que se refieren a cada dispositivo IoT o a marcos de tiempo particulares:
from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)
Después de definir las instantáneas, aprovechar ydata-profiling
es tan simple como llamar a ProfileReport
para cada uno de los marcos de datos que nos gustaría analizar:
import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)
En este ejemplo, estamos escribiendo las imágenes en archivos locales como parte de una función en un método de mapa. Estos podrían informarse a través de una herramienta de mensajería o podríamos guardarlos en algún almacenamiento remoto en el futuro.
Una vez que se completa el perfil, el flujo de datos espera algún resultado, por lo que podemos usar el StdOutput
integrado para imprimir el dispositivo que se perfiló y la hora en que se perfiló se eliminó de la función de perfil en el paso del mapa:
flow.output("out", StdOutput())
Hay varias formas de ejecutar flujos de datos de Bytewax. En este ejemplo, usamos la misma máquina local, pero Bytewax también puede ejecutarse en múltiples procesos de Python, en múltiples hosts, en un
En este artículo, continuaremos con una configuración local, pero le recomendamos que consulte nuestra herramienta de ayuda
Suponiendo que estamos en el mismo directorio que el archivo con la definición del flujo de datos, podemos ejecutarlo usando:
python -m bytewax.run ydata-profiling-streaming:flow
Luego, podemos usar los informes de creación de perfiles para validar la calidad de los datos, verificar cambios en los esquemas o formatos de datos y comparar las características de los datos entre diferentes dispositivos o ventanas de tiempo .
De hecho, podemos aprovechar la
snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")
La validación de flujos de datos es crucial para identificar problemas en la calidad de los datos de manera continua y comparar el estado de los datos en distintos períodos de tiempo.
Para las organizaciones de atención médica , energía , fabricación y entretenimiento , que trabajan con flujos continuos de datos, la creación de perfiles automatizada es clave para establecer las mejores prácticas de gobierno de datos , desde la evaluación de la calidad hasta la privacidad de los datos.
Esto requiere el análisis de instantáneas de datos que, como se muestra en este artículo, se pueden lograr de manera transparente al combinar bytewax
y ydata-profiling
.
Bytewax se encarga de todos los procesos necesarios para manejar y estructurar los flujos de datos en instantáneas, que luego se pueden resumir y comparar con el perfilado de datos a través de un informe completo de las características de los datos.
Ser capaz de procesar y perfilar adecuadamente los datos entrantes abre una gran cantidad de casos de uso en diferentes dominios, desde la corrección de errores en esquemas y formatos de datos hasta el resaltado y la mitigación de problemas adicionales que se derivan de actividades del mundo real, como detección de anomalías (por ejemplo, detección de fraude o intrusión/amenazas), mal funcionamiento del equipo y otros eventos que se desvían de las expectativas (por ejemplo, desviaciones de datos o desalineación con las reglas comerciales).
¡Ya está todo listo para comenzar a explorar sus flujos de datos! Háganos saber qué otros casos de uso encuentra y, como siempre, no dude en enviarnos una línea en los comentarios o encontrarnos en el
Este artículo fue escrito por Fabiana Clemente (Co-fundadora & CDO @
Puede encontrar información adicional sobre los paquetes OSS en las respectivas documentaciones:
También publicado aquí