Nesta postagem do blog, abordaremos como você pode combinar e aproveitar a solução de streaming de código aberto, bytewax , com ydata-profiling , para melhorar a qualidade de seus fluxos de streaming. Preparar-se!
O processamento de fluxo permite a análise em tempo real de dados em trânsito e antes do armazenamento e pode ser com ou sem estado .
Stateful stream processing é usado para recomendações em tempo real , detecção de padrões ou processamento de eventos complexos, onde o histórico do que aconteceu é necessário para o processamento (janelas, junção por uma chave, etc.).
O processamento de fluxo sem estado é usado para transformação em linha que não requer conhecimento de outros pontos de dados no fluxo, como mascarar um e-mail ou converter um tipo.
No geral, os fluxos de dados são amplamente usados no setor e podem ser aplicados a casos de uso como detecção de fraudes , monitoramento de pacientes ou manutenção preditiva de eventos .
Ao contrário dos modelos tradicionais, em que a qualidade dos dados geralmente é avaliada durante a criação do data warehouse ou da solução de painel, o streaming de dados requer monitoramento contínuo .
É essencial manter a qualidade dos dados durante todo o processo, desde a coleta até a alimentação dos aplicativos downstream. Afinal, o custo da má qualidade dos dados pode ser alto para as organizações:
“O custo de dados incorretos é de surpreendentes 15% a 25% da receita para a maioria das empresas. (…) Dois terços desses custos podem ser eliminados ao se antecipar à qualidade dos dados.”
— Thomas C. Redman, autor de “Getting in Front on Data Quality”
Ao longo deste artigo, mostraremos como você pode combinar bytewa
com ydata-profiling
para criar perfis e melhorar a qualidade de seus fluxos de streaming!
Ele permite que os usuários criem pipelines de dados de streaming e aplicativos em tempo real com recursos semelhantes ao Flink, Spark e Kafka Streams, ao mesmo tempo em que fornecem uma interface amigável e familiar e 100% de compatibilidade com o ecossistema Python.
Usando embutido
Para as transformações, o Bytewax facilita as transformações stateful e stateless com map , windowing e métodos de agregação e vem com recursos familiares, como recuperação e escalabilidade.
Bytewax
Ele permite que os usuários criem pipelines de dados de streaming e aplicativos em tempo real e criem as personalizações necessárias para atender às suas necessidades sem ter que aprender e manter plataformas de streaming baseadas em JVM, como Spark ou Flink.
Bytewax é adequado para muitos casos de uso, ou seja,
Para inspiração de caso de uso e mais informações como documentação, tutoriais e guias, sinta-se à vontade para verificar
A criação de perfil de dados é a chave para um início bem-sucedido de qualquer tarefa de aprendizado de máquina e refere-se à etapa de
Em poucas palavras,
Garantir altos padrões de qualidade de dados é crucial para todos os domínios e organizações, mas é especialmente relevante para domínios que operam com domínios que produzem dados contínuos , onde as circunstâncias podem mudar rapidamente e exigir ação imediata (por exemplo, monitoramento de saúde, valores de estoque, políticas de qualidade do ar).
Para muitos domínios, o perfil de dados é usado a partir de uma lente de análise exploratória de dados, considerando dados históricos armazenados em bancos de dados. Pelo contrário, para fluxos de dados, o perfil de dados torna-se essencial para validação e controle de qualidade continuamente ao longo do fluxo , onde os dados precisam ser verificados em diferentes prazos ou estágios do processo.
Ao incorporar um perfil automatizado em nossos fluxos de dados , podemos obter feedback imediatamente sobre o estado atual de nossos dados e ser alertados sobre quaisquer problemas potencialmente críticos, sejam eles relacionados à consistência e integridade dos dados (por exemplo, valores corrompidos ou formatos alterados) ou a eventos que ocorrem em curtos períodos de tempo (por exemplo, desvios de dados, desvio de regras de negócios e resultados).
Em domínios do mundo real — onde você simplesmente sabe que a lei de Murphy está prestes a atingir e “tudo pode definitivamente dar errado” — o perfil automatizado pode nos salvar de vários quebra-cabeças cerebrais e sistemas que precisam ser retirados da produção!
No que diz respeito ao perfil de dados, ydata-profiling
tem sido consistentemente um
Operações complexas e demoradas são feitas sob o capô: o ydata-profiling detecta automaticamente os tipos de recursos incluídos nos dados e, dependendo dos tipos de recursos (numéricos ou categóricos), ajusta as estatísticas e visualizações resumidas que são mostradas no relatório de criação de perfil.
Promovendo uma análise centrada em dados , o pacote também destaca as relações existentes entre os recursos , concentrando-se em suas interações e correlações pareadas e fornece uma avaliação completa dos alertas de qualidade de dados , desde valores duplicados ou constantes até recursos distorcidos e desequilibrados .
É realmente uma visão 360º da qualidade de nossos dados — com o mínimo de esforço.
Antes de iniciar o projeto, precisamos primeiro definir nossas dependências do Python e configurar nossa fonte de dados.
Primeiro, vamos instalar os pacotes bytewax
e ydata-profiling
( você pode querer usar um ambiente virtual para isso —
pip install bytewax==0.16.2 ydata-profiling==4.3.1
Em seguida, faremos o upload do
wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000
Em um ambiente de produção, essas medições seriam geradas continuamente por cada dispositivo e a entrada seria semelhante ao que esperamos em uma plataforma de streaming
(Como uma observação rápida, um fluxo de dados é essencialmente um pipeline de dados que pode ser descrito como um gráfico acíclico direcionado - DAG)
Primeiro, vamos fazer algumas importações necessárias :
from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main
Em seguida, definimos nosso objeto de fluxo de dados. Em seguida, usaremos um método stateless map onde passamos uma função para converter a string em um objeto DateTime e reestruturar os dados para o formato (device_id, data).
O método map fará a alteração em cada ponto de dados de maneira sem estado. A razão pela qual modificamos a forma de nossos dados é para que possamos agrupar facilmente os dados nas próximas etapas para criar o perfil de dados para cada dispositivo separadamente, em vez de para todos os dispositivos simultaneamente.
flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))
Agora, aproveitaremos os recursos de estado do bytewax
para coletar dados para cada dispositivo durante o período de tempo que definimos. ydata-profiling
espera um instantâneo dos dados ao longo do tempo, o que torna o operador de janela o método perfeito para fazer isso.
Em ydata-profiling
, podemos produzir estatísticas resumidas para um dataframe especificado para um contexto específico. Por exemplo, em nosso exemplo, podemos produzir instantâneos de dados referentes a cada dispositivo IoT ou a intervalos de tempo específicos:
from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)
Depois que os instantâneos são definidos, alavancar ydata-profiling
é tão simples quanto chamar o ProfileReport
para cada um dos dataframes que gostaríamos de analisar:
import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)
Neste exemplo, estamos gravando as imagens em arquivos locais como parte de uma função em um método map. Eles podem ser relatados por meio de uma ferramenta de mensagens ou podemos salvá-los em algum armazenamento remoto no futuro.
Depois que o perfil é concluído, o fluxo de dados espera alguma saída para que possamos usar o StdOutput
interno para imprimir o dispositivo que foi perfilado e o tempo em que foi criado o perfil que foi passado da função de perfil na etapa do mapa:
flow.output("out", StdOutput())
Existem várias maneiras de executar fluxos de dados Bytewax. Neste exemplo, usamos a mesma máquina local, mas o Bytewax também pode ser executado em vários processos Python, em vários hosts, em um
Neste artigo, continuaremos com uma configuração local, mas recomendamos que você verifique nossa ferramenta auxiliar
Assumindo que estamos no mesmo diretório do arquivo com a definição do fluxo de dados, podemos executá-lo usando:
python -m bytewax.run ydata-profiling-streaming:flow
Podemos usar os relatórios de criação de perfil para validar a qualidade dos dados, verificar alterações em esquemas ou formatos de dados e comparar as características dos dados entre diferentes dispositivos ou janelas de tempo .
Na verdade, podemos aproveitar o
snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")
A validação de fluxos de dados é crucial para identificar problemas na qualidade dos dados de maneira contínua e comparar o estado dos dados em períodos distintos de tempo.
Para organizações nos setores de saúde , energia , manufatura e entretenimento — todas trabalhando com fluxos contínuos de dados — um perfil automatizado é fundamental para estabelecer práticas recomendadas de governança de dados , desde a avaliação da qualidade até a privacidade dos dados.
Isso requer a análise de instantâneos de dados que, conforme mostrado neste artigo, podem ser obtidos de maneira contínua combinando bytewax
e ydata-profiling
.
A Bytewax cuida de todos os processos necessários para manipular e estruturar os fluxos de dados em instantâneos, que podem ser resumidos e comparados com a criação de perfil ydata por meio de um relatório abrangente das características dos dados.
Ser capaz de processar e perfilar adequadamente os dados recebidos abre uma infinidade de casos de uso em diferentes domínios, desde a correção de erros em esquemas e formatos de dados até o destaque e mitigação de problemas adicionais derivados de atividades do mundo real, como detecção de anomalias (por exemplo, detecção de fraude ou intrusão/ameaças), mau funcionamento do equipamento e outros eventos que se desviam das expectativas (por exemplo, desvios de dados ou desalinhamento com regras de negócios).
Agora, você está pronto para começar a explorar seus fluxos de dados! Deixe-nos saber quais outros casos de uso você encontra e, como sempre, sinta-se à vontade para nos enviar uma linha nos comentários ou nos encontrar no
Este artigo foi escrito por Fabiana Clemente (Cofundadora e CDO @
Você pode encontrar informações adicionais sobre os pacotes OSS nas respectivas documentações:
Também publicado aqui