paint-brush
Cómo entender sus datos en tiempo real usando bytewax y ydata-profilingpor@ydata
798 lecturas
798 lecturas

Cómo entender sus datos en tiempo real usando bytewax y ydata-profiling

por YData9m2023/07/25
Read on Terminal Reader

Demasiado Largo; Para Leer

Solo un increíble tutorial paso a paso sobre cómo realizar perfiles de datos en flujos de datos 🚀
featured image - Cómo entender sus datos en tiempo real usando bytewax y ydata-profiling
YData HackerNoon profile picture

En esta publicación de blog, cubriremos cómo puede combinar y aprovechar la solución de transmisión de código abierto, bytewax , con ydata-profiling , para mejorar la calidad de sus flujos de transmisión. ¡Cinturón de seguridad!


El procesamiento de flujo permite el análisis en tiempo real de los datos en curso y antes del almacenamiento y puede ser con o sin estado .


El procesamiento de flujo con estado se utiliza para recomendaciones en tiempo real , detección de patrones o procesamiento de eventos complejos, donde se requiere el historial de lo que sucedió para el procesamiento (ventanas, unión por clave, etc.).


El procesamiento de flujo sin estado se usa para la transformación en línea que no requiere el conocimiento de otros puntos de datos en el flujo, como enmascarar un correo electrónico o convertir un tipo.


Foto de Markus Spiske en Unsplash


En general, los flujos de datos se usan ampliamente en la industria y se pueden encontrar aplicados a casos de uso como la detección de fraudes , el monitoreo de pacientes o el mantenimiento predictivo de eventos .

Un aspecto crucial que todos los flujos de datos deben tener en cuenta es la calidad de los datos

A diferencia de los modelos tradicionales en los que la calidad de los datos suele evaluarse durante la creación del almacén de datos o la solución del tablero, la transmisión de datos requiere una supervisión continua .


Es esencial mantener la calidad de los datos durante todo el proceso, desde la recopilación hasta la alimentación de las aplicaciones posteriores. Después de todo, el costo de la mala calidad de los datos puede ser alto para las organizaciones:


“El costo de los datos erróneos es un asombroso 15% a 25% de los ingresos para la mayoría de las empresas. (…) Dos tercios de estos costos pueden eliminarse al adelantarse a la calidad de los datos”.


— Thomas C. Redman, autor de “Poner al frente la calidad de los datos”


A lo largo de este artículo, le mostraremos cómo puede combinar bytewa con ydata-profiling para perfilar y mejorar la calidad de sus flujos de transmisión.

Procesamiento de transmisiones para profesionales de datos con Bytewax

cera de bytes es un marco de procesamiento de flujo OSS diseñado específicamente para desarrolladores de Python.


Permite a los usuarios crear canalizaciones de transmisión de datos y aplicaciones en tiempo real con capacidades similares a Flink, Spark y Kafka Streams, a la vez que proporciona una interfaz amigable y familiar y 100 % de compatibilidad con el ecosistema de Python.


Usando incorporado conectores o bibliotecas de Python existentes, puede conectarse a fuentes de datos de transmisión y en tiempo real (Kafka, RedPanda, WebSocket, etc.) y escribir datos transformados en varios sistemas posteriores (Kafka, archivos de parquet, lagos de datos, etc.).


Para las transformaciones, Bytewax facilita las transformaciones con estado y sin estado con mapas , ventanas y métodos de agregación y viene con características familiares como recuperación y escalabilidad.


cera de bytes facilita una experiencia centrada en los datos y basada en Python primero para los flujos de datos y está diseñado específicamente para ingenieros de datos y científicos de datos .


Permite a los usuarios crear canalizaciones de transmisión de datos y aplicaciones en tiempo real y crear las personalizaciones necesarias para satisfacer sus necesidades sin tener que aprender y mantener plataformas de transmisión basadas en JVM como Spark o Flink.


Bytewax es adecuado para muchos casos de uso, a saber, Incrustación de canalizaciones para IA generativa , Manejo de valores faltantes en flujos de datos , Uso de modelos de lenguaje en un contexto de transmisión para comprender los mercados financieros , y más.


Para inspiración de casos de uso y más información como documentación, tutoriales y guías, no dude en consultar el sitio web bytewax .

¿Por qué perfilar datos para flujos de datos?

La creación de perfiles de datos es clave para un inicio exitoso de cualquier tarea de aprendizaje automático y se refiere al paso de entender a fondo nuestros datos : su estructura, comportamiento y calidad.


