paint-brush
Como entender seus dados em tempo real usando bytewax e ydata-profilingpor@ydata
792 leituras
792 leituras

Como entender seus dados em tempo real usando bytewax e ydata-profiling

por YData9m2023/07/25
Read on Terminal Reader

Muito longo; Para ler

Apenas um incrível tutorial passo a passo sobre como executar perfis de dados em fluxos de dados 🚀
featured image - Como entender seus dados em tempo real usando bytewax e ydata-profiling
YData HackerNoon profile picture

Nesta postagem do blog, abordaremos como você pode combinar e aproveitar a solução de streaming de código aberto, bytewax , com ydata-profiling , para melhorar a qualidade de seus fluxos de streaming. Preparar-se!


O processamento de fluxo permite a análise em tempo real de dados em trânsito e antes do armazenamento e pode ser com ou sem estado .


Stateful stream processing é usado para recomendações em tempo real , detecção de padrões ou processamento de eventos complexos, onde o histórico do que aconteceu é necessário para o processamento (janelas, junção por uma chave, etc.).


O processamento de fluxo sem estado é usado para transformação em linha que não requer conhecimento de outros pontos de dados no fluxo, como mascarar um e-mail ou converter um tipo.


Foto de Markus Spiske no Unsplash


No geral, os fluxos de dados são amplamente usados no setor e podem ser aplicados a casos de uso como detecção de fraudes , monitoramento de pacientes ou manutenção preditiva de eventos .

Um aspecto crucial que todos os fluxos de dados devem considerar é a qualidade dos dados

Ao contrário dos modelos tradicionais, em que a qualidade dos dados geralmente é avaliada durante a criação do data warehouse ou da solução de painel, o streaming de dados requer monitoramento contínuo .


É essencial manter a qualidade dos dados durante todo o processo, desde a coleta até a alimentação dos aplicativos downstream. Afinal, o custo da má qualidade dos dados pode ser alto para as organizações:


“O custo de dados incorretos é de surpreendentes 15% a 25% da receita para a maioria das empresas. (…) Dois terços desses custos podem ser eliminados ao se antecipar à qualidade dos dados.”


— Thomas C. Redman, autor de “Getting in Front on Data Quality”


Ao longo deste artigo, mostraremos como você pode combinar bytewa com ydata-profiling para criar perfis e melhorar a qualidade de seus fluxos de streaming!

Processamento de fluxo para profissionais de dados com Bytewax

Bytewax é uma estrutura de processamento de fluxo OSS projetada especificamente para desenvolvedores Python.


Ele permite que os usuários criem pipelines de dados de streaming e aplicativos em tempo real com recursos semelhantes ao Flink, Spark e Kafka Streams, ao mesmo tempo em que fornecem uma interface amigável e familiar e 100% de compatibilidade com o ecossistema Python.


Usando embutido conectores ou bibliotecas Python existentes, você pode se conectar a fontes de dados em tempo real e streaming (Kafka, RedPanda, WebSocket, etc.) e gravar dados transformados em vários sistemas downstream (Kafka, arquivos parquet, data lakes, etc.).


Para as transformações, o Bytewax facilita as transformações stateful e stateless com map , windowing e métodos de agregação e vem com recursos familiares, como recuperação e escalabilidade.


Bytewax facilita uma experiência centrada em dados e Python para fluxos de dados e foi criado especificamente para engenheiros e cientistas de dados .


Ele permite que os usuários criem pipelines de dados de streaming e aplicativos em tempo real e criem as personalizações necessárias para atender às suas necessidades sem ter que aprender e manter plataformas de streaming baseadas em JVM, como Spark ou Flink.


Bytewax é adequado para muitos casos de uso, ou seja, Incorporando pipelines para IA generativa , Manipulando valores ausentes em fluxos de dados , Usando modelos de linguagem em um contexto de streaming para entender os mercados financeiros , e mais.


Para inspiração de caso de uso e mais informações como documentação, tutoriais e guias, sinta-se à vontade para verificar site da bytewax .

Por que criação de perfil de dados para fluxos de dados?

A criação de perfil de dados é a chave para um início bem-sucedido de qualquer tarefa de aprendizado de máquina e refere-se à etapa de entendendo completamente nossos dados : sua estrutura, comportamento e qualidade.


Em poucas palavras, perfil de dados envolve a análise de aspectos relacionados ao formato dos dados e descritores básicos (por exemplo, número de amostras, número/tipos de recursos, valores duplicados), sua características intrínsecas (como a presença de dados ausentes ou recursos desequilibrados) e outros fatores complicadores que podem surgir durante a coleta ou processamento de dados (por exemplo, valores incorretos ou recursos inconsistentes).


