paint-brush
Visualizando a detecção de anomalias em tempo real com Pythonpor@bytewax
1,209 leituras
1,209 leituras

Visualizando a detecção de anomalias em tempo real com Python

por bytewax13m2023/04/21
Read on Terminal Reader

Muito longo; Para ler

A visualização Python para streaming de dados tem sido desafiadora, levando a soluções complexas baseadas em JavaScript em notebooks. A reexecução com sua natureza Python-Rust simplifica muito. Powered by Bytewax, veremos um exemplo de visualização de detecção de anomalias de streaming.
featured image - Visualizando a detecção de anomalias em tempo real com Python
bytewax HackerNoon profile picture
0-item


O código aberto da Rerun em fevereiro marcou um passo significativo para quem procura bibliotecas de visualização Python acessíveis, mas potentes.


Por que a visualização é importante?

A visualização é essencial, pois empresas como Scale.ai, Weights & Biases e Hugging Face simplificaram o aprendizado profundo abordando a rotulagem de conjuntos de dados, rastreamento de experimentos e modelos pré-treinados. No entanto, ainda existe um vazio na captura e visualização rápida de dados.


Muitas empresas desenvolvem soluções de visualização de dados internamente, mas geralmente acabam com ferramentas abaixo do ideal devido aos altos custos de desenvolvimento. Além disso, a visualização Python em dados de streaming é um problema que também não é bem resolvido, levando a soluções baseadas em JavaScrip t em notebooks. O Rerun aproveita uma interface Python em um mecanismo de visualização Rust de alto desempenho (muito parecido com o Bytewax!) Que facilita a análise de dados de streaming.


Nesta postagem do blog, exploraremos como usar Bytewax e Rerun para visualizar dados de streaming em tempo real em Python e criar uma visualização de detecção de anomalias em tempo real.


Escolhemos a detecção de anomalias, também conhecida como detecção de outliers, porque é um componente crítico em várias aplicações, como segurança cibernética, detecção de fraudes e monitoramento de processos industriais. A visualização dessas anomalias em tempo real pode ajudar a identificar rapidamente possíveis problemas e tomar as ações necessárias para mitigá-los.


Para aqueles que estão ansiosos para mergulhar, confira nossa solução Python de ponta a ponta em nosso GitHub . Não se esqueça de estrelar Bytewax!

Visão geral

Aqui está o que vamos cobrir:

  • Navegaremos pelo código e discutiremos brevemente as entidades de nível superior
  • Em seguida, discutiremos cada etapa do fluxo de dados com mais detalhes: inicialização de nosso fluxo de dados, fonte de entrada, detecção de anomalias de estado, visualização e saída de dados e como gerar um cluster
  • Por fim, aprenderemos como executá-lo e ver a bela visualização, tudo em Python <3
  • Como bônus, pensaremos em outros casos de uso

Vamos!


Configure seu ambiente

Esta postagem de blog é baseada nas seguintes versões de Bytewax e Rerun:

 bytewax==0.15.1 rerun-sdk==0.4.0


Rerun e Bytewax são instaláveis como

 pip install rerun-sdk pip install bytewax


Siga o Bytewax para obter atualizações, pois estamos criando uma nova versão que facilitará ainda mais o desenvolvimento de aplicativos de streaming de dados em Python.

Código

A solução é relativamente compacta, então copiamos todo o exemplo de código aqui. Sinta-se à vontade para pular esta grande parte se parecer esmagadora; discutiremos cada função mais tarde.

 import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)


O código fornecido demonstra como criar um pipeline de detecção de anomalias em tempo real usando Bytewax e Rerun.


Vamos detalhar os componentes essenciais deste código:

  • generate_random_metrics : Esta função gera métricas aleatórias simulando fluxos de dados do mundo real. Gera pontos de dados com uma pequena chance de ter uma anomalia (valores duplicados).


  • ZTestDetector : Esta classe implementa um detector de anomalias usando o método Z-score. Ele mantém a média e o desvio padrão dos últimos 10 valores e marca um valor como anômalo se seu Z-score for maior que um limite especificado.


  • output_builder : Esta função é usada para definir o comportamento de saída para o pipeline de dados. Nesse caso, imprime o nome da métrica, valor, média, desvio padrão e se o valor é anômalo.


  • Fluxo de dados : a parte principal do código constrói o fluxo de dados usando Bytewax, conectando RandomMetricInput, ZTestDetector e o construtor de saída.


  • Visualização de reexecução : a visualização de reexecução é integrada à classe ZTestDetector. As funções rr.log_scalar e rr.log_point são usadas para plotar os pontos de dados e seu status de anomalia correspondente.