En una palabra, perfilado de datos implica analizar aspectos relacionados con el formato de los datos y los descriptores básicos (p. ej., número de muestras, número/tipos de características, valores duplicados), su características intrínsecas (como la presencia de datos faltantes o características desequilibradas) y otros factores complicados que pueden surgir durante la recopilación o el procesamiento de datos (p. ej., valores erróneos o características inconsistentes).


Garantizar altos estándares de calidad de datos es crucial para todos los dominios y organizaciones, pero es especialmente relevante para los dominios que operan con dominios que generan datos continuos , donde las circunstancias pueden cambiar rápidamente y pueden requerir una acción inmediata (por ejemplo, monitoreo de atención médica, valores de existencias, políticas de calidad del aire).


Para muchos dominios, la creación de perfiles de datos se utiliza desde una perspectiva de análisis de datos exploratorios, teniendo en cuenta los datos históricos almacenados en las bases de datos. Por el contrario, para los flujos de datos, la creación de perfiles de datos se vuelve esencial para la validación y el control de calidad continuos a lo largo del flujo , donde los datos deben verificarse en diferentes períodos o etapas del proceso.


Al incorporar un perfilado automatizado en nuestros flujos de datos , podemos obtener comentarios de inmediato sobre el estado actual de nuestros datos y recibir alertas sobre cualquier problema potencialmente crítico, ya sea que esté relacionado con la consistencia e integridad de los datos (p. ej., valores corruptos o cambios de formato), o con eventos que ocurren en períodos cortos de tiempo (p. ej., desvíos de datos, desviación de las reglas comerciales y los resultados).


En los dominios del mundo real, donde simplemente sabe que la ley de Murphy está destinada a golpear y "definitivamente todo puede salir mal", ¡ la creación de perfiles automatizados podría salvarnos de múltiples acertijos cerebrales y sistemas que necesitan ser sacados de producción!


En lo que respecta a la creación de perfiles de datos, ydata-profiling ha sido consistentemente una favorito de la multitud , ya sea para tabular o series de tiempo datos. Y no es de extrañar por qué: es una línea de código para un amplio conjunto de análisis e información.


Las operaciones complejas y que consumen mucho tiempo se realizan bajo el capó: ydata-profiling detecta automáticamente los tipos de características incluidos en los datos y, según los tipos de características (numéricos o categóricos), ajusta las estadísticas de resumen y las visualizaciones que se muestran en el informe de creación de perfiles.


Al fomentar un análisis centrado en los datos , el paquete también destaca las relaciones existentes entre las características , centrándose en sus interacciones y correlaciones por pares, y proporciona una evaluación exhaustiva de las alertas de calidad de los datos , desde valores duplicados o constantes hasta características sesgadas y desequilibradas .


Es realmente una vista de 360º de la calidad de nuestros datos , con un esfuerzo mínimo.


Informe de creación de perfiles: Resaltar posibles problemas de calidad de los datos.



Poniendo todo junto: bytewax y ydata-profiling

Antes de comenzar el proyecto, primero debemos establecer nuestras dependencias de Python y configurar nuestra fuente de datos.


Primero, instalemos los paquetes bytewax y ydata-profiling ( es posible que desee utilizar un entorno virtual para esto: revisa estas instrucciones si necesita alguna orientación adicional!)


 pip install bytewax==0.16.2 ydata-profiling==4.3.1


Luego, subiremos el Conjunto de datos de telemetría de sensores ambientales (Licencia — CC0: dominio público), que contiene varias mediciones de temperatura, humedad, monóxido de carbono, gas de petróleo líquido, humo, luz y movimiento de diferentes dispositivos IoT:


 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000


En un entorno de producción, cada dispositivo generaría continuamente estas mediciones y la entrada se vería como lo que esperamos en una plataforma de transmisión. como kafka . En este artículo, para simular el contexto que encontraríamos con la transmisión de datos, leeremos los datos del archivo CSV una línea a la vez y crearemos un flujo de datos usando bytewax.


(Como nota al margen rápida, un flujo de datos es esencialmente una canalización de datos que se puede describir como un gráfico acíclico dirigido: DAG)


Primero, hagamos algunas importaciones necesarias :


 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main


Luego, definimos nuestro objeto de flujo de datos. Luego, usaremos un método de mapa sin estado donde pasamos una función para convertir la cadena en un objeto DateTime y reestructurar los datos al formato (device_id, data).


El método del mapa hará el cambio a cada punto de datos sin estado. La razón por la que hemos modificado la forma de nuestros datos es que podemos agruparlos fácilmente en los siguientes pasos para perfilar los datos de cada dispositivo por separado en lugar de todos los dispositivos simultáneamente.


 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))