Garantir altos padrões de qualidade de dados é crucial para todos os domínios e organizações, mas é especialmente relevante para domínios que operam com domínios que produzem dados contínuos , onde as circunstâncias podem mudar rapidamente e exigir ação imediata (por exemplo, monitoramento de saúde, valores de estoque, políticas de qualidade do ar).


Para muitos domínios, o perfil de dados é usado a partir de uma lente de análise exploratória de dados, considerando dados históricos armazenados em bancos de dados. Pelo contrário, para fluxos de dados, o perfil de dados torna-se essencial para validação e controle de qualidade continuamente ao longo do fluxo , onde os dados precisam ser verificados em diferentes prazos ou estágios do processo.


Ao incorporar um perfil automatizado em nossos fluxos de dados , podemos obter feedback imediatamente sobre o estado atual de nossos dados e ser alertados sobre quaisquer problemas potencialmente críticos, sejam eles relacionados à consistência e integridade dos dados (por exemplo, valores corrompidos ou formatos alterados) ou a eventos que ocorrem em curtos períodos de tempo (por exemplo, desvios de dados, desvio de regras de negócios e resultados).


Em domínios do mundo real — onde você simplesmente sabe que a lei de Murphy está prestes a atingir e “tudo pode definitivamente dar errado” — o perfil automatizado pode nos salvar de vários quebra-cabeças cerebrais e sistemas que precisam ser retirados da produção!


No que diz respeito ao perfil de dados, ydata-profiling tem sido consistentemente um favorito da multidão , ou para tabular ou série temporal dados. E não é de admirar: é uma linha de código para um amplo conjunto de análises e insights.


Operações complexas e demoradas são feitas sob o capô: o ydata-profiling detecta automaticamente os tipos de recursos incluídos nos dados e, dependendo dos tipos de recursos (numéricos ou categóricos), ajusta as estatísticas e visualizações resumidas que são mostradas no relatório de criação de perfil.


Promovendo uma análise centrada em dados , o pacote também destaca as relações existentes entre os recursos , concentrando-se em suas interações e correlações pareadas e fornece uma avaliação completa dos alertas de qualidade de dados , desde valores duplicados ou constantes até recursos distorcidos e desequilibrados .


É realmente uma visão 360º da qualidade de nossos dados — com o mínimo de esforço.


Relatório de criação de perfil: destacando possíveis problemas de qualidade de dados.



Juntando tudo: bytewax e ydata-profiling

Antes de iniciar o projeto, precisamos primeiro definir nossas dependências do Python e configurar nossa fonte de dados.


Primeiro, vamos instalar os pacotes bytewax e ydata-profiling ( você pode querer usar um ambiente virtual para isso — verifique estas instruções se precisar de alguma orientação extra!)


 pip install bytewax==0.16.2 ydata-profiling==4.3.1


Em seguida, faremos o upload do Conjunto de dados de telemetria do sensor ambiental (Licença — CC0: Domínio público), que contém várias medições de temperatura, umidade, monóxido de carbono, gás liquefeito de petróleo, fumaça, luz e movimento de diferentes dispositivos IoT:


 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000


Em um ambiente de produção, essas medições seriam geradas continuamente por cada dispositivo e a entrada seria semelhante ao que esperamos em uma plataforma de streaming como Kafka . Neste artigo, para simular o contexto que encontraríamos com dados de streaming, vamos ler os dados do arquivo CSV uma linha por vez e criar um fluxo de dados usando bytewax.


(Como uma observação rápida, um fluxo de dados é essencialmente um pipeline de dados que pode ser descrito como um gráfico acíclico direcionado - DAG)


Primeiro, vamos fazer algumas importações necessárias :


 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main


Em seguida, definimos nosso objeto de fluxo de dados. Em seguida, usaremos um método stateless map onde passamos uma função para converter a string em um objeto DateTime e reestruturar os dados para o formato (device_id, data).


O método map fará a alteração em cada ponto de dados de maneira sem estado. A razão pela qual modificamos a forma de nossos dados é para que possamos agrupar facilmente os dados nas próximas etapas para criar o perfil de dados para cada dispositivo separadamente, em vez de para todos os dispositivos simultaneamente.


 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))


