Dans cet article de blog, nous expliquerons comment vous pouvez combiner et exploiter la solution de streaming open source, bytewax , avec ydata-profiling , pour améliorer la qualité de vos flux de streaming. Bouclez votre ceinture !
Le traitement de flux permet une analyse en temps réel des données en cours et avant le stockage et peut être avec ou sans état .
Le traitement de flux avec état est utilisé pour les recommandations en temps réel , la détection de modèles ou le traitement d'événements complexes, où l'historique de ce qui s'est passé est requis pour le traitement (fenêtres, jointure par une clé, etc.).
Le traitement de flux sans état est utilisé pour la transformation en ligne qui ne nécessite pas la connaissance d'autres points de données dans le flux, comme le masquage d'un e-mail ou la conversion d'un type.
Dans l'ensemble, les flux de données sont largement utilisés dans l'industrie et peuvent être appliqués à des cas d'utilisation tels que la détection des fraudes , la surveillance des patients ou la maintenance prédictive des événements .
Contrairement aux modèles traditionnels où la qualité des données est généralement évaluée lors de la création de l'entrepôt de données ou de la solution de tableau de bord, le streaming de données nécessite une surveillance continue .
Il est essentiel de maintenir la qualité des données tout au long du processus, de la collecte à l'alimentation des applications en aval. Après tout, le coût d'une mauvaise qualité des données peut être élevé pour les organisations :
« Le coût des mauvaises données représente 15 % à 25 % du chiffre d'affaires pour la plupart des entreprises. (…) Les deux tiers de ces coûts peuvent être éliminés en misant sur la qualité des données.
— Thomas C. Redman, auteur de « Getting in Front on Data Quality »
Tout au long de cet article, nous allons vous montrer comment combiner bytewa
avec ydata-profiling
pour profiler et améliorer la qualité de vos flux de streaming !
Il permet aux utilisateurs de créer des pipelines de données de streaming et des applications en temps réel avec des fonctionnalités similaires à Flink, Spark et Kafka Streams tout en offrant une interface conviviale et familière et une compatibilité à 100 % avec l'écosystème Python.
Utilisation intégrée
Pour les transformations, Bytewax facilite les transformations avec et sans état avec les méthodes map , windowing et agrégation et est livré avec des fonctionnalités familières telles que la récupération et l'évolutivité.
Bytewax
Il permet aux utilisateurs de créer des pipelines de données de streaming et des applications en temps réel et de créer les personnalisations nécessaires pour répondre à leurs besoins sans avoir à apprendre et à maintenir des plates-formes de streaming basées sur JVM comme Spark ou Flink.
Bytewax est bien adapté à de nombreux cas d'utilisation, à savoir,
Pour vous inspirer de cas d'utilisation et plus d'informations comme la documentation, les didacticiels et les guides, n'hésitez pas à consulter
Le profilage des données est la clé d'un démarrage réussi de toute tâche d'apprentissage automatique et fait référence à l'étape de
En un mot,
Garantir des normes élevées de qualité des données est crucial pour tous les domaines et organisations, mais est particulièrement pertinent pour les domaines fonctionnant avec des domaines produisant des données continues , où les circonstances peuvent changer rapidement et nécessiter une action immédiate (par exemple, surveillance des soins de santé, valeurs des stocks, politiques de qualité de l'air).
Pour de nombreux domaines, le profilage des données est utilisé dans une optique d'analyse exploratoire des données, en tenant compte des données historiques stockées dans des bases de données. Au contraire, pour les flux de données, le profilage des données devient essentiel pour la validation et le contrôle qualité en continu tout au long du flux , où les données doivent être vérifiées à différentes périodes ou étapes du processus.
En intégrant un profilage automatisé dans nos flux de données , nous pouvons immédiatement obtenir des informations sur l'état actuel de nos données et être alertés de tout problème potentiellement critique, qu'il soit lié à la cohérence et à l'intégrité des données (par exemple, des valeurs corrompues ou des changements de formats), ou à des événements se produisant sur de courtes périodes (par exemple, des dérives de données, des écarts par rapport aux règles et résultats métier).
Dans les domaines du monde réel - où vous savez juste que la loi de Murphy est vouée à frapper et "tout peut certainement mal tourner" - le profilage automatisé pourrait nous éviter de multiples énigmes cérébrales et des systèmes devant être retirés de la production !
En ce qui concerne le profilage des données, ydata-profiling
a toujours été un
Des opérations complexes et chronophages sont effectuées sous le capot : ydata-profiling détecte automatiquement les types d'entités compris dans les données et, en fonction des types d'entités (numériques ou catégoriques), il ajuste les statistiques récapitulatives et les visualisations qui sont affichées dans le rapport de profilage.
Favorisant une analyse centrée sur les données , le package met également en évidence les relations existantes entre les fonctionnalités , en se concentrant sur leurs interactions et corrélations par paires , et fournit une évaluation approfondie des alertes de qualité des données , des valeurs en double ou constantes aux fonctionnalités biaisées et déséquilibrées .
C'est vraiment une vue à 360º de la qualité de nos données — avec un minimum d'effort.
Avant de démarrer le projet, nous devons d'abord définir nos dépendances Python et configurer notre source de données.
Tout d'abord, installons les packages bytewax
et ydata-profiling
( vous pouvez utiliser un environnement virtuel pour cela —
pip install bytewax==0.16.2 ydata-profiling==4.3.1
Ensuite, nous téléchargeons le
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
Dans un environnement de production, ces mesures seraient générées en continu par chaque appareil et l'entrée ressemblerait à ce que nous attendons d'une plateforme de streaming.
(En guise de remarque rapide, un flux de données est essentiellement un pipeline de données qui peut être décrit comme un graphe acyclique dirigé - DAG)
Commençons par effectuer les importations nécessaires :
from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main
Ensuite, nous définissons notre objet de flux de données. Ensuite, nous utiliserons une méthode de carte sans état où nous transmettrons une fonction pour convertir la chaîne en un objet DateTime et restructurer les données au format (device_id, data).
La méthode map apportera la modification à chaque point de données de manière sans état. La raison pour laquelle nous avons modifié la forme de nos données est que nous pouvons facilement regrouper les données dans les prochaines étapes pour profiler les données de chaque appareil séparément plutôt que pour tous les appareils simultanément.
flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))
Maintenant, nous allons tirer parti des capacités avec état de bytewax
pour collecter des données pour chaque appareil sur la durée que nous avons définie. ydata-profiling
attend un instantané des données au fil du temps, ce qui fait de l'opérateur de fenêtre la méthode idéale à utiliser pour ce faire.
Dans ydata-profiling
, nous sommes en mesure de produire des statistiques récapitulatives pour une trame de données spécifiée pour un contexte particulier. Par exemple, dans notre exemple, nous pouvons produire des instantanés de données se référant à chaque appareil IoT ou à des périodes particulières :
from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)
Une fois les instantanés définis, tirer parti ydata-profiling
est aussi simple que d'appeler le ProfileReport
pour chacune des trames de données que nous souhaitons analyser :
import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)
Dans cet exemple, nous écrivons les images dans des fichiers locaux dans le cadre d'une fonction dans une méthode map. Ceux-ci pourraient être signalés via un outil de messagerie ou nous pourrions les enregistrer dans un stockage distant à l'avenir.
Une fois le profil terminé, le flux de données attend une sortie afin que nous puissions utiliser le StdOutput
intégré pour imprimer le périphérique qui a été profilé et l'heure à laquelle il a été profilé qui a été transmise à la fonction de profil dans l'étape de mappage :
flow.output("out", StdOutput())
Il existe plusieurs façons d'exécuter les flux de données Bytewax. Dans cet exemple, nous utilisons la même machine locale, mais Bytewax peut également s'exécuter sur plusieurs processus Python, sur plusieurs hôtes, dans un
Dans cet article, nous continuerons avec une configuration locale, mais nous vous encourageons à consulter notre outil d'aide
En supposant que nous soyons dans le même répertoire que le fichier avec la définition du flux de données, nous pouvons l'exécuter en utilisant :
python -m bytewax.run ydata-profiling-streaming:flow
Nous pouvons ensuite utiliser les rapports de profilage pour valider la qualité des données, vérifier les changements de schémas ou de formats de données et comparer les caractéristiques des données entre différents appareils ou fenêtres temporelles .
En fait, nous pouvons tirer parti de la
snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")
La validation des flux de données est cruciale pour identifier les problèmes de qualité des données de manière continue et comparer l'état des données sur des périodes distinctes.
Pour les organisations des secteurs de la santé , de l'énergie , de la fabrication et du divertissement , qui travaillent toutes avec des flux continus de données, un profilage automatisé est essentiel pour établir les meilleures pratiques de gouvernance des données , de l'évaluation de la qualité à la confidentialité des données.
Cela nécessite l'analyse d'instantanés de données qui, comme présenté dans cet article, peuvent être réalisées de manière transparente en combinant bytewax
et ydata-profiling
.
Bytewax prend en charge tous les processus nécessaires pour gérer et structurer les flux de données en instantanés, qui peuvent ensuite être résumés et comparés avec ydata-profiling via un rapport complet des caractéristiques des données.
Être capable de traiter et de profiler de manière appropriée les données entrantes ouvre une pléthore de cas d'utilisation dans différents domaines, de la correction des erreurs dans les schémas et formats de données à la mise en évidence et à l'atténuation des problèmes supplémentaires qui découlent des activités du monde réel, tels que la détection d'anomalies (par exemple, la détection de fraude ou d'intrusion/menaces), le dysfonctionnement de l'équipement et d'autres événements qui s'écartent des attentes (par exemple, les dérives de données ou le désalignement avec les règles métier).
Vous êtes maintenant prêt à commencer à explorer vos flux de données ! Faites-nous savoir quels autres cas d'utilisation vous trouvez, et comme toujours, n'hésitez pas à nous laisser un mot dans les commentaires, ou à nous trouver sur le
Cet article a été écrit par Fabiana Clemente (Co-fondatrice & CDO @
Vous pouvez trouver des informations supplémentaires sur les packages OSS dans les documentations respectives :
Également publié ici