paint-brush
Visualización de la detección de anomalías en tiempo real con Pythonpor@bytewax
1,209 lecturas
1,209 lecturas

Visualización de la detección de anomalías en tiempo real con Python

por bytewax13m2023/04/21
Read on Terminal Reader

Demasiado Largo; Para Leer

La visualización de Python para la transmisión de datos ha sido un desafío, lo que ha llevado a soluciones complejas basadas en JavaScript en portátiles. Volver a ejecutar con su naturaleza Python-Rust lo simplifica enormemente. Desarrollado por Bytewax, veremos un ejemplo de visualización de detección de anomalías de transmisión.
featured image - Visualización de la detección de anomalías en tiempo real con Python
bytewax HackerNoon profile picture
0-item


El código abierto de Rerun en febrero marcó un paso significativo para aquellos que buscan bibliotecas de visualización de Python accesibles pero potentes.


¿Por qué es importante la visualización?

La visualización es esencial ya que empresas como Scale.ai, Weights & Biases y Hugging Face han optimizado el aprendizaje profundo al abordar el etiquetado de conjuntos de datos, el seguimiento de experimentos y modelos preentrenados. Sin embargo, todavía existe un vacío en la captura y visualización rápida de datos.


Muchas empresas desarrollan soluciones internas de visualización de datos, pero a menudo terminan con herramientas subóptimas debido a los altos costos de desarrollo. Además, la visualización de Python en la transmisión de datos es un problema que tampoco se resuelve bien, lo que lleva a soluciones basadas en JavaScrip t en portátiles. Rerun aprovecha una interfaz de Python en un motor de visualización Rust de alto rendimiento (¡muy parecido a Bytewax!) que hace que sea muy fácil analizar los datos de transmisión.


En esta publicación de blog, exploraremos cómo usar Bytewax y Rerun para visualizar datos de transmisión en tiempo real en Python y crear una visualización de detección de anomalías en tiempo real.


Elegimos la detección de anomalías, también conocida como detección de valores atípicos, porque es un componente crítico en numerosas aplicaciones, como la ciberseguridad, la detección de fraudes y la supervisión de procesos industriales. Visualizar estas anomalías en tiempo real puede ayudar a identificar rápidamente problemas potenciales y tomar las medidas necesarias para mitigarlos.


Para aquellos ansiosos por sumergirse, consulte nuestra solución completa de Python en nuestro GitHub . ¡No olvides destacar Bytewax!

Descripción general

Esto es lo que cubriremos:

  • Navegaremos por el código y discutiremos brevemente las entidades de nivel superior
  • Luego, analizaremos cada paso del flujo de datos con mayor detalle: inicialización de nuestro flujo de datos, fuente de entrada, detección de anomalías con estado, visualización y salida de datos, y cómo generar un clúster.
  • Finalmente, aprenderemos cómo ejecutarlo y ver la hermosa visualización, todo en Python <3
  • Como beneficio adicional, pensaremos en otros casos de uso

¡Vamos!


Configure su entorno

Esta publicación de blog se basa en las siguientes versiones de Bytewax y Rerun:

 bytewax==0.15.1 rerun-sdk==0.4.0


Rerun y Bytewax se pueden instalar como

 pip install rerun-sdk pip install bytewax


Siga a Bytewax para obtener actualizaciones, ya que estamos preparando una nueva versión que facilitará aún más el desarrollo de aplicaciones de transmisión de datos en Python.

Código

La solución es relativamente compacta, por lo que copiamos el código de ejemplo completo aquí. Siéntase libre de omitir esta gran parte si parece abrumadora; discutiremos cada función más adelante.

 import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)


El código proporcionado demuestra cómo crear una canalización de detección de anomalías en tiempo real mediante Bytewax y Rerun.


Desglosemos los componentes esenciales de este código:

  • generar_random_metrics : esta función genera métricas aleatorias que simulan flujos de datos del mundo real. Genera puntos de datos con una pequeña posibilidad de tener una anomalía (valores duplicados).


  • ZTestDetector : esta clase implementa un detector de anomalías utilizando el método Z-score. Mantiene la media y la desviación estándar de los últimos 10 valores y marca un valor como anómalo si su puntaje Z es mayor que un umbral especificado.


  • output_builder : esta función se usa para definir el comportamiento de salida para la canalización de datos. En este caso, imprime el nombre de la métrica, el valor, la media, la desviación estándar y si el valor es anómalo.


  • Flujo de datos : la parte principal del código construye el flujo de datos utilizando Bytewax, conectando RandomMetricInput, ZTestDetector y el generador de salida.


  • Visualización de repetición : la visualización de repetición está integrada en la clase ZTestDetector. Las funciones rr.log_scalar y rr.log_point se utilizan para trazar los puntos de datos y su estado de anomalía correspondiente.