Agora, aproveitaremos os recursos de estado do bytewax para coletar dados para cada dispositivo durante o período de tempo que definimos. ydata-profiling espera um instantâneo dos dados ao longo do tempo, o que torna o operador de janela o método perfeito para fazer isso.


Em ydata-profiling , podemos produzir estatísticas resumidas para um dataframe especificado para um contexto específico. Por exemplo, em nosso exemplo, podemos produzir instantâneos de dados referentes a cada dispositivo IoT ou a intervalos de tempo específicos:


 from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)


Depois que os instantâneos são definidos, alavancar ydata-profiling é tão simples quanto chamar o ProfileReport para cada um dos dataframes que gostaríamos de analisar:


 import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)


Neste exemplo, estamos gravando as imagens em arquivos locais como parte de uma função em um método map. Eles podem ser relatados por meio de uma ferramenta de mensagens ou podemos salvá-los em algum armazenamento remoto no futuro.


Depois que o perfil é concluído, o fluxo de dados espera alguma saída para que possamos usar o StdOutput interno para imprimir o dispositivo que foi perfilado e o tempo em que foi criado o perfil que foi passado da função de perfil na etapa do mapa:


 flow.output("out", StdOutput())


Existem várias maneiras de executar fluxos de dados Bytewax. Neste exemplo, usamos a mesma máquina local, mas o Bytewax também pode ser executado em vários processos Python, em vários hosts, em um Contêiner Docker , usando um Cluster do Kubernetes , e mais .


Neste artigo, continuaremos com uma configuração local, mas recomendamos que você verifique nossa ferramenta auxiliar ceractl que gerencia as implantações de fluxo de dados do Kubernetes assim que o pipeline estiver pronto para a transição para a produção.


Assumindo que estamos no mesmo diretório do arquivo com a definição do fluxo de dados, podemos executá-lo usando:


 python -m bytewax.run ydata-profiling-streaming:flow


Podemos usar os relatórios de criação de perfil para validar a qualidade dos dados, verificar alterações em esquemas ou formatos de dados e comparar as características dos dados entre diferentes dispositivos ou janelas de tempo .


Na verdade, podemos aproveitar o funcionalidade de relatório de comparação que destaca as diferenças entre dois perfis de dados de maneira direta, facilitando a detecção de padrões importantes que precisam ser investigados ou questões que precisam ser abordadas:


 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")


Pronto para explorar seus próprios fluxos de dados?

A validação de fluxos de dados é crucial para identificar problemas na qualidade dos dados de maneira contínua e comparar o estado dos dados em períodos distintos de tempo.


Para organizações nos setores de saúde , energia , manufatura e entretenimento — todas trabalhando com fluxos contínuos de dados — um perfil automatizado é fundamental para estabelecer práticas recomendadas de governança de dados , desde a avaliação da qualidade até a privacidade dos dados.


Isso requer a análise de instantâneos de dados que, conforme mostrado neste artigo, podem ser obtidos de maneira contínua combinando bytewax e ydata-profiling .


A Bytewax cuida de todos os processos necessários para manipular e estruturar os fluxos de dados em instantâneos, que podem ser resumidos e comparados com a criação de perfil ydata por meio de um relatório abrangente das características dos dados.


Ser capaz de processar e perfilar adequadamente os dados recebidos abre uma infinidade de casos de uso em diferentes domínios, desde a correção de erros em esquemas e formatos de dados até o destaque e mitigação de problemas adicionais derivados de atividades do mundo real, como detecção de anomalias (por exemplo, detecção de fraude ou intrusão/ameaças), mau funcionamento do equipamento e outros eventos que se desviam das expectativas (por exemplo, desvios de dados ou desalinhamento com regras de negócios).


Agora, você está pronto para começar a explorar seus fluxos de dados! Deixe-nos saber quais outros casos de uso você encontra e, como sempre, sinta-se à vontade para nos enviar uma linha nos comentários ou nos encontrar no Comunidade de IA centrada em dados para mais dúvidas e sugestões! Vejo você lá!

Agradecimentos

Este artigo foi escrito por Fabiana Clemente (Cofundadora e CDO @ YData ) e Miriam Santos (Developer Relations @ YData ) -- desenvolvendo ydata-profiling -- e Zander Matheson (CEO e fundador @ Bytewax ) e Oli Makhasoeva (Relações com Desenvolvedores @ Byetwax ) -- desenvolvendo bytewax .


Você pode encontrar informações adicionais sobre os pacotes OSS nas respectivas documentações: documentos de criação de perfil ydata & documentos bytewax .


Também publicado aqui