Agora, com o entendimento dos principais componentes do código, vamos discutir passo a passo como a visualização é criada.

Construindo o fluxo de dados

Para criar um pipeline de fluxo de dados, você precisa:


  1. Inicialize um novo fluxo de dados com flow = Dataflow() .
  2. Defina a fonte de entrada usando flow.input("input", ManualInputConfig(generate_random_metrics)) .
  3. Aplique o detector de anomalias com estado usando flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) .
  4. Configure o comportamento de saída com flow.capture(ManualOutputConfig(output_builder)) .
  5. Por fim, gere um cluster para executar o fluxo de dados com spawn_cluster(flow, proc_count=3) .


O fluxo de dados resultante lê os valores métricos gerados aleatoriamente de input_builder , passa-os pelo ZTestDetector para detecção de anomalias e gera os resultados usando a função output_builder . Vamos esclarecer os detalhes de cada etapa.

função generate_random_metrics

A função generate_random_metrics serve como uma fonte de entrada alternativa para o pipeline de fluxo de dados, gerando valores de métrica aleatórios de maneira distribuída entre vários trabalhadores. Ele aceita três parâmetros: worker_index , worker_count e resume_state .


 def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)


  • worker_index : o índice do trabalhador atual no pipeline de fluxo de dados.

  • worker_count : o número total de trabalhadores no pipeline de fluxo de dados.

  • resume_state : O estado da fonte de entrada da qual retomar. Nesse caso, é declarado como None , indicando que a fonte de entrada não suporta a retomada de um estado anterior.


Aqui está uma descrição passo a passo da função generate_random_metrics :


  1. Afirme que resume_state é None .
  2. Defina uma lista de chaves que representam as métricas.
  3. Distribua as chaves entre os trabalhadores usando a função de distribuição (não fornecida no trecho de código). As chaves distribuídas para o trabalhador atual são atribuídas a this_workers_keys.
  4. Repita 1.000 vezes e, para cada iteração, percorra a lista de chaves:
    • Gere um valor aleatório entre 0 e 10.

    • Com 10% de probabilidade, duplique o valor para simular uma anomalia.

    • Gera uma tupla contendo None (para indicar nenhuma chave de partição específica), a chave, o valor gerado e o tempo decorrido desde o horário de início (não fornecido no trecho de código).

    • Introduza um tempo de espera entre cada valor gerado para simular a geração de dados em tempo real.


A função generate_random_metrics é usada no fluxo de dados como fonte de entrada com a seguinte linha de código:


 flow.input("input", ManualInputConfig(generate_random_metrics))


Essa linha informa ao fluxo de dados para usar a classe RandomMetricInput para gerar os dados de entrada para o pipeline.

Classe ZTestDetector

A classe ZTestDetector é um detector de anomalias que usa o método Z-score para identificar se um ponto de dados é anômalo ou não. O escore Z é o número de desvios padrão de um ponto de dados da média de um conjunto de dados. Se o Z-score de um ponto de dados for maior que um limite especificado, ele será considerado anômalo.


A classe possui os seguintes métodos:

  • __init__(self, threshold_z) : O construtor inicializa o ZTestDetector com um valor limite de Z-score. Ele também inicializa a lista dos últimos 10 valores (self.last_10), média (self.mu) e desvio padrão (self.sigma).


  • _push(self, value) : Este método privado é usado para atualizar a lista dos últimos 10 valores com o novo valor. Ele insere o novo valor no início da lista e remove o valor mais antigo, mantendo o tamanho da lista em 10.


  • _recalc_stats(self) : Este método privado recalcula a média e o desvio padrão com base nos valores atuais na lista self.last_10.


  • push(self, key__value__t) : Este método público recebe uma tupla contendo uma chave, um valor e um timestamp como entrada. Ele calcula o Z-score para o valor, atualiza a lista dos últimos 10 valores e recalcula a média e o desvio padrão. Ele também registra o ponto de dados e seu status de anomalia usando as funções de visualização do Rerun. Por fim, ele retorna a instância atualizada da classe ZTestDetector e uma tupla contendo o valor, média, desvio padrão e status de anomalia.


A classe ZTestDetector é usada no pipeline de fluxo de dados como um mapa de estado com o seguinte código:

 flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)