Ahora, con una comprensión de los componentes principales del código, analicemos cómo se crea la visualización paso a paso.

Construyendo el flujo de datos

Para crear una canalización de flujo de datos, debe hacer lo siguiente:


  1. Inicializa un nuevo flujo de datos con flow = Dataflow() .
  2. Defina la fuente de entrada usando flow.input("input", ManualInputConfig(generate_random_metrics)) .
  3. Aplique el detector de anomalías con estado mediante flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) .
  4. Configure el comportamiento de salida con flow.capture(ManualOutputConfig(output_builder)) .
  5. Finalmente, genere un clúster para ejecutar el flujo de datos con spawn_cluster(flow, proc_count=3) .


El flujo de datos resultante lee los valores de métrica generados aleatoriamente desde input_builder , los pasa a través de ZTestDetector para la detección de anomalías y genera los resultados mediante la función output_builder . Aclaremos los detalles de cada paso.

función generate_random_metrics

La función generate_random_metrics sirve como una fuente de entrada alternativa para la canalización de flujo de datos, generando valores de métrica aleatorios de manera distribuida entre varios trabajadores. Acepta tres parámetros: worker_index , worker_count y resume_state .


 def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)


  • worker_index : el índice del trabajador actual en la canalización de flujo de datos.

  • worker_count : el número total de trabajadores en la canalización de flujo de datos.

  • resume_state : el estado de la fuente de entrada desde la que se reanudará. En este caso, se afirma que es None , lo que indica que la fuente de entrada no admite la reanudación desde un estado anterior.


Aquí hay una descripción paso a paso de la función generate_random_metrics :


  1. Comprueba que resume_state es None .
  2. Defina una lista de claves que representan las métricas.
  3. Distribuya las claves entre los trabajadores mediante la función de distribución (no proporcionada en el fragmento de código). Las claves distribuidas para el trabajador actual se asignan a this_workers_keys.
  4. Itere 1000 veces y, para cada iteración, repase la lista de claves:
    • Genera un valor aleatorio entre 0 y 10.

    • Con un 10% de probabilidad, duplica el valor para simular una anomalía.

    • Genera una tupla que contiene Ninguno (para indicar que no hay una clave de partición específica), la clave, el valor generado y el tiempo transcurrido desde la hora de inicio (no proporcionado en el fragmento de código).

    • Introduzca un tiempo de suspensión entre cada valor generado para simular la generación de datos en tiempo real.


La función generate_random_metrics se usa en el flujo de datos como fuente de entrada con la siguiente línea de código:


 flow.input("input", ManualInputConfig(generate_random_metrics))


Esta línea le dice al flujo de datos que use la clase RandomMetricInput para generar los datos de entrada para la canalización.

Clase ZTestDetector

La clase ZTestDetector es un detector de anomalías que utiliza el método de puntuación Z para identificar si un punto de datos es anómalo o no. La puntuación Z es el número de desviaciones estándar de un punto de datos con respecto a la media de un conjunto de datos. Si la puntuación Z de un punto de datos es superior a un umbral especificado, se considera anómalo.


La clase tiene los siguientes métodos:

  • __init__(self, threshold_z) : el constructor inicializa el ZTestDetector con un valor de puntuación Z de umbral. También inicializa la lista de los últimos 10 valores (self.last_10), la media (self.mu) y la desviación estándar (self.sigma).


  • _push(self, value) : este método privado se usa para actualizar la lista de los últimos 10 valores con el nuevo valor. Inserta el nuevo valor al principio de la lista y elimina el valor más antiguo, manteniendo la longitud de la lista en 10.


  • _recalc_stats(self) : este método privado vuelve a calcular la media y la desviación estándar en función de los valores actuales en la lista self.last_10.


  • push(self, key__value__t) : este método público toma una tupla que contiene una clave, un valor y una marca de tiempo como entrada. Calcula la puntuación Z para el valor, actualiza la lista de los últimos 10 valores y vuelve a calcular la media y la desviación estándar. También registra el punto de datos y su estado de anomalía utilizando las funciones de visualización de Rerun. Finalmente, devuelve la instancia actualizada de la clase ZTestDetector y una tupla que contiene el valor, la media, la desviación estándar y el estado de anomalía.


La clase ZTestDetector se usa en la tubería de flujo de datos como un mapa con estado con el siguiente código:

 flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)


