Nesta postagem do blog, abordaremos como você pode combinar e aproveitar a solução de streaming de código aberto, , com , para melhorar a qualidade de seus fluxos de streaming. Preparar-se! bytewax ydata-profiling O processamento de fluxo permite a análise em tempo real de dados em trânsito e antes do armazenamento e pode ser ou . com sem estado é usado para recomendações , detecção de padrões ou processamento de eventos complexos, onde o histórico do que aconteceu é necessário para o processamento (janelas, junção por uma chave, etc.). Stateful stream processing em tempo real é usado para transformação que não requer conhecimento de outros pontos de dados no fluxo, como mascarar um e-mail ou converter um tipo. O processamento de fluxo sem estado em linha No geral, os fluxos de dados são amplamente usados no setor e podem ser aplicados a casos de uso como , ou . detecção de fraudes monitoramento de pacientes manutenção preditiva de eventos Um aspecto crucial que todos os fluxos de dados devem considerar é a qualidade dos dados Ao contrário dos modelos tradicionais, em que a qualidade dos dados geralmente é avaliada durante a criação do data warehouse ou da solução de painel, . o streaming de dados requer monitoramento contínuo É essencial manter a qualidade dos dados durante todo o processo, desde a coleta até a alimentação dos aplicativos downstream. Afinal, o custo da má qualidade dos dados pode ser alto para as organizações: “O custo de dados incorretos é de surpreendentes 15% a 25% da receita para a maioria das empresas. (…) Dois terços desses custos podem ser eliminados ao se antecipar à qualidade dos dados.” — Thomas C. Redman, autor de “Getting in Front on Data Quality” Ao longo deste artigo, mostraremos como você pode combinar com para criar perfis e melhorar a qualidade de seus fluxos de streaming! bytewa ydata-profiling Processamento de fluxo para profissionais de dados com Bytewax é uma estrutura de processamento de fluxo OSS projetada especificamente para desenvolvedores Python. Bytewax Ele permite que os usuários com recursos semelhantes ao Flink, Spark e Kafka Streams, ao mesmo tempo em que fornecem uma interface amigável e familiar e criem pipelines de dados de streaming e aplicativos em tempo real 100% de compatibilidade com o ecossistema Python. Usando embutido ou bibliotecas Python existentes, (Kafka, RedPanda, WebSocket, etc.) e em vários sistemas downstream (Kafka, arquivos parquet, data lakes, etc.). conectores você pode se conectar a fontes de dados em tempo real e streaming gravar dados transformados Para as transformações, o Bytewax com , e métodos e vem com recursos familiares, como recuperação e escalabilidade. facilita as transformações stateful e stateless map windowing de agregação Bytewax e foi . facilita uma experiência centrada em dados e Python para fluxos de dados criado especificamente para engenheiros e cientistas de dados Ele permite que os usuários e criem as personalizações necessárias para atender às suas necessidades sem ter que aprender e manter plataformas de streaming baseadas em JVM, como Spark ou Flink. criem pipelines de dados de streaming e aplicativos em tempo real Bytewax é adequado para muitos casos de uso, ou seja, , , , e mais. Incorporando pipelines para IA generativa Manipulando valores ausentes em fluxos de dados Usando modelos de linguagem em um contexto de streaming para entender os mercados financeiros Para inspiração de caso de uso e mais informações como documentação, tutoriais e guias, sinta-se à vontade para verificar . site da bytewax Por que criação de perfil de dados para fluxos de dados? e refere-se à etapa de : sua estrutura, comportamento e qualidade. A criação de perfil de dados é a chave para um início bem-sucedido de qualquer tarefa de aprendizado de máquina entendendo completamente nossos dados Em poucas palavras, envolve a análise de aspectos relacionados ao formato dos dados e descritores básicos (por exemplo, número de amostras, número/tipos de recursos, valores duplicados), sua (como a presença de dados ausentes ou recursos desequilibrados) e outros fatores complicadores que podem surgir durante a coleta ou processamento de dados (por exemplo, valores incorretos ou recursos inconsistentes). perfil de dados características intrínsecas , onde as circunstâncias podem mudar rapidamente e exigir ação imediata (por exemplo, monitoramento de saúde, valores de estoque, políticas de qualidade do ar). Garantir altos padrões de qualidade de dados é crucial para todos os domínios e organizações, mas é especialmente relevante para domínios que operam com domínios que produzem dados contínuos Para muitos domínios, o perfil de dados é usado a partir de uma lente de análise exploratória de dados, considerando dados históricos armazenados em bancos de dados. Pelo contrário, para fluxos de dados, , onde os dados precisam ser verificados em diferentes prazos ou estágios do processo. o perfil de dados torna-se essencial para validação e controle de qualidade continuamente ao longo do fluxo Ao incorporar um , podemos sobre o estado atual de nossos dados e ser alertados sobre quaisquer problemas potencialmente críticos, sejam eles relacionados à (por exemplo, valores corrompidos ou formatos alterados) ou a (por exemplo, desvios de dados, desvio de regras de negócios e resultados). perfil automatizado em nossos fluxos de dados obter feedback imediatamente consistência e integridade dos dados eventos que ocorrem em curtos períodos de tempo Em domínios do mundo real — — o perfil automatizado pode nos salvar de vários quebra-cabeças cerebrais e sistemas que precisam ser retirados da produção! onde você simplesmente sabe que a lei de Murphy está prestes a atingir e “tudo pode definitivamente dar errado” No que diz respeito ao perfil de dados, tem sido consistentemente um , ou para ou dados. E não é de admirar: ydata-profiling favorito da multidão tabular série temporal é uma linha de código para um amplo conjunto de análises e insights. Operações complexas e demoradas são feitas sob o capô: o ydata-profiling dependendo dos tipos de recursos (numéricos ou categóricos), que são mostradas no relatório de criação de perfil. detecta automaticamente os tipos de recursos incluídos nos dados e, ajusta as estatísticas e visualizações resumidas Promovendo uma , o pacote também , concentrando-se em suas e pareadas e fornece uma , desde valores ou até recursos e . análise centrada em dados destaca as relações existentes entre os recursos interações correlações avaliação completa dos alertas de qualidade de dados duplicados constantes distorcidos desequilibrados É realmente uma — com o mínimo de esforço. visão 360º da qualidade de nossos dados Juntando tudo: bytewax e ydata-profiling Antes de iniciar o projeto, precisamos primeiro definir nossas dependências do Python e configurar nossa fonte de dados. Primeiro, vamos instalar os pacotes e ( bytewax ydata-profiling você pode querer usar um ambiente virtual para isso — verifique estas instruções se precisar de alguma orientação extra!) pip install bytewax==0.16.2 ydata-profiling==4.3.1 Em seguida, faremos o upload do (Licença — CC0: Domínio público), que contém várias medições de de diferentes dispositivos IoT: Conjunto de dados de telemetria do sensor ambiental temperatura, umidade, monóxido de carbono, gás liquefeito de petróleo, fumaça, luz e movimento wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000 e a entrada seria semelhante ao que esperamos em uma plataforma de streaming . Neste artigo, e criar um fluxo de dados usando bytewax. Em um ambiente de produção, essas medições seriam geradas continuamente por cada dispositivo como Kafka para simular o contexto que encontraríamos com dados de streaming, vamos ler os dados do arquivo CSV uma linha por vez (Como uma observação rápida, um fluxo de dados é essencialmente um pipeline de dados que pode ser descrito como um gráfico acíclico direcionado - DAG) Primeiro, vamos fazer algumas : importações necessárias from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main Em seguida, definimos nosso objeto de fluxo de dados. Em seguida, usaremos um método stateless map onde passamos uma função para converter a string em um objeto DateTime e reestruturar os dados para o formato (device_id, data). O método map fará a alteração em cada ponto de dados de maneira sem estado. A razão pela qual modificamos a forma de nossos dados é para que possamos agrupar facilmente os dados nas próximas etapas para criar o perfil de dados para cada dispositivo separadamente, em vez de para todos os dispositivos simultaneamente. flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data)) Agora, aproveitaremos os recursos de estado do para coletar dados para cada dispositivo durante o período de tempo que definimos. espera um instantâneo dos dados ao longo do tempo, o que torna o operador de janela o método perfeito para fazer isso. bytewax ydata-profiling Em , podemos produzir estatísticas resumidas para um dataframe especificado para um contexto específico. Por exemplo, em nosso exemplo, podemos produzir instantâneos de dados referentes a cada dispositivo IoT ou a intervalos de tempo específicos: ydata-profiling from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print) Depois que os instantâneos são definidos, alavancar é tão simples quanto chamar o para cada um dos dataframes que gostaríamos de analisar: ydata-profiling ProfileReport import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile) Neste exemplo, estamos gravando as imagens em arquivos locais como parte de uma função em um método map. Eles podem ser relatados por meio de uma ferramenta de mensagens ou podemos salvá-los em algum armazenamento remoto no futuro. Depois que o perfil é concluído, o fluxo de dados espera alguma saída para que possamos usar o interno para imprimir o dispositivo que foi perfilado e o tempo em que foi criado o perfil que foi passado da função de perfil na etapa do mapa: StdOutput flow.output("out", StdOutput()) Existem várias maneiras de executar fluxos de dados Bytewax. Neste exemplo, usamos a mesma máquina local, mas o Bytewax também pode ser executado em vários processos Python, em vários hosts, em um , usando um , e . Contêiner Docker Cluster do Kubernetes mais Neste artigo, continuaremos com uma configuração local, mas recomendamos que você verifique nossa ferramenta auxiliar que gerencia as implantações de fluxo de dados do Kubernetes assim que o pipeline estiver pronto para a transição para a produção. ceractl Assumindo que estamos no mesmo diretório do arquivo com a definição do fluxo de dados, podemos executá-lo usando: python -m bytewax.run ydata-profiling-streaming:flow Podemos usar os relatórios de criação de perfil para validar a qualidade dos dados, verificar alterações em esquemas ou formatos de dados e . comparar as características dos dados entre diferentes dispositivos ou janelas de tempo Na verdade, podemos aproveitar o que destaca as diferenças entre dois perfis de dados de maneira direta, facilitando a detecção de padrões importantes que precisam ser investigados ou questões que precisam ser abordadas: funcionalidade de relatório de comparação snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html") Pronto para explorar seus próprios fluxos de dados? A validação de fluxos de dados é crucial para identificar problemas na qualidade dos dados de maneira contínua e comparar o estado dos dados em períodos distintos de tempo. Para organizações nos , , e — todas trabalhando com fluxos contínuos de dados — um , desde a avaliação da qualidade até a privacidade dos dados. setores de saúde energia manufatura entretenimento perfil automatizado é fundamental para estabelecer práticas recomendadas de governança de dados Isso requer a análise de instantâneos de dados que, conforme mostrado neste artigo, podem ser obtidos de maneira contínua combinando e . bytewax ydata-profiling cuida de todos os processos necessários para manipular e estruturar os fluxos de dados em instantâneos, que podem ser resumidos e comparados com por meio de um relatório abrangente das características dos dados. A Bytewax a criação de perfil ydata Ser capaz de processar e perfilar adequadamente os dados recebidos abre uma infinidade de casos de uso em diferentes domínios, desde a até o destaque e mitigação de problemas adicionais derivados de atividades do mundo real, como (por exemplo, detecção de fraude ou intrusão/ameaças), e outros eventos que se desviam das expectativas (por exemplo, desvios de dados ou desalinhamento com regras de negócios). correção de erros em esquemas e formatos de dados detecção de anomalias mau funcionamento do equipamento Agora, você está pronto para começar a explorar seus fluxos de dados! Deixe-nos saber quais outros casos de uso você encontra e, como sempre, sinta-se à vontade para nos enviar uma linha nos comentários ou nos encontrar no para mais dúvidas e sugestões! Comunidade de IA centrada em dados Vejo você lá! Agradecimentos ) e Oli Makhasoeva (Relações com Desenvolvedores @ ) -- desenvolvendo . Este artigo foi escrito por Fabiana Clemente (Cofundadora e CDO @ YData ) e Miriam Santos (Developer Relations @ YData ) -- desenvolvendo ydata-profiling -- e Zander Matheson (CEO e fundador @ Bytewax Byetwax bytewax Você pode encontrar informações adicionais sobre os pacotes OSS nas respectivas documentações: & . documentos de criação de perfil ydata documentos bytewax Também publicado aqui