Essa linha informa ao fluxo de dados para aplicar o ZTestDetector com um limite de pontuação Z de 2.0 e usar o método push para processar os pontos de dados.

Visualizando Anomalias

Para visualizar as anomalias, a classe ZTestDetector registra os pontos de dados e seu status de anomalia correspondente usando as funções de visualização do Rerun. Especificamente, rr.log_scalar é usado para plotar um valor escalar, enquanto rr.log_point é usado para plotar pontos 3D.


O trecho de código a seguir mostra como a visualização é criada:

 rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)


Aqui, primeiro registramos um valor escalar que representa a métrica. Então, dependendo se o valor é anômalo, registramos um ponto 3D com raio e cor diferentes. Os pontos anômalos são registrados em vermelho com um raio maior, enquanto os pontos não anômalos são registrados com um raio menor.

função output_builder

A função output_builder é usada para definir o comportamento de saída para o pipeline de dados. Neste exemplo específico, ele é responsável por imprimir o nome da métrica, valor, média, desvio padrão e se o valor é anômalo.


A função recebe dois argumentos: worker_index e worker_count . Esses argumentos ajudam a função a entender o índice do trabalhador e o número total de trabalhadores no pipeline de fluxo de dados.


Aqui está a definição da função output_builder :

 def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector


Esta função é uma função de ordem superior, o que significa que ela retorna outra função chamada inspector . A função inspector é responsável por processar a tupla de dados de entrada e imprimir a saída desejada.


A função do construtor de saída é usada posteriormente no pipeline de fluxo de dados ao configurar o comportamento de saída com

 flow.capture(ManualOutputConfig(output_builder)).

Executando o fluxo de dados

Bytewax pode ser executado como um único processo ou de forma multi-processo. Esse fluxo de dados foi criado para escalar em vários processos, mas começaremos a executá-lo como um único processo com o módulo de execução spawn_cluster .


 spawn_cluster(flow)


Se quiséssemos aumentar o paralelismo, simplesmente adicionaríamos mais processos como argumentos.


Por exemplo - spawn_cluster(flow, proc_count=3) .

Para executar o código fornecido, podemos simplesmente executá-lo como um script Python, mas primeiro precisamos instalar as dependências.


Crie um novo arquivo no mesmo diretório que dataflow.py e nomeie-o como requirements.txt.


Adicione o seguinte conteúdo ao arquivo requirements.txt:

 bytewax==0.15.1 rerun-sdk==0.4.0


Abra um terminal no diretório que contém os arquivos requirements.txt e dataflow.py.

Instale as dependências usando o seguinte comando:

 pip install -r requirements.txt


E execute o fluxo de dados!

 python dataflow.py

Expandindo o caso de uso

Embora o código fornecido sirva como um exemplo básico de detecção de anomalias em tempo real, você pode expandir esse pipeline para acomodar cenários mais complexos.


Por exemplo:

  • Incorpore fontes de dados do mundo real : substitua a classe RandomMetricInput por uma classe personalizada que lê dados de uma fonte do mundo real, como sensores de IoT, arquivos de log ou APIs de streaming.


  • Implemente técnicas de detecção de anomalias mais sofisticadas : você pode substituir a classe ZTestDetector por outros métodos de detecção de anomalias com estado, como média móvel, suavização exponencial ou abordagens baseadas em aprendizado de máquina.


  • Personalize a visualização : aprimore a visualização Rerun adicionando mais dimensões de dados, ajustando os esquemas de cores ou modificando os estilos de plotagem para atender melhor às suas necessidades.


  • Integre com sistemas de alerta e monitoramento : em vez de simplesmente imprimir os resultados da anomalia, você pode integrar o pipeline com sistemas de alerta ou monitoramento para notificar as partes interessadas apropriadas quando uma anomalia é detectada.


Ao personalizar e estender o pipeline de fluxo de dados, você pode criar uma poderosa solução de detecção e visualização de anomalias em tempo real, adaptada ao seu caso de uso específico. A combinação de Bytewax e Rerun oferece uma base versátil e escalável para a construção de sistemas de visualização e processamento de dados em tempo real.

Conclusão

Esta postagem de blog demonstrou como usar Bytewax e Rerun para criar uma visualização de detecção de anomalias em tempo real. Ao construir um pipeline de fluxo de dados com Bytewax e integrar os poderosos recursos de visualização do Rerun, podemos monitorar e identificar anomalias em nossos dados à medida que ocorrem.




Originalmente escrito por Zander Matheson aqui.

Junte-se à nossa comunidade: Slack Github