En esta publicación de blog, cubriremos cómo puede combinar y aprovechar la solución de transmisión de código abierto, , con , para mejorar la calidad de sus flujos de transmisión. ¡Cinturón de seguridad! bytewax ydata-profiling El procesamiento de flujo permite el análisis en tiempo real de los datos en curso y antes del almacenamiento y puede ser o . con sin estado se utiliza para recomendaciones , detección de patrones o procesamiento de eventos complejos, donde se requiere el historial de lo que sucedió para el procesamiento (ventanas, unión por clave, etc.). El procesamiento de flujo con estado en tiempo real se usa para la transformación que no requiere el conocimiento de otros puntos de datos en el flujo, como enmascarar un correo electrónico o convertir un tipo. El procesamiento de flujo sin estado en línea En general, los flujos de datos se usan ampliamente en la industria y se pueden encontrar aplicados a casos de uso como , o . la detección de fraudes el monitoreo de pacientes el mantenimiento predictivo de eventos Un aspecto crucial que todos los flujos de datos deben tener en cuenta es la calidad de los datos A diferencia de los modelos tradicionales en los que la calidad de los datos suele evaluarse durante la creación del almacén de datos o la solución del tablero, . la transmisión de datos requiere una supervisión continua Es esencial mantener la calidad de los datos durante todo el proceso, desde la recopilación hasta la alimentación de las aplicaciones posteriores. Después de todo, el costo de la mala calidad de los datos puede ser alto para las organizaciones: “El costo de los datos erróneos es un asombroso 15% a 25% de los ingresos para la mayoría de las empresas. (…) Dos tercios de estos costos pueden eliminarse al adelantarse a la calidad de los datos”. — Thomas C. Redman, autor de “Poner al frente la calidad de los datos” A lo largo de este artículo, le mostraremos cómo puede combinar con para perfilar y mejorar la calidad de sus flujos de transmisión. bytewa ydata-profiling Procesamiento de transmisiones para profesionales de datos con Bytewax es un marco de procesamiento de flujo OSS diseñado específicamente para desarrolladores de Python. cera de bytes Permite a los usuarios con capacidades similares a Flink, Spark y Kafka Streams, a la vez que proporciona una interfaz amigable y familiar y crear canalizaciones de transmisión de datos y aplicaciones en tiempo real 100 % de compatibilidad con el ecosistema de Python. Usando incorporado o bibliotecas de Python existentes, (Kafka, RedPanda, WebSocket, etc.) y en varios sistemas posteriores (Kafka, archivos de parquet, lagos de datos, etc.). conectores puede conectarse a fuentes de datos de transmisión y en tiempo real escribir datos transformados Para las transformaciones, Bytewax con , y métodos y viene con características familiares como recuperación y escalabilidad. facilita las transformaciones con estado y sin estado mapas ventanas de agregación cera de bytes y está . facilita una experiencia centrada en los datos y basada en Python primero para los flujos de datos diseñado específicamente para ingenieros de datos y científicos de datos Permite a los usuarios y crear las personalizaciones necesarias para satisfacer sus necesidades sin tener que aprender y mantener plataformas de transmisión basadas en JVM como Spark o Flink. crear canalizaciones de transmisión de datos y aplicaciones en tiempo real Bytewax es adecuado para muchos casos de uso, a saber, , , , y más. Incrustación de canalizaciones para IA generativa Manejo de valores faltantes en flujos de datos Uso de modelos de lenguaje en un contexto de transmisión para comprender los mercados financieros Para inspiración de casos de uso y más información como documentación, tutoriales y guías, no dude en consultar . el sitio web bytewax ¿Por qué perfilar datos para flujos de datos? y se refiere al paso de : su estructura, comportamiento y calidad. La creación de perfiles de datos es clave para un inicio exitoso de cualquier tarea de aprendizaje automático entender a fondo nuestros datos En una palabra, implica analizar aspectos relacionados con el formato de los datos y los descriptores básicos (p. ej., número de muestras, número/tipos de características, valores duplicados), su (como la presencia de datos faltantes o características desequilibradas) y otros factores complicados que pueden surgir durante la recopilación o el procesamiento de datos (p. ej., valores erróneos o características inconsistentes). perfilado de datos características intrínsecas , donde las circunstancias pueden cambiar rápidamente y pueden requerir una acción inmediata (por ejemplo, monitoreo de atención médica, valores de existencias, políticas de calidad del aire). Garantizar altos estándares de calidad de datos es crucial para todos los dominios y organizaciones, pero es especialmente relevante para los dominios que operan con dominios que generan datos continuos Para muchos dominios, la creación de perfiles de datos se utiliza desde una perspectiva de análisis de datos exploratorios, teniendo en cuenta los datos históricos almacenados en las bases de datos. Por el contrario, para los flujos de datos, , donde los datos deben verificarse en diferentes períodos o etapas del proceso. la creación de perfiles de datos se vuelve esencial para la validación y el control de calidad continuos a lo largo del flujo Al incorporar un , podemos sobre el estado actual de nuestros datos y recibir alertas sobre cualquier problema potencialmente crítico, ya sea que esté relacionado con (p. ej., valores corruptos o cambios de formato), o con (p. ej., desvíos de datos, desviación de las reglas comerciales y los resultados). perfilado automatizado en nuestros flujos de datos obtener comentarios de inmediato la consistencia e integridad de los datos eventos que ocurren en períodos cortos de tiempo En los dominios del mundo real, la creación de perfiles automatizados podría salvarnos de múltiples acertijos cerebrales y sistemas que necesitan ser sacados de producción! donde simplemente sabe que la ley de Murphy está destinada a golpear y "definitivamente todo puede salir mal", ¡ En lo que respecta a la creación de perfiles de datos, ha sido consistentemente una , ya sea para o datos. Y no es de extrañar por qué: ydata-profiling favorito de la multitud tabular series de tiempo es una línea de código para un amplio conjunto de análisis e información. Las operaciones complejas y que consumen mucho tiempo se realizan bajo el capó: ydata-profiling según los tipos de características (numéricos o categóricos), que se muestran en el informe de creación de perfiles. detecta automáticamente los tipos de características incluidos en los datos y, ajusta las estadísticas de resumen y las visualizaciones Al fomentar un , el paquete también , centrándose en sus y por pares, y proporciona una , desde valores o hasta características y . análisis centrado en los datos destaca las relaciones existentes entre las características interacciones correlaciones evaluación exhaustiva de las alertas de calidad de los datos duplicados constantes sesgadas desequilibradas Es realmente una , con un esfuerzo mínimo. vista de 360º de la calidad de nuestros datos Poniendo todo junto: bytewax y ydata-profiling Antes de comenzar el proyecto, primero debemos establecer nuestras dependencias de Python y configurar nuestra fuente de datos. Primero, instalemos los paquetes y ( bytewax ydata-profiling es posible que desee utilizar un entorno virtual para esto: revisa estas instrucciones si necesita alguna orientación adicional!) pip install bytewax==0.16.2 ydata-profiling==4.3.1 Luego, subiremos el (Licencia — CC0: dominio público), que contiene varias mediciones de de diferentes dispositivos IoT: Conjunto de datos de telemetría de sensores ambientales temperatura, humedad, monóxido de carbono, gas de petróleo líquido, humo, luz y movimiento wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 y la entrada se vería como lo que esperamos en una plataforma de transmisión. . En este artículo, y crearemos un flujo de datos usando bytewax. En un entorno de producción, cada dispositivo generaría continuamente estas mediciones como kafka para simular el contexto que encontraríamos con la transmisión de datos, leeremos los datos del archivo CSV una línea a la vez (Como nota al margen rápida, un flujo de datos es esencialmente una canalización de datos que se puede describir como un gráfico acíclico dirigido: DAG) Primero, hagamos algunas : importaciones necesarias from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main Luego, definimos nuestro objeto de flujo de datos. Luego, usaremos un método de mapa sin estado donde pasamos una función para convertir la cadena en un objeto DateTime y reestructurar los datos al formato (device_id, data). El método del mapa hará el cambio a cada punto de datos sin estado. La razón por la que hemos modificado la forma de nuestros datos es que podemos agruparlos fácilmente en los siguientes pasos para perfilar los datos de cada dispositivo por separado en lugar de todos los dispositivos simultáneamente. flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) Ahora, aprovecharemos las capacidades de estado de para recopilar datos para cada dispositivo durante el tiempo que hemos definido. espera una instantánea de los datos a lo largo del tiempo, lo que hace que el operador de ventana sea el método perfecto para hacerlo. bytewax ydata-profiling En , podemos producir estadísticas de resumen para un marco de datos que se especifica para un contexto particular. Por ejemplo, en nuestro ejemplo, podemos producir instantáneas de datos que se refieren a cada dispositivo IoT o a marcos de tiempo particulares: ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) Después de definir las instantáneas, aprovechar es tan simple como llamar a para cada uno de los marcos de datos que nos gustaría analizar: ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) En este ejemplo, estamos escribiendo las imágenes en archivos locales como parte de una función en un método de mapa. Estos podrían informarse a través de una herramienta de mensajería o podríamos guardarlos en algún almacenamiento remoto en el futuro. Una vez que se completa el perfil, el flujo de datos espera algún resultado, por lo que podemos usar el integrado para imprimir el dispositivo que se perfiló y la hora en que se perfiló se eliminó de la función de perfil en el paso del mapa: StdOutput flow.output("out", StdOutput()) Hay varias formas de ejecutar flujos de datos de Bytewax. En este ejemplo, usamos la misma máquina local, pero Bytewax también puede ejecutarse en múltiples procesos de Python, en múltiples hosts, en un , usando un , y . Contenedor Docker Clúster de Kubernetes más En este artículo, continuaremos con una configuración local, pero le recomendamos que consulte nuestra herramienta de ayuda que administra las implementaciones de flujo de datos de Kubernetes una vez que su canalización está lista para la transición a producción. encerado Suponiendo que estamos en el mismo directorio que el archivo con la definición del flujo de datos, podemos ejecutarlo usando: python -m bytewax.run ydata-profiling-streaming:flow Luego, podemos usar los informes de creación de perfiles para validar la calidad de los datos, verificar cambios en los esquemas o formatos de datos y . comparar las características de los datos entre diferentes dispositivos o ventanas de tiempo De hecho, podemos aprovechar la que destaca las diferencias entre dos perfiles de datos de una manera sencilla, lo que nos facilita la detección de patrones importantes que deben investigarse o problemas que deben abordarse: funcionalidad de informe de comparación snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") ¿Listo para explorar sus propios flujos de datos? La validación de flujos de datos es crucial para identificar problemas en la calidad de los datos de manera continua y comparar el estado de los datos en distintos períodos de tiempo. Para las organizaciones de , , y , que trabajan con flujos continuos de datos, la , desde la evaluación de la calidad hasta la privacidad de los datos. atención médica energía fabricación entretenimiento creación de perfiles automatizada es clave para establecer las mejores prácticas de gobierno de datos Esto requiere el análisis de instantáneas de datos que, como se muestra en este artículo, se pueden lograr de manera transparente al combinar y . bytewax ydata-profiling se encarga de todos los procesos necesarios para manejar y estructurar los flujos de datos en instantáneas, que luego se pueden resumir y comparar con a través de un informe completo de las características de los datos. Bytewax el perfilado de datos Ser capaz de procesar y perfilar adecuadamente los datos entrantes abre una gran cantidad de casos de uso en diferentes dominios, desde la hasta el resaltado y la mitigación de problemas adicionales que se derivan de actividades del mundo real, como (por ejemplo, detección de fraude o intrusión/amenazas), y otros eventos que se desvían de las expectativas (por ejemplo, desviaciones de datos o desalineación con las reglas comerciales). corrección de errores en esquemas y formatos de datos detección de anomalías mal funcionamiento del equipo ¡Ya está todo listo para comenzar a explorar sus flujos de datos! Háganos saber qué otros casos de uso encuentra y, como siempre, no dude en enviarnos una línea en los comentarios o encontrarnos en el para más preguntas y sugerencias! Comunidad de IA centrada en datos ¡Te veo allí! Expresiones de gratitud ) y Oli Makhasoeva (Relaciones con desarrolladores @ ) -- desarrollando . Este artículo fue escrito por Fabiana Clemente (Co-fundadora & CDO @ YData ) y Miriam Santos (Relaciones con Desarrolladores @ YData ) -- desarrollando ydata-perfilado -- y Zander Matheson (CEO y fundador @ cera de bytes byetwax cera de bytes Puede encontrar información adicional sobre los paquetes OSS en las respectivas documentaciones: & . documentos de perfilado de ydata documentos bytewax También publicado aquí