Dans cet article de blog, nous expliquerons comment vous pouvez combiner et exploiter la solution de streaming open source, , avec , pour améliorer la qualité de vos flux de streaming. Bouclez votre ceinture ! bytewax ydata-profiling Le traitement de flux permet une analyse en temps réel des données en cours et avant le stockage et peut être ou . avec sans état est utilisé pour les recommandations , la détection de modèles ou le traitement d'événements complexes, où l'historique de ce qui s'est passé est requis pour le traitement (fenêtres, jointure par une clé, etc.). Le traitement de flux avec état en temps réel est utilisé pour la transformation qui ne nécessite pas la connaissance d'autres points de données dans le flux, comme le masquage d'un e-mail ou la conversion d'un type. Le traitement de flux sans état en ligne Dans l'ensemble, les flux de données sont largement utilisés dans l'industrie et peuvent être appliqués à des cas d'utilisation tels que , ou . la détection des fraudes la surveillance des patients la maintenance prédictive des événements Un aspect crucial que tous les flux de données doivent prendre en compte est la qualité des données Contrairement aux modèles traditionnels où la qualité des données est généralement évaluée lors de la création de l'entrepôt de données ou de la solution de tableau de bord, . le streaming de données nécessite une surveillance continue Il est essentiel de maintenir la qualité des données tout au long du processus, de la collecte à l'alimentation des applications en aval. Après tout, le coût d'une mauvaise qualité des données peut être élevé pour les organisations : « Le coût des mauvaises données représente 15 % à 25 % du chiffre d'affaires pour la plupart des entreprises. (…) Les deux tiers de ces coûts peuvent être éliminés en misant sur la qualité des données. — Thomas C. Redman, auteur de « Getting in Front on Data Quality » Tout au long de cet article, nous allons vous montrer comment combiner avec pour profiler et améliorer la qualité de vos flux de streaming ! bytewa ydata-profiling Traitement de flux pour les professionnels des données avec Bytewax est un framework de traitement de flux OSS conçu spécifiquement pour les développeurs Python. Bytewax Il permet aux utilisateurs de avec des fonctionnalités similaires à Flink, Spark et Kafka Streams tout en offrant une interface conviviale et familière et créer des pipelines de données de streaming et des applications en temps réel une compatibilité à 100 % avec l'écosystème Python. Utilisation intégrée ou des bibliothèques Python existantes, (Kafka, RedPanda, WebSocket, etc.) et sur divers systèmes en aval (Kafka, fichiers parquet, lacs de données, etc.). connecteurs vous pouvez vous connecter à des sources de données en temps réel et en streaming écrire des données transformées Pour les transformations, Bytewax avec les méthodes , et et est livré avec des fonctionnalités familières telles que la récupération et l'évolutivité. facilite les transformations avec et sans état map windowing agrégation Bytewax et est . facilite une expérience Python-first et centrée sur les données pour les flux de données spécialement conçu pour les ingénieurs de données et les scientifiques de données Il permet aux utilisateurs de et de créer les personnalisations nécessaires pour répondre à leurs besoins sans avoir à apprendre et à maintenir des plates-formes de streaming basées sur JVM comme Spark ou Flink. créer des pipelines de données de streaming et des applications en temps réel Bytewax est bien adapté à de nombreux cas d'utilisation, à savoir, , , , et plus. Intégration de pipelines pour l'IA générative Gestion des valeurs manquantes dans les flux de données Utilisation de modèles de langage dans un contexte de streaming pour comprendre les marchés financiers Pour vous inspirer de cas d'utilisation et plus d'informations comme la documentation, les didacticiels et les guides, n'hésitez pas à consulter . le site bytewax Pourquoi le profilage des données pour les flux de données ? et fait référence à l'étape de : sa structure, son comportement et sa qualité. Le profilage des données est la clé d'un démarrage réussi de toute tâche d'apprentissage automatique bien comprendre nos données En un mot, consiste à analyser les aspects liés au format des données et aux descripteurs de base (par exemple, le nombre d'échantillons, le nombre/types de caractéristiques, les valeurs en double), son (telles que la présence de données manquantes ou de caractéristiques déséquilibrées) et d'autres facteurs de complication pouvant survenir lors de la collecte ou du traitement des données (par exemple, des valeurs erronées ou des caractéristiques incohérentes). profilage des données caractéristiques intrinsèques , où les circonstances peuvent changer rapidement et nécessiter une action immédiate (par exemple, surveillance des soins de santé, valeurs des stocks, politiques de qualité de l'air). Garantir des normes élevées de qualité des données est crucial pour tous les domaines et organisations, mais est particulièrement pertinent pour les domaines fonctionnant avec des domaines produisant des données continues Pour de nombreux domaines, le profilage des données est utilisé dans une optique d'analyse exploratoire des données, en tenant compte des données historiques stockées dans des bases de données. Au contraire, pour les flux de données, , où les données doivent être vérifiées à différentes périodes ou étapes du processus. le profilage des données devient essentiel pour la validation et le contrôle qualité en continu tout au long du flux En intégrant un , nous pouvons sur l'état actuel de nos données et être alertés de tout problème potentiellement critique, qu'il soit lié à (par exemple, des valeurs corrompues ou des changements de formats), ou à (par exemple, des dérives de données, des écarts par rapport aux règles et résultats métier). profilage automatisé dans nos flux de données immédiatement obtenir des informations la cohérence et à l'intégrité des données des événements se produisant sur de courtes périodes Dans les domaines du monde réel - - le profilage automatisé pourrait nous éviter de multiples énigmes cérébrales et des systèmes devant être retirés de la production ! où vous savez juste que la loi de Murphy est vouée à frapper et "tout peut certainement mal tourner" En ce qui concerne le profilage des données, a toujours été un , Soit pour ou données. Et ce n'est pas étonnant : ydata-profiling favori de la foule tabulaire des séries chronologiques il s'agit d'une seule ligne de code pour un ensemble complet d'analyses et d'informations. Des opérations complexes et chronophages sont effectuées sous le capot : ydata-profiling en fonction des types d'entités (numériques ou catégoriques), il qui sont affichées dans le rapport de profilage. détecte automatiquement les types d'entités compris dans les données et, ajuste les statistiques récapitulatives et les visualisations Favorisant une , le package met également , en se concentrant sur leurs et par paires , et fournit une , des valeurs ou aux fonctionnalités et . analyse centrée sur les données en évidence les relations existantes entre les fonctionnalités interactions corrélations évaluation approfondie des alertes de qualité des données en double constantes biaisées déséquilibrées C'est vraiment une — avec un minimum d'effort. vue à 360º de la qualité de nos données Tout mettre ensemble : bytewax et ydata-profiling Avant de démarrer le projet, nous devons d'abord définir nos dépendances Python et configurer notre source de données. Tout d'abord, installons les packages et ( bytewax ydata-profiling vous pouvez utiliser un environnement virtuel pour cela — vérifiez ces instructions si vous avez besoin de conseils supplémentaires !) pip install bytewax==0.16.2 ydata-profiling==4.3.1 Ensuite, nous téléchargeons le (Licence — CC0 : domaine public), qui contient plusieurs mesures de à partir de différents appareils IoT : Ensemble de données de télémétrie des capteurs environnementaux température, d'humidité, de monoxyde de carbone, de gaz de pétrole liquéfié, de fumée, de lumière et de mouvement wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 et l'entrée ressemblerait à ce que nous attendons d'une plateforme de streaming. . Dans cet article, et créer un flux de données à l'aide de bytewax. Dans un environnement de production, ces mesures seraient générées en continu par chaque appareil comme Kafka pour simuler le contexte que nous trouverions avec des données en streaming, nous allons lire les données du fichier CSV une ligne à la fois (En guise de remarque rapide, un flux de données est essentiellement un pipeline de données qui peut être décrit comme un graphe acyclique dirigé - DAG) Commençons par effectuer les : importations nécessaires from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main Ensuite, nous définissons notre objet de flux de données. Ensuite, nous utiliserons une méthode de carte sans état où nous transmettrons une fonction pour convertir la chaîne en un objet DateTime et restructurer les données au format (device_id, data). La méthode map apportera la modification à chaque point de données de manière sans état. La raison pour laquelle nous avons modifié la forme de nos données est que nous pouvons facilement regrouper les données dans les prochaines étapes pour profiler les données de chaque appareil séparément plutôt que pour tous les appareils simultanément. flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) Maintenant, nous allons tirer parti des capacités avec état de pour collecter des données pour chaque appareil sur la durée que nous avons définie. attend un instantané des données au fil du temps, ce qui fait de l'opérateur de fenêtre la méthode idéale à utiliser pour ce faire. bytewax ydata-profiling Dans , nous sommes en mesure de produire des statistiques récapitulatives pour une trame de données spécifiée pour un contexte particulier. Par exemple, dans notre exemple, nous pouvons produire des instantanés de données se référant à chaque appareil IoT ou à des périodes particulières : ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) Une fois les instantanés définis, tirer parti est aussi simple que d'appeler le pour chacune des trames de données que nous souhaitons analyser : ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) Dans cet exemple, nous écrivons les images dans des fichiers locaux dans le cadre d'une fonction dans une méthode map. Ceux-ci pourraient être signalés via un outil de messagerie ou nous pourrions les enregistrer dans un stockage distant à l'avenir. Une fois le profil terminé, le flux de données attend une sortie afin que nous puissions utiliser le intégré pour imprimer le périphérique qui a été profilé et l'heure à laquelle il a été profilé qui a été transmise à la fonction de profil dans l'étape de mappage : StdOutput flow.output("out", StdOutput()) Il existe plusieurs façons d'exécuter les flux de données Bytewax. Dans cet exemple, nous utilisons la même machine locale, mais Bytewax peut également s'exécuter sur plusieurs processus Python, sur plusieurs hôtes, dans un , utilisant un , et . Conteneur Docker Grappe Kubernetes plus Dans cet article, nous continuerons avec une configuration locale, mais nous vous encourageons à consulter notre outil d'aide qui gère les déploiements de flux de données Kubernetes une fois que votre pipeline est prêt à passer en production. waxctl En supposant que nous soyons dans le même répertoire que le fichier avec la définition du flux de données, nous pouvons l'exécuter en utilisant : python -m bytewax.run ydata-profiling-streaming:flow Nous pouvons ensuite utiliser les rapports de profilage pour valider la qualité des données, vérifier les changements de schémas ou de formats de données et . comparer les caractéristiques des données entre différents appareils ou fenêtres temporelles En fait, nous pouvons tirer parti de la qui met en évidence les différences entre deux profils de données de manière simple, ce qui nous permet de détecter plus facilement les modèles importants qui doivent être étudiés ou les problèmes qui doivent être résolus : fonctionnalité de rapport de comparaison snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") Prêt à explorer vos propres flux de données ? La validation des flux de données est cruciale pour identifier les problèmes de qualité des données de manière continue et comparer l'état des données sur des périodes distinctes. Pour les organisations des , , et , qui travaillent toutes avec des flux continus de données, un , de l'évaluation de la qualité à la confidentialité des données. secteurs de la santé de l'énergie de la fabrication du divertissement profilage automatisé est essentiel pour établir les meilleures pratiques de gouvernance des données Cela nécessite l'analyse d'instantanés de données qui, comme présenté dans cet article, peuvent être réalisées de manière transparente en combinant et . bytewax ydata-profiling prend en charge tous les processus nécessaires pour gérer et structurer les flux de données en instantanés, qui peuvent ensuite être résumés et comparés avec via un rapport complet des caractéristiques des données. Bytewax ydata-profiling Être capable de traiter et de profiler de manière appropriée les données entrantes ouvre une pléthore de cas d'utilisation dans différents domaines, de la à la mise en évidence et à l'atténuation des problèmes supplémentaires qui découlent des activités du monde réel, tels que la (par exemple, la détection de fraude ou d'intrusion/menaces), et d'autres événements qui s'écartent des attentes (par exemple, les dérives de données ou le désalignement avec les règles métier). correction des erreurs dans les schémas et formats de données détection d'anomalies le dysfonctionnement de l'équipement Vous êtes maintenant prêt à commencer à explorer vos flux de données ! Faites-nous savoir quels autres cas d'utilisation vous trouvez, et comme toujours, n'hésitez pas à nous laisser un mot dans les commentaires, ou à nous trouver sur le pour d'autres questions et suggestions ! Communauté d'IA centrée sur les données On se voit là-bas! Remerciements ) et Oli Makhasoeva (Relations avec les développeurs @ ) -- développement . Cet article a été écrit par Fabiana Clemente (Co-fondatrice & CDO @ YData ) et Miriam Santos (Relations avec les développeurs @ YData ) -- développement ydata-profiling -- et Zander Matheson (PDG et fondateur @ Bytewax Byetwax cire d'octet Vous pouvez trouver des informations supplémentaires sur les packages OSS dans les documentations respectives : & . ydata-profiling docs docs bytewax Également publié ici