Esta línea le dice al flujo de datos que aplique el ZTestDetector con un umbral de puntaje Z de 2.0 y use el método push para procesar los puntos de datos.

Visualización de anomalías

Para visualizar las anomalías, la clase ZTestDetector registra los puntos de datos y su estado de anomalía correspondiente utilizando las funciones de visualización de Rerun. Específicamente, rr.log_scalar se usa para trazar un valor escalar, mientras que rr.log_point se usa para trazar puntos 3D.


El siguiente fragmento de código muestra cómo se crea la visualización:

 rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)


Aquí, primero registramos un valor escalar que representa la métrica. Luego, dependiendo de si el valor es anómalo, registramos un punto 3D con un radio y color diferente. Los puntos anómalos se registran en rojo con un radio más grande, mientras que los puntos no anómalos se registran con un radio más pequeño.

Función output_builder

La función output_builder se usa para definir el comportamiento de salida para la canalización de datos. En este ejemplo específico, es responsable de imprimir el nombre de la métrica, el valor, la media, la desviación estándar y si el valor es anómalo.


La función toma dos argumentos: worker_index y worker_count . Estos argumentos ayudan a la función a comprender el índice del trabajador y la cantidad total de trabajadores en la canalización del flujo de datos.


Aquí está la definición de la función output_builder :

 def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector


Esta función es una función de orden superior, lo que significa que devuelve otra función llamada inspector . La función inspector es responsable de procesar la tupla de datos de entrada e imprimir la salida deseada.


La función de generador de salida se usa más adelante en la canalización de flujo de datos al configurar el comportamiento de salida con

 flow.capture(ManualOutputConfig(output_builder)).

Ejecutar el flujo de datos

Bytewax puede ejecutarse como un proceso único o de forma multiproceso. Este flujo de datos se creó para escalar a través de múltiples procesos, pero comenzaremos ejecutándolo como un solo proceso con el módulo de ejecución spawn_cluster .


 spawn_cluster(flow)


Si quisiéramos aumentar el paralelismo, simplemente agregaríamos más procesos como argumentos.


Por ejemplo, spawn_cluster(flow, proc_count=3) .

Para ejecutar el código provisto, simplemente podemos ejecutarlo como un script de Python, pero primero debemos instalar las dependencias.


Cree un nuevo archivo en el mismo directorio que dataflow.py y asígnele el nombre requisitos.txt.


Agregue el siguiente contenido al archivo requirements.txt:

 bytewax==0.15.1 rerun-sdk==0.4.0


Abra una terminal en el directorio que contiene los archivos requirements.txt y dataflow.py.

Instale las dependencias usando el siguiente comando:

 pip install -r requirements.txt


¡Y ejecute el flujo de datos!

 python dataflow.py

Expansión del caso de uso

Si bien el código proporcionado sirve como un ejemplo básico de detección de anomalías en tiempo real, puede expandir esta canalización para adaptarse a escenarios más complejos.


Por ejemplo:

  • Incorpore fuentes de datos del mundo real : reemplace la clase RandomMetricInput con una clase personalizada que lea datos de una fuente del mundo real, como sensores de IoT, archivos de registro o API de transmisión.


  • Implemente técnicas de detección de anomalías más sofisticadas : puede reemplazar la clase ZTestDetector con otros métodos de detección de anomalías con estado, como el promedio móvil, el suavizado exponencial o los enfoques basados en el aprendizaje automático.


  • Personalice la visualización : mejore la visualización de repetición agregando más dimensiones de datos, ajustando los esquemas de color o modificando los estilos de trazado para que se adapten mejor a sus necesidades.


  • Integre con sistemas de alerta y monitoreo : en lugar de simplemente imprimir los resultados de la anomalía, puede integrar la tubería con sistemas de alerta o monitoreo para notificar a las partes interesadas apropiadas cuando se detecta una anomalía.


Al personalizar y ampliar la canalización del flujo de datos, puede crear una potente solución de detección y visualización de anomalías en tiempo real adaptada a su caso de uso específico. La combinación de Bytewax y Rerun ofrece una base versátil y escalable para construir sistemas de procesamiento y visualización de datos en tiempo real.

Conclusión

Esta publicación de blog ha demostrado cómo usar Bytewax y Rerun para crear una visualización de detección de anomalías en tiempo real. Al construir una tubería de flujo de datos con Bytewax e integrar las poderosas capacidades de visualización de Rerun, podemos monitorear e identificar anomalías en nuestros datos a medida que ocurren.




Escrito originalmente por Zander Matheson aquí.

Únete a nuestra comunidad: Slack Github