paint-brush
Quebrando a execução da tarefa do trabalhador no Apache DolphinSchedulerpor@williamguo
142 leituras

Quebrando a execução da tarefa do trabalhador no Apache DolphinScheduler

por William Guo9m2024/08/23
Read on Terminal Reader

Muito longo; Para ler

O Apache DolphinScheduler é um sistema de agendamento de fluxo de trabalho de código aberto conhecido por suas operações DAG visuais e plugins extensíveis. Este artigo explora o processo de execução detalhado das tarefas Worker, desde a inicialização da tarefa até a conclusão, destacando a arquitetura do sistema, os tipos de tarefa e os mecanismos de tolerância a falhas. O conteúdo é essencial para entender como gerenciar e otimizar efetivamente os fluxos de trabalho usando o DolphinScheduler.
featured image - Quebrando a execução da tarefa do trabalhador no Apache DolphinScheduler
William Guo HackerNoon profile picture
0-item
1-item


Olá pessoal, eu sou Cai Shunfeng, um engenheiro de dados sênior na WhaleOps, e um committer e membro PMC da comunidade Apache DolphinScheduler. Hoje, vou explicar como a tarefa Worker do Apache DolphinScheduler funciona.

Esta explicação será dividida em três seções:


  1. Introdução ao Apache DolphinScheduler
  2. Visão geral do design geral do Apache DolphinScheduler
  3. Processo de execução detalhado das tarefas do Worker

Introdução ao Projeto

O Apache DolphinScheduler é um sistema de agendamento de fluxo de trabalho visual, distribuído e facilmente extensível, de código aberto, adequado para cenários de nível empresarial.



Ele fornece as seguintes funcionalidades principais, oferecendo uma solução completa de processamento de dados do ciclo de vida para fluxos de trabalho e tarefas por meio de operações visuais.

Principais características

  • Fácil de usar

  • Operações visuais do DAG: os usuários podem arrastar e soltar componentes na página para organizá-los em um DAG (Directed Acyclic Graph).

  • Sistema de plugins: inclui plugins de tarefas, plugins de fontes de dados, plugins de alertas, plugins de armazenamento, plugins do centro de registro e plugins de tarefas cron, etc. Os usuários podem facilmente estender os plugins conforme necessário para atender aos seus requisitos comerciais.


  • Cenários de uso avançados

  • Configuração estática: inclui agendamento de fluxo de trabalho, operações online e offline, gerenciamento de versões e funções de preenchimento.

  • Operações de tempo de execução: fornece funcionalidades como pausar, parar, retomar e substituição de parâmetros.

  • Tipos de dependência: oferece suporte a um rico conjunto de opções e estratégias de dependência, adaptando-se a mais cenários.

  • Passagem de parâmetros: oferece suporte a parâmetros de inicialização no nível do fluxo de trabalho, parâmetros globais, parâmetros locais no nível da tarefa e passagem dinâmica de parâmetros.


  • Alta confiabilidade

  • Design descentralizado: todos os serviços são independentes de estado e podem ser dimensionados horizontalmente para aumentar o rendimento do sistema.

  • Proteção contra sobrecarga e tolerância a falhas de instância:

  • Proteção contra sobrecarga: durante a operação, o master e o worker monitoram seu próprio uso de CPU e memória, bem como o volume de tarefas. Se sobrecarregados, eles pausam o processamento atual de fluxo de trabalho/tarefa e retomam após a recuperação.

  • Tolerância a falhas de instância: quando os nós mestre/de trabalho falham, o centro de registro detecta o nó de serviço offline e executa tolerância a falhas para instâncias de fluxo de trabalho ou tarefa, garantindo a capacidade de autorrecuperação do sistema o máximo possível.

Design geral

Arquitetura do Projeto

Em seguida, vamos apresentar o contexto geral do design. Abaixo está o diagrama de arquitetura de design fornecido no site oficial.