Ahora, aprovecharemos las capacidades de estado de bytewax para recopilar datos para cada dispositivo durante el tiempo que hemos definido. ydata-profiling espera una instantánea de los datos a lo largo del tiempo, lo que hace que el operador de ventana sea el método perfecto para hacerlo.


En ydata-profiling , podemos producir estadísticas de resumen para un marco de datos que se especifica para un contexto particular. Por ejemplo, en nuestro ejemplo, podemos producir instantáneas de datos que se refieren a cada dispositivo IoT o a marcos de tiempo particulares:


 from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)


Después de definir las instantáneas, aprovechar ydata-profiling es tan simple como llamar a ProfileReport para cada uno de los marcos de datos que nos gustaría analizar:


 import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)


En este ejemplo, estamos escribiendo las imágenes en archivos locales como parte de una función en un método de mapa. Estos podrían informarse a través de una herramienta de mensajería o podríamos guardarlos en algún almacenamiento remoto en el futuro.


Una vez que se completa el perfil, el flujo de datos espera algún resultado, por lo que podemos usar el StdOutput integrado para imprimir el dispositivo que se perfiló y la hora en que se perfiló se eliminó de la función de perfil en el paso del mapa:


 flow.output("out", StdOutput())


Hay varias formas de ejecutar flujos de datos de Bytewax. En este ejemplo, usamos la misma máquina local, pero Bytewax también puede ejecutarse en múltiples procesos de Python, en múltiples hosts, en un Contenedor Docker , usando un Clúster de Kubernetes , y más .


En este artículo, continuaremos con una configuración local, pero le recomendamos que consulte nuestra herramienta de ayuda encerado que administra las implementaciones de flujo de datos de Kubernetes una vez que su canalización está lista para la transición a producción.


Suponiendo que estamos en el mismo directorio que el archivo con la definición del flujo de datos, podemos ejecutarlo usando:


 python -m bytewax.run ydata-profiling-streaming:flow


Luego, podemos usar los informes de creación de perfiles para validar la calidad de los datos, verificar cambios en los esquemas o formatos de datos y comparar las características de los datos entre diferentes dispositivos o ventanas de tiempo .


De hecho, podemos aprovechar la funcionalidad de informe de comparación que destaca las diferencias entre dos perfiles de datos de una manera sencilla, lo que nos facilita la detección de patrones importantes que deben investigarse o problemas que deben abordarse:


 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")


¿Listo para explorar sus propios flujos de datos?

La validación de flujos de datos es crucial para identificar problemas en la calidad de los datos de manera continua y comparar el estado de los datos en distintos períodos de tiempo.


Para las organizaciones de atención médica , energía , fabricación y entretenimiento , que trabajan con flujos continuos de datos, la creación de perfiles automatizada es clave para establecer las mejores prácticas de gobierno de datos , desde la evaluación de la calidad hasta la privacidad de los datos.


Esto requiere el análisis de instantáneas de datos que, como se muestra en este artículo, se pueden lograr de manera transparente al combinar bytewax y ydata-profiling .


Bytewax se encarga de todos los procesos necesarios para manejar y estructurar los flujos de datos en instantáneas, que luego se pueden resumir y comparar con el perfilado de datos a través de un informe completo de las características de los datos.


Ser capaz de procesar y perfilar adecuadamente los datos entrantes abre una gran cantidad de casos de uso en diferentes dominios, desde la corrección de errores en esquemas y formatos de datos hasta el resaltado y la mitigación de problemas adicionales que se derivan de actividades del mundo real, como detección de anomalías (por ejemplo, detección de fraude o intrusión/amenazas), mal funcionamiento del equipo y otros eventos que se desvían de las expectativas (por ejemplo, desviaciones de datos o desalineación con las reglas comerciales).


¡Ya está todo listo para comenzar a explorar sus flujos de datos! Háganos saber qué otros casos de uso encuentra y, como siempre, no dude en enviarnos una línea en los comentarios o encontrarnos en el Comunidad de IA centrada en datos para más preguntas y sugerencias! ¡Te veo allí!

Expresiones de gratitud

Este artículo fue escrito por Fabiana Clemente (Co-fundadora & CDO @ YData ) y Miriam Santos (Relaciones con Desarrolladores @ YData ) -- desarrollando ydata-perfilado -- y Zander Matheson (CEO y fundador @ cera de bytes ) y Oli Makhasoeva (Relaciones con desarrolladores @ byetwax ) -- desarrollando cera de bytes .


Puede encontrar información adicional sobre los paquetes OSS en las respectivas documentaciones: documentos de perfilado de ydata & documentos bytewax .


También publicado aquí