El código abierto de Rerun en febrero marcó un paso significativo para aquellos que buscan bibliotecas de visualización de Python accesibles pero potentes.
La visualización es esencial ya que empresas como Scale.ai, Weights & Biases y Hugging Face han optimizado el aprendizaje profundo al abordar el etiquetado de conjuntos de datos, el seguimiento de experimentos y modelos preentrenados. Sin embargo, todavía existe un vacío en la captura y visualización rápida de datos.
Muchas empresas desarrollan soluciones internas de visualización de datos, pero a menudo terminan con herramientas subóptimas debido a los altos costos de desarrollo. Además, la visualización de Python en la transmisión de datos es un problema que tampoco se resuelve bien, lo que lleva a soluciones basadas en JavaScrip t en portátiles. Rerun aprovecha una interfaz de Python en un motor de visualización Rust de alto rendimiento (¡muy parecido a Bytewax!) que hace que sea muy fácil analizar los datos de transmisión.
En esta publicación de blog, exploraremos cómo usar Bytewax y Rerun para visualizar datos de transmisión en tiempo real en Python y crear una visualización de detección de anomalías en tiempo real.
Elegimos la detección de anomalías, también conocida como detección de valores atípicos, porque es un componente crítico en numerosas aplicaciones, como la ciberseguridad, la detección de fraudes y la supervisión de procesos industriales. Visualizar estas anomalías en tiempo real puede ayudar a identificar rápidamente problemas potenciales y tomar las medidas necesarias para mitigarlos.
Para aquellos ansiosos por sumergirse, consulte nuestra solución completa de Python en nuestro GitHub . ¡No olvides destacar Bytewax!
Esto es lo que cubriremos:
¡Vamos!
Esta publicación de blog se basa en las siguientes versiones de Bytewax y Rerun:
bytewax==0.15.1 rerun-sdk==0.4.0
Rerun y Bytewax se pueden instalar como
pip install rerun-sdk pip install bytewax
Siga a Bytewax para obtener actualizaciones, ya que estamos preparando una nueva versión que facilitará aún más el desarrollo de aplicaciones de transmisión de datos en Python.
La solución es relativamente compacta, por lo que copiamos el código de ejemplo completo aquí. Siéntase libre de omitir esta gran parte si parece abrumadora; discutiremos cada función más adelante.
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
El código proporcionado demuestra cómo crear una canalización de detección de anomalías en tiempo real mediante Bytewax y Rerun.
Desglosemos los componentes esenciales de este código:
generar_random_metrics : esta función genera métricas aleatorias que simulan flujos de datos del mundo real. Genera puntos de datos con una pequeña posibilidad de tener una anomalía (valores duplicados).
ZTestDetector : esta clase implementa un detector de anomalías utilizando el método Z-score. Mantiene la media y la desviación estándar de los últimos 10 valores y marca un valor como anómalo si su puntaje Z es mayor que un umbral especificado.
output_builder : esta función se usa para definir el comportamiento de salida para la canalización de datos. En este caso, imprime el nombre de la métrica, el valor, la media, la desviación estándar y si el valor es anómalo.
Flujo de datos : la parte principal del código construye el flujo de datos utilizando Bytewax, conectando RandomMetricInput, ZTestDetector y el generador de salida.
Visualización de repetición : la visualización de repetición está integrada en la clase ZTestDetector. Las funciones rr.log_scalar y rr.log_point se utilizan para trazar los puntos de datos y su estado de anomalía correspondiente.
Ahora, con una comprensión de los componentes principales del código, analicemos cómo se crea la visualización paso a paso.
Para crear una canalización de flujo de datos, debe hacer lo siguiente:
flow = Dataflow()
.flow.input("input", ManualInputConfig(generate_random_metrics))
.flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
.flow.capture(ManualOutputConfig(output_builder))
.spawn_cluster(flow, proc_count=3)
.
El flujo de datos resultante lee los valores de métrica generados aleatoriamente desde input_builder
, los pasa a través de ZTestDetector
para la detección de anomalías y genera los resultados mediante la función output_builder
. Aclaremos los detalles de cada paso.
generate_random_metrics
La función generate_random_metrics
sirve como una fuente de entrada alternativa para la canalización de flujo de datos, generando valores de métrica aleatorios de manera distribuida entre varios trabajadores. Acepta tres parámetros: worker_index
, worker_count
y resume_state
.
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: el índice del trabajador actual en la canalización de flujo de datos.
worker_count
: el número total de trabajadores en la canalización de flujo de datos.
resume_state
: el estado de la fuente de entrada desde la que se reanudará. En este caso, se afirma que es None
, lo que indica que la fuente de entrada no admite la reanudación desde un estado anterior.
Aquí hay una descripción paso a paso de la función generate_random_metrics
:
resume_state
es None
.Genera un valor aleatorio entre 0 y 10.
Con un 10% de probabilidad, duplica el valor para simular una anomalía.
Genera una tupla que contiene Ninguno (para indicar que no hay una clave de partición específica), la clave, el valor generado y el tiempo transcurrido desde la hora de inicio (no proporcionado en el fragmento de código).
Introduzca un tiempo de suspensión entre cada valor generado para simular la generación de datos en tiempo real.
La función generate_random_metrics
se usa en el flujo de datos como fuente de entrada con la siguiente línea de código:
flow.input("input", ManualInputConfig(generate_random_metrics))
Esta línea le dice al flujo de datos que use la clase RandomMetricInput
para generar los datos de entrada para la canalización.
ZTestDetector
La clase ZTestDetector
es un detector de anomalías que utiliza el método de puntuación Z para identificar si un punto de datos es anómalo o no. La puntuación Z es el número de desviaciones estándar de un punto de datos con respecto a la media de un conjunto de datos. Si la puntuación Z de un punto de datos es superior a un umbral especificado, se considera anómalo.
La clase tiene los siguientes métodos:
__init__(self, threshold_z)
: el constructor inicializa el ZTestDetector con un valor de puntuación Z de umbral. También inicializa la lista de los últimos 10 valores (self.last_10), la media (self.mu) y la desviación estándar (self.sigma).
_push(self, value)
: este método privado se usa para actualizar la lista de los últimos 10 valores con el nuevo valor. Inserta el nuevo valor al principio de la lista y elimina el valor más antiguo, manteniendo la longitud de la lista en 10.
_recalc_stats(self)
: este método privado vuelve a calcular la media y la desviación estándar en función de los valores actuales en la lista self.last_10.
push(self, key__value__t)
: este método público toma una tupla que contiene una clave, un valor y una marca de tiempo como entrada. Calcula la puntuación Z para el valor, actualiza la lista de los últimos 10 valores y vuelve a calcular la media y la desviación estándar. También registra el punto de datos y su estado de anomalía utilizando las funciones de visualización de Rerun. Finalmente, devuelve la instancia actualizada de la clase ZTestDetector y una tupla que contiene el valor, la media, la desviación estándar y el estado de anomalía.
La clase ZTestDetector se usa en la tubería de flujo de datos como un mapa con estado con el siguiente código:
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
Esta línea le dice al flujo de datos que aplique el ZTestDetector
con un umbral de puntaje Z de 2.0
y use el método push
para procesar los puntos de datos.
Para visualizar las anomalías, la clase ZTestDetector
registra los puntos de datos y su estado de anomalía correspondiente utilizando las funciones de visualización de Rerun. Específicamente, rr.log_scalar
se usa para trazar un valor escalar, mientras que rr.log_point
se usa para trazar puntos 3D.
El siguiente fragmento de código muestra cómo se crea la visualización:
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
Aquí, primero registramos un valor escalar que representa la métrica. Luego, dependiendo de si el valor es anómalo, registramos un punto 3D con un radio y color diferente. Los puntos anómalos se registran en rojo con un radio más grande, mientras que los puntos no anómalos se registran con un radio más pequeño.
output_builder
La función output_builder
se usa para definir el comportamiento de salida para la canalización de datos. En este ejemplo específico, es responsable de imprimir el nombre de la métrica, el valor, la media, la desviación estándar y si el valor es anómalo.
La función toma dos argumentos: worker_index
y worker_count
. Estos argumentos ayudan a la función a comprender el índice del trabajador y la cantidad total de trabajadores en la canalización del flujo de datos.
Aquí está la definición de la función output_builder
:
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
Esta función es una función de orden superior, lo que significa que devuelve otra función llamada inspector
. La función inspector
es responsable de procesar la tupla de datos de entrada e imprimir la salida deseada.
La función de generador de salida se usa más adelante en la canalización de flujo de datos al configurar el comportamiento de salida con
flow.capture(ManualOutputConfig(output_builder)).
Bytewax puede ejecutarse como un proceso único o de forma multiproceso. Este flujo de datos se creó para escalar a través de múltiples procesos, pero comenzaremos ejecutándolo como un solo proceso con el módulo de ejecución spawn_cluster
.
spawn_cluster(flow)
Si quisiéramos aumentar el paralelismo, simplemente agregaríamos más procesos como argumentos.
Por ejemplo, spawn_cluster(flow, proc_count=3)
.
Para ejecutar el código provisto, simplemente podemos ejecutarlo como un script de Python, pero primero debemos instalar las dependencias.
Cree un nuevo archivo en el mismo directorio que dataflow.py y asígnele el nombre requisitos.txt.
Agregue el siguiente contenido al archivo requirements.txt:
bytewax==0.15.1 rerun-sdk==0.4.0
Abra una terminal en el directorio que contiene los archivos requirements.txt y dataflow.py.
Instale las dependencias usando el siguiente comando:
pip install -r requirements.txt
¡Y ejecute el flujo de datos!
python dataflow.py
Si bien el código proporcionado sirve como un ejemplo básico de detección de anomalías en tiempo real, puede expandir esta canalización para adaptarse a escenarios más complejos.
Por ejemplo:
Incorpore fuentes de datos del mundo real : reemplace la clase RandomMetricInput con una clase personalizada que lea datos de una fuente del mundo real, como sensores de IoT, archivos de registro o API de transmisión.
Implemente técnicas de detección de anomalías más sofisticadas : puede reemplazar la clase ZTestDetector con otros métodos de detección de anomalías con estado, como el promedio móvil, el suavizado exponencial o los enfoques basados en el aprendizaje automático.
Personalice la visualización : mejore la visualización de repetición agregando más dimensiones de datos, ajustando los esquemas de color o modificando los estilos de trazado para que se adapten mejor a sus necesidades.
Integre con sistemas de alerta y monitoreo : en lugar de simplemente imprimir los resultados de la anomalía, puede integrar la tubería con sistemas de alerta o monitoreo para notificar a las partes interesadas apropiadas cuando se detecta una anomalía.
Al personalizar y ampliar la canalización del flujo de datos, puede crear una potente solución de detección y visualización de anomalías en tiempo real adaptada a su caso de uso específico. La combinación de Bytewax y Rerun ofrece una base versátil y escalable para construir sistemas de procesamiento y visualización de datos en tiempo real.
Esta publicación de blog ha demostrado cómo usar Bytewax y Rerun para crear una visualización de detección de anomalías en tiempo real. Al construir una tubería de flujo de datos con Bytewax e integrar las poderosas capacidades de visualización de Rerun, podemos monitorear e identificar anomalías en nuestros datos a medida que ocurren.
Escrito originalmente por Zander Matheson aquí.