No diagrama de arquitetura, podemos ver que o Apache DolphinScheduler é composto de vários componentes principais:

  • Componente de API: o serviço de API gerencia principalmente metadados, interage com a interface do usuário por meio do serviço de API ou chama interfaces de API para criar tarefas de fluxo de trabalho e vários recursos necessários ao fluxo de trabalho.


  • Componente mestre: o mestre é o controlador de instâncias de fluxo de trabalho, responsável por consumir comandos, convertê-los em instâncias de fluxo de trabalho, executar a divisão de DAG, enviar tarefas em ordem e distribuir tarefas aos trabalhadores.


  • Componente Worker: O worker é o executor de tarefas específicas. Após receber tarefas, ele as processa de acordo com diferentes tipos de tarefas, interage com o master e relata o status da tarefa. Notavelmente, o serviço worker não interage com o banco de dados; apenas os serviços API, master e alert interagem com o banco de dados.


  • Serviço de Alerta: O serviço de alerta envia alertas por meio de diferentes plugins de alerta. Esses serviços registram-se no centro de registro, e o mestre e o trabalhador relatam periodicamente heartbeats e status atual para garantir que possam receber tarefas normalmente.

Processo de interação entre mestre e trabalhador

O processo de interação entre o mestre e o trabalhador é o seguinte:

  • Envio de tarefas: depois que o mestre conclui a divisão do DAG, ele envia tarefas ao banco de dados e seleciona um grupo de trabalhadores apropriado para distribuir tarefas com base em diferentes estratégias de distribuição.


  • Recepção de Tarefa: Após o trabalhador receber uma tarefa, ele determina se aceita a tarefa com base em sua condição. O feedback é fornecido se a aceitação foi bem-sucedida ou não.


  • Execução da tarefa: O worker processa a tarefa, atualiza o status para running e retorna ao master. O master atualiza o status da tarefa e as informações de hora de início no banco de dados.


  • Conclusão da Tarefa: Após a conclusão da tarefa, o worker envia uma notificação de evento de conclusão ao master, e o master retorna uma confirmação ACK. Se nenhum ACK for recebido, o worker continuará tentando novamente para garantir que o evento da tarefa não seja perdido.

Recepção de tarefas do trabalhador

Quando o trabalhador recebe uma tarefa, as seguintes operações são executadas:

  • Preenche as informações do host.
  • Gera o caminho do log na máquina de trabalho.
  • Gera um Executor de Tarefa de Trabalhador, que é enviado ao pool de threads para execução.


O trabalhador verifica se está sobrecarregado; se estiver, ele rejeita a tarefa. Após receber o feedback de falha na distribuição de tarefas, o mestre continua a escolher outro trabalhador para distribuição de tarefas com base na estratégia de distribuição.

Processo de execução do trabalhador

O processo específico de execução das tarefas do trabalhador inclui as seguintes etapas:

  1. Inicialização da tarefa: inicializa o ambiente e as dependências necessárias para a tarefa.
  2. Execução de tarefa: executa a lógica de tarefa específica.
  3. Conclusão da tarefa: após a conclusão da execução da tarefa, os resultados da execução da tarefa são reportados ao nó mestre.


A seguir, detalharemos o processo específico de execução da tarefa.


Antes que a execução da tarefa comece, um contexto é inicializado primeiro. Neste ponto, o horário de início da tarefa é definido. Para garantir a precisão da tarefa, é necessário sincronizar o tempo entre o mestre e o trabalhador para evitar desvio de tempo.


Posteriormente, o status da tarefa é definido como em execução e enviado de volta ao mestre para notificar que a tarefa começou a ser executada.


Como a maioria das tarefas é executada no sistema operacional Linux, o processamento de inquilinos e arquivos é necessário:

  • Processamento de Tenant: Primeiro, ele verifica se o tenant existe. Se não, ele decide se deve criar automaticamente o tenant com base na configuração. Isso requer que o usuário de implantação tenha permissões sudo para alternar para o tenant especificado durante a execução da tarefa.
  • Usuários Específicos : Para alguns cenários, não é necessário alternar os inquilinos, mas simplesmente executar a tarefa usando um usuário específico. Isso também é suportado pelo sistema.

Após processar o locatário, o trabalhador cria o diretório de execução específico. O diretório raiz do diretório de execução é configurável e requer autorização apropriada. Por padrão, as permissões do diretório são definidas como 755.


Durante a execução da tarefa, vários arquivos de recursos podem ser necessários, como buscar arquivos de clusters AWS S3 ou HDFS. O sistema baixa esses arquivos para o diretório temporário do trabalhador para uso subsequente da tarefa.


