O código aberto da Rerun em fevereiro marcou um passo significativo para quem procura bibliotecas de visualização Python acessíveis, mas potentes.
A visualização é essencial, pois empresas como Scale.ai, Weights & Biases e Hugging Face simplificaram o aprendizado profundo abordando a rotulagem de conjuntos de dados, rastreamento de experimentos e modelos pré-treinados. No entanto, ainda existe um vazio na captura e visualização rápida de dados.
Muitas empresas desenvolvem soluções de visualização de dados internamente, mas geralmente acabam com ferramentas abaixo do ideal devido aos altos custos de desenvolvimento. Além disso, a visualização Python em dados de streaming é um problema que também não é bem resolvido, levando a soluções baseadas em JavaScrip t em notebooks. O Rerun aproveita uma interface Python em um mecanismo de visualização Rust de alto desempenho (muito parecido com o Bytewax!) Que facilita a análise de dados de streaming.
Nesta postagem do blog, exploraremos como usar Bytewax e Rerun para visualizar dados de streaming em tempo real em Python e criar uma visualização de detecção de anomalias em tempo real.
Escolhemos a detecção de anomalias, também conhecida como detecção de outliers, porque é um componente crítico em várias aplicações, como segurança cibernética, detecção de fraudes e monitoramento de processos industriais. A visualização dessas anomalias em tempo real pode ajudar a identificar rapidamente possíveis problemas e tomar as ações necessárias para mitigá-los.
Para aqueles que estão ansiosos para mergulhar, confira nossa solução Python de ponta a ponta em nosso GitHub . Não se esqueça de estrelar Bytewax!
Aqui está o que vamos cobrir:
Vamos!
Esta postagem de blog é baseada nas seguintes versões de Bytewax e Rerun:
bytewax==0.15.1 rerun-sdk==0.4.0
Rerun e Bytewax são instaláveis como
pip install rerun-sdk pip install bytewax
Siga o Bytewax para obter atualizações, pois estamos criando uma nova versão que facilitará ainda mais o desenvolvimento de aplicativos de streaming de dados em Python.
A solução é relativamente compacta, então copiamos todo o exemplo de código aqui. Sinta-se à vontade para pular esta grande parte se parecer esmagadora; discutiremos cada função mais tarde.
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
O código fornecido demonstra como criar um pipeline de detecção de anomalias em tempo real usando Bytewax e Rerun.
Vamos detalhar os componentes essenciais deste código:
generate_random_metrics : Esta função gera métricas aleatórias simulando fluxos de dados do mundo real. Gera pontos de dados com uma pequena chance de ter uma anomalia (valores duplicados).
ZTestDetector : Esta classe implementa um detector de anomalias usando o método Z-score. Ele mantém a média e o desvio padrão dos últimos 10 valores e marca um valor como anômalo se seu Z-score for maior que um limite especificado.
output_builder : Esta função é usada para definir o comportamento de saída para o pipeline de dados. Nesse caso, imprime o nome da métrica, valor, média, desvio padrão e se o valor é anômalo.
Fluxo de dados : a parte principal do código constrói o fluxo de dados usando Bytewax, conectando RandomMetricInput, ZTestDetector e o construtor de saída.
Visualização de reexecução : a visualização de reexecução é integrada à classe ZTestDetector. As funções rr.log_scalar e rr.log_point são usadas para plotar os pontos de dados e seu status de anomalia correspondente.
Agora, com o entendimento dos principais componentes do código, vamos discutir passo a passo como a visualização é criada.
Para criar um pipeline de fluxo de dados, você precisa:
flow = Dataflow()
.flow.input("input", ManualInputConfig(generate_random_metrics))
.flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
.flow.capture(ManualOutputConfig(output_builder))
.spawn_cluster(flow, proc_count=3)
.
O fluxo de dados resultante lê os valores métricos gerados aleatoriamente de input_builder
, passa-os pelo ZTestDetector
para detecção de anomalias e gera os resultados usando a função output_builder
. Vamos esclarecer os detalhes de cada etapa.
generate_random_metrics
A função generate_random_metrics
serve como uma fonte de entrada alternativa para o pipeline de fluxo de dados, gerando valores de métrica aleatórios de maneira distribuída entre vários trabalhadores. Ele aceita três parâmetros: worker_index
, worker_count
e resume_state
.
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: o índice do trabalhador atual no pipeline de fluxo de dados.
worker_count
: o número total de trabalhadores no pipeline de fluxo de dados.
resume_state
: O estado da fonte de entrada da qual retomar. Nesse caso, é declarado como None
, indicando que a fonte de entrada não suporta a retomada de um estado anterior.
Aqui está uma descrição passo a passo da função generate_random_metrics
:
resume_state
é None
.Gere um valor aleatório entre 0 e 10.
Com 10% de probabilidade, duplique o valor para simular uma anomalia.
Gera uma tupla contendo None (para indicar nenhuma chave de partição específica), a chave, o valor gerado e o tempo decorrido desde o horário de início (não fornecido no trecho de código).
Introduza um tempo de espera entre cada valor gerado para simular a geração de dados em tempo real.
A função generate_random_metrics
é usada no fluxo de dados como fonte de entrada com a seguinte linha de código:
flow.input("input", ManualInputConfig(generate_random_metrics))
Essa linha informa ao fluxo de dados para usar a classe RandomMetricInput
para gerar os dados de entrada para o pipeline.
ZTestDetector
A classe ZTestDetector
é um detector de anomalias que usa o método Z-score para identificar se um ponto de dados é anômalo ou não. O escore Z é o número de desvios padrão de um ponto de dados da média de um conjunto de dados. Se o Z-score de um ponto de dados for maior que um limite especificado, ele será considerado anômalo.
A classe possui os seguintes métodos:
__init__(self, threshold_z)
: O construtor inicializa o ZTestDetector com um valor limite de Z-score. Ele também inicializa a lista dos últimos 10 valores (self.last_10), média (self.mu) e desvio padrão (self.sigma).
_push(self, value)
: Este método privado é usado para atualizar a lista dos últimos 10 valores com o novo valor. Ele insere o novo valor no início da lista e remove o valor mais antigo, mantendo o tamanho da lista em 10.
_recalc_stats(self)
: Este método privado recalcula a média e o desvio padrão com base nos valores atuais na lista self.last_10.
push(self, key__value__t)
: Este método público recebe uma tupla contendo uma chave, um valor e um timestamp como entrada. Ele calcula o Z-score para o valor, atualiza a lista dos últimos 10 valores e recalcula a média e o desvio padrão. Ele também registra o ponto de dados e seu status de anomalia usando as funções de visualização do Rerun. Por fim, ele retorna a instância atualizada da classe ZTestDetector e uma tupla contendo o valor, média, desvio padrão e status de anomalia.
A classe ZTestDetector é usada no pipeline de fluxo de dados como um mapa de estado com o seguinte código:
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
Essa linha informa ao fluxo de dados para aplicar o ZTestDetector
com um limite de pontuação Z de 2.0
e usar o método push
para processar os pontos de dados.
Para visualizar as anomalias, a classe ZTestDetector
registra os pontos de dados e seu status de anomalia correspondente usando as funções de visualização do Rerun. Especificamente, rr.log_scalar
é usado para plotar um valor escalar, enquanto rr.log_point
é usado para plotar pontos 3D.
O trecho de código a seguir mostra como a visualização é criada:
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
Aqui, primeiro registramos um valor escalar que representa a métrica. Então, dependendo se o valor é anômalo, registramos um ponto 3D com raio e cor diferentes. Os pontos anômalos são registrados em vermelho com um raio maior, enquanto os pontos não anômalos são registrados com um raio menor.
output_builder
A função output_builder
é usada para definir o comportamento de saída para o pipeline de dados. Neste exemplo específico, ele é responsável por imprimir o nome da métrica, valor, média, desvio padrão e se o valor é anômalo.
A função recebe dois argumentos: worker_index
e worker_count
. Esses argumentos ajudam a função a entender o índice do trabalhador e o número total de trabalhadores no pipeline de fluxo de dados.
Aqui está a definição da função output_builder
:
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
Esta função é uma função de ordem superior, o que significa que ela retorna outra função chamada inspector
. A função inspector
é responsável por processar a tupla de dados de entrada e imprimir a saída desejada.
A função do construtor de saída é usada posteriormente no pipeline de fluxo de dados ao configurar o comportamento de saída com
flow.capture(ManualOutputConfig(output_builder)).
Bytewax pode ser executado como um único processo ou de forma multi-processo. Esse fluxo de dados foi criado para escalar em vários processos, mas começaremos a executá-lo como um único processo com o módulo de execução spawn_cluster
.
spawn_cluster(flow)
Se quiséssemos aumentar o paralelismo, simplesmente adicionaríamos mais processos como argumentos.
Por exemplo - spawn_cluster(flow, proc_count=3)
.
Para executar o código fornecido, podemos simplesmente executá-lo como um script Python, mas primeiro precisamos instalar as dependências.
Crie um novo arquivo no mesmo diretório que dataflow.py e nomeie-o como requirements.txt.
Adicione o seguinte conteúdo ao arquivo requirements.txt:
bytewax==0.15.1 rerun-sdk==0.4.0
Abra um terminal no diretório que contém os arquivos requirements.txt e dataflow.py.
Instale as dependências usando o seguinte comando:
pip install -r requirements.txt
E execute o fluxo de dados!
python dataflow.py
Embora o código fornecido sirva como um exemplo básico de detecção de anomalias em tempo real, você pode expandir esse pipeline para acomodar cenários mais complexos.
Por exemplo:
Incorpore fontes de dados do mundo real : substitua a classe RandomMetricInput por uma classe personalizada que lê dados de uma fonte do mundo real, como sensores de IoT, arquivos de log ou APIs de streaming.
Implemente técnicas de detecção de anomalias mais sofisticadas : você pode substituir a classe ZTestDetector por outros métodos de detecção de anomalias com estado, como média móvel, suavização exponencial ou abordagens baseadas em aprendizado de máquina.
Personalize a visualização : aprimore a visualização Rerun adicionando mais dimensões de dados, ajustando os esquemas de cores ou modificando os estilos de plotagem para atender melhor às suas necessidades.
Integre com sistemas de alerta e monitoramento : em vez de simplesmente imprimir os resultados da anomalia, você pode integrar o pipeline com sistemas de alerta ou monitoramento para notificar as partes interessadas apropriadas quando uma anomalia é detectada.
Ao personalizar e estender o pipeline de fluxo de dados, você pode criar uma poderosa solução de detecção e visualização de anomalias em tempo real, adaptada ao seu caso de uso específico. A combinação de Bytewax e Rerun oferece uma base versátil e escalável para a construção de sistemas de visualização e processamento de dados em tempo real.
Esta postagem de blog demonstrou como usar Bytewax e Rerun para criar uma visualização de detecção de anomalias em tempo real. Ao construir um pipeline de fluxo de dados com Bytewax e integrar os poderosos recursos de visualização do Rerun, podemos monitorar e identificar anomalias em nossos dados à medida que ocorrem.
Originalmente escrito por Zander Matheson aqui.