L'open source de Rerun en février a marqué une étape importante pour ceux qui recherchent des bibliothèques de visualisation Python accessibles mais puissantes.
La visualisation est essentielle puisque des entreprises comme Scale.ai, Weights & Biases et Hugging Face ont rationalisé l'apprentissage en profondeur en abordant l'étiquetage des ensembles de données, le suivi des expériences et les modèles pré-formés. Cependant, un vide existe toujours dans la capture et la visualisation rapides des données.
De nombreuses entreprises développent des solutions de visualisation de données en interne, mais se retrouvent souvent avec des outils sous-optimaux en raison des coûts de développement élevés. De plus, la visualisation Python sur les données en streaming est un problème qui n'est pas bien résolu non plus, ce qui conduit à des solutions basées sur JavaScrip t dans les notebooks. Rerun exploite une interface Python dans un moteur de visualisation Rust très performant (un peu comme Bytewax !) qui facilite l'analyse des données de streaming.
Dans cet article de blog, nous explorerons comment utiliser Bytewax et Rerun pour visualiser des données de streaming en temps réel en Python et créer une visualisation de détection d'anomalies en temps réel.
Nous avons choisi la détection d'anomalies, également appelée détection de valeurs aberrantes, car il s'agit d'un composant critique dans de nombreuses applications, telles que la cybersécurité, la détection de fraude et la surveillance des processus industriels. La visualisation de ces anomalies en temps réel peut aider à identifier rapidement les problèmes potentiels et à prendre les mesures nécessaires pour les atténuer.
Pour ceux qui souhaitent plonger, consultez notre solution Python de bout en bout sur notre GitHub . N'oubliez pas de mettre Bytewax en vedette !
Voici ce que nous couvrirons :
Allons-y!
Ce billet de blog est basé sur les versions suivantes de Bytewax et Rerun :
bytewax==0.15.1 rerun-sdk==0.4.0
Rerun et Bytewax sont installables en tant que
pip install rerun-sdk pip install bytewax
Suivez Bytewax pour les mises à jour car nous préparons une nouvelle version qui facilitera davantage le développement d'applications de streaming de données en Python.
La solution est relativement compacte, nous copions donc l'intégralité de l'exemple de code ici. N'hésitez pas à sauter ce gros morceau s'il vous semble écrasant ; nous discuterons de chaque fonction plus tard.
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
Le code fourni montre comment créer un pipeline de détection d'anomalies en temps réel à l'aide de Bytewax et Rerun.
Décomposons les composants essentiels de ce code :
generate_random_metrics : cette fonction génère des métriques aléatoires simulant des flux de données réels. Il génère des points de données avec une petite chance d'avoir une anomalie (valeurs doublées).
ZTestDetector : Cette classe implémente un détecteur d'anomalies en utilisant la méthode Z-score. Il maintient la moyenne et l'écart type des 10 dernières valeurs et marque une valeur comme anormale si son score Z est supérieur à un seuil spécifié.
output_builder : cette fonction est utilisée pour définir le comportement de sortie du pipeline de données. Dans ce cas, il imprime le nom de la métrique, la valeur, la moyenne, l'écart type et si la valeur est anormale.
Flux de données : la partie principale du code construit le flux de données à l'aide de Bytewax, en connectant RandomMetricInput, ZTestDetector et le générateur de sortie.
Visualisation Rerun : La visualisation Rerun est intégrée à la classe ZTestDetector. Les fonctions rr.log_scalar et rr.log_point sont utilisées pour tracer les points de données et leur état d'anomalie correspondant.
Maintenant, avec une compréhension des composants principaux du code, discutons de la façon dont la visualisation est créée étape par étape.
Pour créer un pipeline de flux de données, vous devez :
flow = Dataflow()
.flow.input("input", ManualInputConfig(generate_random_metrics))
.flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
.flow.capture(ManualOutputConfig(output_builder))
.spawn_cluster(flow, proc_count=3)
.
Le flux de données résultant lit les valeurs de métriques générées de manière aléatoire à partir de input_builder
, les transmet à travers le ZTestDetector
pour la détection d'anomalies et génère les résultats à l'aide de la fonction output_builder
. Clarifions les détails pour chaque étape.
generate_random_metrics
La fonction generate_random_metrics
sert de source d'entrée alternative pour le pipeline de flux de données, générant des valeurs de métriques aléatoires de manière distribuée sur plusieurs travailleurs. Il accepte trois paramètres : worker_index
, worker_count
et resume_state
.
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: l'index du travailleur actuel dans le pipeline de flux de données.
worker_count
: nombre total de nœuds de calcul dans le pipeline de flux de données.
resume_state
: l'état de la source d'entrée à partir de laquelle reprendre. Dans ce cas, il est affirmé être None
, indiquant que la source d'entrée ne prend pas en charge la reprise à partir d'un état précédent.
Voici une description étape par étape de la fonction generate_random_metrics
:
resume_state
est None
.Générer une valeur aléatoire entre 0 et 10.
Avec une probabilité de 10 %, doublez la valeur pour simuler une anomalie.
Renvoie un tuple contenant None (pour indiquer qu'il n'y a pas de clé de partition spécifique), la clé, la valeur générée et le temps écoulé depuis l'heure de début (non fourni dans l'extrait de code).
Introduisez un temps de repos entre chaque valeur générée pour simuler la génération de données en temps réel.
La fonction generate_random_metrics
est utilisée dans le flux de données comme source d'entrée avec la ligne de code suivante :
flow.input("input", ManualInputConfig(generate_random_metrics))
Cette ligne indique au flux de données d'utiliser la classe RandomMetricInput
pour générer les données d'entrée pour le pipeline.
ZTestDetector
Class La classe ZTestDetector
est un détecteur d'anomalies qui utilise la méthode Z-score pour identifier si un point de données est anormal ou non. Le score Z est le nombre d'écarts types entre un point de données et la moyenne d'un ensemble de données. Si le score Z d'un point de données est supérieur à un seuil spécifié, il est considéré comme anormal.
La classe a les méthodes suivantes :
__init__(self, threshold_z)
: Le constructeur initialise le ZTestDetector avec une valeur seuil Z-score. Il initialise également la liste des 10 dernières valeurs (self.last_10), la moyenne (self.mu) et l'écart type (self.sigma).
_push(self, value)
: Cette méthode privée est utilisée pour mettre à jour la liste des 10 dernières valeurs avec la nouvelle valeur. Il insère la nouvelle valeur au début de la liste et supprime la valeur la plus ancienne, en maintenant la longueur de la liste à 10.
_recalc_stats(self)
: cette méthode privée recalcule la moyenne et l'écart type en fonction des valeurs actuelles de la liste self.last_10.
push(self, key__value__t)
: Cette méthode publique prend un tuple contenant une clé, une valeur et un horodatage comme entrée. Il calcule le score Z pour la valeur, met à jour la liste des 10 dernières valeurs et recalcule la moyenne et l'écart type. Il enregistre également le point de données et son état d'anomalie à l'aide des fonctions de visualisation de Rerun. Enfin, il renvoie l'instance mise à jour de la classe ZTestDetector et un tuple contenant la valeur, la moyenne, l'écart type et le statut d'anomalie.
La classe ZTestDetector est utilisée dans le pipeline de flux de données en tant que carte avec état avec le code suivant :
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
Cette ligne indique au flux de données d'appliquer le ZTestDetector
avec un seuil de score Z de 2.0
et d'utiliser la méthode push
pour traiter les points de données.
Pour visualiser les anomalies, la classe ZTestDetector
enregistre les points de données et leur état d'anomalie correspondant à l'aide des fonctions de visualisation de Rerun. Plus précisément, rr.log_scalar
est utilisé pour tracer une valeur scalaire, tandis que rr.log_point
est utilisé pour tracer des points 3D.
L'extrait de code suivant montre comment la visualisation est créée :
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
Ici, nous enregistrons d'abord une valeur scalaire représentant la métrique. Ensuite, selon que la valeur est anormale, nous enregistrons un point 3D avec un rayon et une couleur différents. Les points anormaux sont enregistrés en rouge avec un rayon plus grand, tandis que les points non anormaux sont enregistrés avec un rayon plus petit.
output_builder
La fonction output_builder
est utilisée pour définir le comportement de sortie du pipeline de données. Dans cet exemple spécifique, il est chargé d'imprimer le nom de la métrique, la valeur, la moyenne, l'écart type et si la valeur est anormale.
La fonction prend deux arguments : worker_index
et worker_count
. Ces arguments aident la fonction à comprendre l'index du travailleur et le nombre total de travailleurs dans le pipeline de flux de données.
Voici la définition de la fonction output_builder
:
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
Cette fonction est une fonction d'ordre supérieur, ce qui signifie qu'elle renvoie une autre fonction appelée inspector
. La fonction inspector
est responsable du traitement du tuple de données d'entrée et de l'impression de la sortie souhaitée.
La fonction de générateur de sortie est ensuite utilisée dans le pipeline de flux de données lors de la configuration du comportement de sortie avec
flow.capture(ManualOutputConfig(output_builder)).
Bytewax peut fonctionner en un seul processus ou en plusieurs processus. Ce flux de données a été créé pour évoluer sur plusieurs processus, mais nous allons commencer par l'exécuter en tant que processus unique avec le module d'exécution spawn_cluster
.
spawn_cluster(flow)
Si nous voulions augmenter le parallélisme, nous ajouterions simplement plus de processus comme arguments.
Par exemple - spawn_cluster(flow, proc_count=3)
.
Pour exécuter le code fourni, nous pouvons simplement l'exécuter en tant que script Python, mais nous devons d'abord installer les dépendances.
Créez un nouveau fichier dans le même répertoire que dataflow.py et nommez-le requirements.txt.
Ajoutez le contenu suivant au fichier requirements.txt :
bytewax==0.15.1 rerun-sdk==0.4.0
Ouvrez un terminal dans le répertoire contenant les fichiers requirements.txt et dataflow.py.
Installez les dépendances à l'aide de la commande suivante :
pip install -r requirements.txt
Et exécutez le flux de données !
python dataflow.py
Bien que le code fourni serve d'exemple de base de détection d'anomalies en temps réel, vous pouvez étendre ce pipeline pour prendre en charge des scénarios plus complexes.
Par exemple:
Incorporer des sources de données réelles : remplacez la classe RandomMetricInput par une classe personnalisée qui lit les données d'une source réelle, telle que des capteurs IoT, des fichiers journaux ou des API de diffusion en continu.
Implémentez des techniques de détection d'anomalies plus sophistiquées : vous pouvez remplacer la classe ZTestDetector par d'autres méthodes de détection d'anomalies avec état, telles que la moyenne mobile, le lissage exponentiel ou les approches basées sur l'apprentissage automatique.
Personnalisez la visualisation : Améliorez la visualisation de réexécution en ajoutant plus de dimensions de données, en ajustant les schémas de couleurs ou en modifiant les styles de tracé pour mieux répondre à vos besoins.
Intégration avec des systèmes d'alerte et de surveillance : au lieu de simplement imprimer les résultats d'anomalie, vous pouvez intégrer le pipeline avec des systèmes d'alerte ou de surveillance pour informer les parties prenantes appropriées lorsqu'une anomalie est détectée.
En personnalisant et en étendant le pipeline de flux de données, vous pouvez créer une puissante solution de détection et de visualisation des anomalies en temps réel adaptée à votre cas d'utilisation spécifique. La combinaison de Bytewax et Rerun offre une base polyvalente et évolutive pour la construction de systèmes de traitement et de visualisation de données en temps réel.
Ce billet de blog a montré comment utiliser Bytewax et Rerun pour créer une visualisation de détection d'anomalies en temps réel. En créant un pipeline de flux de données avec Bytewax et en intégrant les puissantes capacités de visualisation de Rerun, nous pouvons surveiller et identifier les anomalies dans nos données au fur et à mesure qu'elles se produisent.
Écrit à l'origine par Zander Matheson ici.