No Apache DolphinScheduler, variáveis de parâmetros podem ser substituídas. As principais categorias incluem:

  • Parâmetros integrados: envolvem principalmente a substituição de parâmetros relacionados a hora e data.
  • Parâmetros definidos pelo usuário: variáveis de parâmetros definidas pelo usuário no fluxo de trabalho ou tarefa também serão substituídas adequadamente.

Por meio das etapas acima, o ambiente de execução da tarefa e os recursos necessários estarão prontos, e a tarefa poderá iniciar oficialmente a execução.

Diferentes tipos de tarefas

No Apache DolphinScheduler, vários tipos de tarefas são suportados, cada um aplicável a diferentes cenários e requisitos. Abaixo, apresentamos vários tipos principais de tarefas e seus componentes específicos.


Esses componentes são comumente usados para executar arquivos de script, adequados para várias linguagens de script e protocolos:

  • Shell: Executa scripts de shell.
  • Python: Executa scripts Python.
  • SQL: Executa instruções SQL.
  • Procedimento armazenado: executa procedimentos armazenados no banco de dados.
  • HTTP: executa solicitações HTTP.

A versão comercial (WhaleScheduler) também oferece suporte à execução de aplicativos Java por meio da execução de pacotes JAR.

Componentes de Tarefas Lógicas

Esses componentes são usados para implementar controle lógico e gerenciamento de fluxo de trabalho:

  • Switch: Tarefa de controle condicional.
  • Dependente: Tarefa de dependência.
  • SubProcesso: Subtarefa.
  • NextLoop (versão comercial): tarefa de controle de loop adequada para cenários financeiros.
  • Componente de gatilho: monitora se arquivos ou dados existem.

Componentes de Big Data

Esses componentes são usados principalmente para processamento e análise de big data:

  • SeaTunnel: Corresponde à versão comercial do WhaleTunnel, usado para integração e processamento de big data.
  • AWS EMR: integração com o Amazon EMR.
  • HiveCli: tarefa de linha de comando do Hive.
  • Spark: Tarefa Spark.
  • Flink: Tarefa Flink.
  • DataX: Tarefa de sincronização de dados.

Componentes do contêiner

Esses componentes são usados para executar tarefas em um ambiente de contêiner:

  • K8S: tarefa do Kubernetes.

Componentes de qualidade de dados

Usado para garantir a qualidade dos dados:

  • DataQuality: Tarefa de verificação da qualidade dos dados.

Componentes interativos

Esses componentes são usados para interagir com ambientes de ciência de dados e aprendizado de máquina:

  • Jupyter: Tarefa do Jupyter Notebook.
  • Zeppelin: Tarefa do Caderno Zeppelin.

Componentes de Aprendizado de Máquina

Esses componentes são usados para o gerenciamento e execução de tarefas de aprendizado de máquina:

  • Kubeflow: tarefa do Kubeflow.
  • MlFlow: Tarefa MlFlow.
  • Dvc: Tarefa de controle de versão de dados.

No geral, o Apache DolphinScheduler suporta de três a quatro dúzias de componentes, cobrindo áreas de execução de script, processamento de big data, até machine learning. Para mais informações, visite o site oficial para ver a documentação detalhada.

Abstração do tipo de tarefa

No Apache DolphinScheduler, os tipos de tarefas são abstraídos em vários modos de processamento para atender a vários ambientes de tempo de execução e necessidades.

Abaixo apresentamos o processo de abstração e execução dos tipos de tarefas em detalhes.


O worker é um serviço JVM implantado em um servidor. Para alguns componentes de script (como Shell e Python) e tarefas executadas localmente (como Spark Local), eles iniciarão um processo separado para execução.


Neste ponto, o trabalhador interage com essas tarefas por meio do ID do processo (PID).


Diferentes fontes de dados podem exigir diferentes adaptações. Para tarefas de SQL e procedimentos armazenados, abstraímos o tratamento para diferentes fontes de dados, como MySQL, PostgreSQL, AWS Redshift, etc. Essa abstração permite adaptação e expansão flexíveis de diferentes tipos de banco de dados.


Tarefas remotas referem-se a tarefas que são executadas em clusters remotos, como AWS EMR, clusters SeaTunnel, clusters Kubernetes, etc. O Worker não executa essas tarefas localmente; em vez disso, ele as envia aos clusters remotos e monitora seus status e mensagens. Este modo é particularmente adequado para ambientes de nuvem onde a escalabilidade é necessária.

Execução de Tarefas

Coleção de Logs

Diferentes plugins usam diferentes modos de processamento e, portanto, a coleta de logs varia de acordo:

  • Processos locais: os logs são registrados monitorando a saída do processo.

  • Tarefas remotas: os logs são coletados verificando periodicamente o status da tarefa e a saída do cluster remoto (por exemplo, AWS EMR) e registrando-os nos logs de tarefas locais.


Substituição de variável de parâmetro

O sistema verifica os logs de tarefas para identificar quaisquer variáveis de parâmetros que precisam ser substituídas dinamicamente. Por exemplo, a Tarefa A no DAG pode gerar alguns parâmetros de saída que precisam ser passados para a Tarefa B downstream.

Durante esse processo, o sistema lê os logs e substitui as variáveis de parâmetros conforme necessário.


Recuperando ID da tarefa

  • Processos locais: O ID do processo (PID) é recuperado.
  • Tarefas remotas: o ID da tarefa remota (por exemplo, ID da tarefa do AWS EMR) é recuperado.

Manter esses IDs de tarefa permite consultas de dados adicionais e operações de tarefas remotas. Por exemplo, quando um fluxo de trabalho é interrompido, a API cancel correspondente pode ser chamada usando o ID da tarefa para encerrar a tarefa em execução.


Tratamento de tolerância a falhas

  • Processos locais: se um nó de trabalho falhar, o processo local não ficará ciente disso, exigindo que a tarefa seja reenviada.
  • Tarefas remotas: se a tarefa estiver em execução em um cluster remoto (por exemplo, AWS), o status da tarefa pode ser verificado usando o ID da tarefa, e uma tentativa pode ser feita para assumir a tarefa. Se bem-sucedido, não há necessidade de reenviar a tarefa, economizando tempo.

Conclusão da execução da tarefa

Após a execução de uma tarefa, diversas ações de conclusão são necessárias:

  • Verificação de Conclusão de Tarefa: O sistema verificará se um alerta precisa ser enviado. Por exemplo, para uma tarefa SQL, se os resultados da consulta acionarem um alerta, o sistema interagirá com o serviço de alerta via RPC para enviar a mensagem de alerta.

  • Feedback do Evento: O Worker enviará o evento de conclusão da tarefa (evento de término) de volta ao Master. O Master atualiza o status da tarefa no banco de dados e prossegue com a transição do status do DAG.

  • Limpeza de Contexto: O Worker removerá o contexto da tarefa que foi criado no início da tarefa da memória. Ele também limpará os caminhos de arquivo gerados durante a execução da tarefa. Se estiver no modo de depuração (modo de desenvolvimento), esses arquivos não serão limpos, permitindo a solução de problemas de tarefas com falha.


Por meio dessas etapas, todo o processo de execução de uma instância de tarefa é concluído.

Contribuição da Comunidade

Se você estiver interessado no Apache DolphinScheduler e quiser contribuir com a comunidade de código aberto, sinta-se à vontade para consultar nossas diretrizes de contribuição.


A comunidade incentiva contribuições ativas, incluindo, mas não se limitando a:

  • Relatando problemas encontrados durante o uso.
  • Envio de documentação e PRs de código.
  • Adicionando testes unitários (UT).
  • Adicionando comentários de código.
  • Corrigindo bugs ou adicionando novos recursos.
  • Escrever artigos técnicos ou participar de Meetups.

Guia para novos colaboradores

Para novos contribuidores, você pode procurar por problemas rotulados como good first issue nos problemas do GitHub da comunidade. Esses problemas são geralmente mais simples e adequados para usuários que estão fazendo sua primeira contribuição.


Em resumo, aprendemos sobre o design geral do Apache DolphinScheduler e o processo de execução detalhado das tarefas do Worker.

Espero que este conteúdo ajude você a entender e usar melhor o Apache DolphinScheduler. Se tiver alguma dúvida, sinta-se à vontade para entrar em contato comigo na seção de comentários.