paint-brush
Décomposer l'exécution des tâches de travail dans Apache DolphinSchedulerpar@williamguo
118 lectures

Décomposer l'exécution des tâches de travail dans Apache DolphinScheduler

par William Guo9m2024/08/23
Read on Terminal Reader

Trop long; Pour lire

Apache DolphinScheduler est un système de planification de workflow open source connu pour ses opérations DAG visuelles et ses plugins extensibles. Cet article explore le processus d'exécution détaillé des tâches Worker, de l'initialisation de la tâche à son achèvement, en mettant en évidence l'architecture du système, les types de tâches et les mécanismes de tolérance aux pannes. Le contenu est essentiel pour comprendre comment gérer et optimiser efficacement les workflows à l'aide de DolphinScheduler.
featured image - Décomposer l'exécution des tâches de travail dans Apache DolphinScheduler
William Guo HackerNoon profile picture
0-item
1-item


Salut tout le monde, je suis Cai Shunfeng, ingénieur de données senior chez WhaleOps, et membre du comité de validation et du PMC de la communauté Apache DolphinScheduler. Aujourd'hui, je vais vous expliquer comment fonctionne la tâche Worker d'Apache DolphinScheduler.

Cette explication sera divisée en trois sections :


  1. Introduction à Apache DolphinScheduler
  2. Présentation de la conception globale d'Apache DolphinScheduler
  3. Processus d'exécution détaillé des tâches du travailleur

Présentation du projet

Apache DolphinScheduler est un système open source de planification de flux de travail visuel, distribué et facilement extensible, adapté aux scénarios de niveau entreprise.



Il fournit les fonctionnalités clés suivantes, offrant une solution de traitement de données tout au long du cycle de vie des flux de travail et des tâches via des opérations visuelles.

Caractéristiques principales

  • Facile à utiliser

  • Opérations DAG visuelles : les utilisateurs peuvent faire glisser et déposer des composants sur la page pour les organiser dans un DAG (graphique acyclique dirigé).

  • Système de plugins : comprend des plugins de tâches, des plugins de sources de données, des plugins d'alerte, des plugins de stockage, des plugins de centre de registre et des plugins de tâches cron, etc. Les utilisateurs peuvent facilement étendre les plugins selon leurs besoins pour répondre aux besoins de leur entreprise.


  • Scénarios d'utilisation riches

  • Configuration statique : inclut la planification du flux de travail, les opérations en ligne et hors ligne, la gestion des versions et les fonctions de remplissage.

  • Opérations d'exécution : fournit des fonctionnalités telles que la pause, l'arrêt, la reprise et la substitution de paramètres.

  • Types de dépendances : prend en charge un riche ensemble d’options et de stratégies de dépendance, s’adaptant à davantage de scénarios.

  • Passage de paramètres : prend en charge les paramètres de démarrage au niveau du flux de travail, les paramètres globaux, les paramètres locaux au niveau de la tâche et le passage de paramètres dynamiques.


  • Haute fiabilité

  • Conception décentralisée : tous les services sont sans état et peuvent être mis à l'échelle horizontalement pour augmenter le débit du système.

  • Protection contre les surcharges et tolérance aux pannes d'instance :

  • Protection contre les surcharges : pendant le fonctionnement, le maître et le travailleur surveillent leur propre utilisation du processeur et de la mémoire, ainsi que le volume des tâches. En cas de surcharge, ils interrompent le traitement du flux de travail/de la tâche en cours et reprennent après la récupération.

  • Tolérance aux pannes d'instance : lorsque les nœuds maître/travailleur échouent, le centre de registre détecte le nœud de service hors ligne et exécute une tolérance aux pannes pour les instances de workflow ou de tâche, garantissant ainsi autant que possible la capacité d'auto-récupération du système.

Conception globale

Architecture du projet

Ensuite, nous allons présenter le contexte général de la conception. Vous trouverez ci-dessous le diagramme d'architecture de conception fourni sur le site officiel.


À partir du diagramme d’architecture, nous pouvons voir qu’Apache DolphinScheduler est composé de plusieurs composants principaux :

  • Composant API : le service API gère principalement les métadonnées, interagit avec l'interface utilisateur via le service API ou appelle des interfaces API pour créer des tâches de workflow et diverses ressources nécessaires au workflow.


  • Composant maître : le maître est le contrôleur des instances de workflow, chargé de consommer les commandes, de les convertir en instances de workflow, d'effectuer le fractionnement DAG, de soumettre les tâches dans l'ordre et de distribuer les tâches aux travailleurs.


  • Composant Worker : le travailleur est l'exécuteur de tâches spécifiques. Après avoir reçu des tâches, il les traite selon différents types de tâches, interagit avec le maître et signale l'état des tâches. Notamment, le service Worker n'interagit pas avec la base de données ; seuls les services API, maître et d'alerte interagissent avec la base de données.


  • Service d'alerte : le service d'alerte envoie des alertes via différents modules d'alerte. Ces services s'enregistrent auprès du centre de registre, et le maître et le travailleur signalent périodiquement les battements de cœur et l'état actuel pour s'assurer qu'ils peuvent recevoir des tâches normalement.

Processus d'interaction entre le maître et le travailleur

Le processus d’interaction entre le maître et le travailleur est le suivant :

  • Soumission des tâches : une fois que le maître a terminé la division DAG, il soumet les tâches à la base de données et sélectionne un groupe de travailleurs approprié pour distribuer les tâches en fonction de différentes stratégies de distribution.


  • Réception de la tâche : après avoir reçu une tâche, le travailleur détermine s'il doit l'accepter ou non en fonction de son état. Un retour d'information est fourni, que l'acceptation soit réussie ou non.


  • Exécution de la tâche : le travailleur traite la tâche, met à jour le statut sur « en cours d'exécution » et renvoie les informations au maître. Le maître met à jour le statut de la tâche et les informations sur l'heure de début dans la base de données.


  • Achèvement de la tâche : une fois la tâche terminée, le travailleur envoie une notification d'événement de fin au maître, et le maître renvoie une confirmation ACK. Si aucun ACK n'est reçu, le travailleur continuera à réessayer pour s'assurer que l'événement de tâche n'est pas perdu.

Réception des tâches des travailleurs

Lorsque le travailleur reçoit une tâche, les opérations suivantes sont effectuées :

  • Remplit ses informations d'hôte.
  • Génère le chemin du journal sur la machine de travail.
  • Génère un exécuteur de tâches de travail, qui est soumis au pool de threads pour exécution.


Le travailleur vérifie s'il est surchargé ; si tel est le cas, il rejette la tâche. Après avoir reçu le retour d'échec de la distribution des tâches, le maître continue de choisir un autre travailleur pour la distribution des tâches en fonction de la stratégie de distribution.

Processus d'exécution des travailleurs

Le processus d’exécution spécifique des tâches des travailleurs comprend les étapes suivantes :

  1. Initialisation de la tâche : initialise l’environnement et les dépendances requises pour la tâche.
  2. Exécution de la tâche : exécute la logique de tâche spécifique.
  3. Achèvement de la tâche : une fois l'exécution de la tâche terminée, signale les résultats de l'exécution de la tâche au nœud maître.


Ensuite, nous détaillerons le processus spécifique d’exécution des tâches.


Avant de commencer l'exécution de la tâche, un contexte est d'abord initialisé. À ce stade, l'heure de début de la tâche est définie. Pour garantir l'exactitude de la tâche, il est nécessaire de synchroniser l'heure entre le maître et le travailleur pour éviter toute dérive temporelle.


Par la suite, l'état de la tâche est défini sur « En cours d'exécution » et renvoyé au maître pour notifier que la tâche a commencé à s'exécuter.


Étant donné que la plupart des tâches s'exécutent sur le système d'exploitation Linux, le traitement des locataires et des fichiers est requis :

  • Traitement du locataire : tout d’abord, il vérifie si le locataire existe. Dans le cas contraire, il décide de créer automatiquement le locataire en fonction de la configuration. Cela nécessite que l’utilisateur de déploiement dispose des autorisations sudo pour basculer vers le locataire spécifié pendant l’exécution de la tâche.
  • Utilisateurs spécifiques : pour certains scénarios, il n'est pas nécessaire de changer de locataire, mais simplement d'exécuter la tâche à l'aide d'un utilisateur spécifique. Cette fonction est également prise en charge par le système.

Après avoir traité le locataire, le travailleur crée le répertoire d'exécution spécifique. Le répertoire racine du répertoire d'exécution est configurable et nécessite une autorisation appropriée. Par défaut, les autorisations du répertoire sont définies sur 755.


Lors de l'exécution d'une tâche, divers fichiers de ressources peuvent être nécessaires, comme la récupération de fichiers à partir de clusters AWS S3 ou HDFS. Le système télécharge ces fichiers dans le répertoire temporaire du travailleur pour une utilisation ultérieure de la tâche.


Dans Apache DolphinScheduler, les variables de paramètres peuvent être remplacées. Les principales catégories sont les suivantes :

  • Paramètres intégrés : implique principalement le remplacement des paramètres liés à l’heure et à la date.
  • Paramètres définis par l'utilisateur : les variables de paramètres définies par l'utilisateur dans le flux de travail ou la tâche seront également remplacées en conséquence.

Grâce aux étapes ci-dessus, l’environnement d’exécution de la tâche et les ressources requises sont prêts et la tâche peut officiellement commencer à s’exécuter.

Différents types de tâches

Dans Apache DolphinScheduler, différents types de tâches sont pris en charge, chacun applicable à différents scénarios et exigences. Ci-dessous, nous présentons plusieurs types de tâches majeurs et leurs composants spécifiques.


Ces composants sont couramment utilisés pour exécuter des fichiers de script, adaptés à divers langages et protocoles de script :

  • Shell : exécute des scripts shell.
  • Python : exécute des scripts Python.
  • SQL : exécute des instructions SQL.
  • Procédure stockée : exécute les procédures stockées de la base de données.
  • HTTP : Exécute des requêtes HTTP.

La version commerciale (WhaleScheduler) prend également en charge l'exécution d'applications Java en exécutant des packages JAR.

Composants de la tâche logique

Ces composants sont utilisés pour mettre en œuvre le contrôle logique et la gestion des flux de travail :

  • Switch : Tâche de contrôle conditionnel.
  • Dépendant : Tâche de dépendance.
  • Sous-processus : sous-tâche.
  • NextLoop (version commerciale) : tâche de contrôle de boucle adaptée aux scénarios financiers.
  • Composant déclencheur : surveille si des fichiers ou des données existent.

Composants du Big Data

Ces composants sont principalement utilisés pour le traitement et l’analyse de big data :

  • SeaTunnel : correspond à la version commerciale de WhaleTunnel, utilisée pour l'intégration et le traitement de big data.
  • AWS EMR : intégration Amazon EMR.
  • HiveCli : tâche de ligne de commande Hive.
  • Spark : tâche Spark.
  • Flink : Tâche Flink.
  • DataX : tâche de synchronisation des données.

Composants du conteneur

Ces composants sont utilisés pour exécuter des tâches dans un environnement de conteneur :

  • K8S : tâche Kubernetes.

Composants de la qualité des données

Utilisé pour garantir la qualité des données :

  • DataQuality : tâche de contrôle de la qualité des données.

Composants interactifs

Ces composants sont utilisés pour interagir avec les environnements de science des données et d'apprentissage automatique :

  • Jupyter : tâche du bloc-notes Jupyter.
  • Zeppelin : Tâche du carnet Zeppelin.

Composants d'apprentissage automatique

Ces composants sont utilisés pour la gestion et l'exécution des tâches d'apprentissage automatique :

  • Kubeflow : tâche Kubeflow.
  • MlFlow : tâche MlFlow.
  • Dvc : tâche de contrôle de version des données.

Au total, Apache DolphinScheduler prend en charge trois à quatre douzaines de composants, couvrant des domaines allant de l'exécution de scripts au traitement de données volumineuses, en passant par l'apprentissage automatique. Pour plus d'informations, veuillez visiter le site Web officiel pour consulter la documentation détaillée.

Abstraction du type de tâche

Dans Apache DolphinScheduler, les types de tâches sont abstraits dans plusieurs modes de traitement pour s'adapter à divers environnements d'exécution et besoins.

Ci-dessous, nous présentons en détail le processus d’abstraction et d’exécution des types de tâches.


Le worker est un service JVM déployé sur un serveur. Pour certains composants de script (tels que Shell et Python) et tâches exécutées localement (telles que Spark Local), ils démarreront un processus distinct à exécuter.


À ce stade, le travailleur interagit avec ces tâches via l’ID de processus (PID).


Différentes sources de données peuvent nécessiter différentes adaptations. Pour les tâches SQL et les procédures stockées, nous avons abstrait la gestion de différentes sources de données, telles que MySQL, PostgreSQL, AWS Redshift, etc. Cette abstraction permet une adaptation et une extension flexibles de différents types de bases de données.


Les tâches distantes font référence aux tâches exécutées sur des clusters distants, tels que AWS EMR, les clusters SeaTunnel, les clusters Kubernetes, etc. Le Worker n'exécute pas ces tâches localement ; il les soumet plutôt aux clusters distants et surveille leur état et leurs messages. Ce mode est particulièrement adapté aux environnements cloud où l'évolutivité est requise.

Exécution de la tâche

Collecte de journaux

Différents plugins utilisent différents modes de traitement et, par conséquent, la collecte des journaux varie en conséquence :

  • Processus locaux : les journaux sont enregistrés en surveillant la sortie du processus.

  • Tâches distantes : les journaux sont collectés en vérifiant périodiquement l'état des tâches et la sortie du cluster distant (par exemple, AWS EMR) et en les enregistrant dans les journaux des tâches locales.


Substitution de variable de paramètre

Le système analyse les journaux des tâches pour identifier les variables de paramètres qui doivent être remplacées de manière dynamique. Par exemple, la tâche A du DAG peut générer des paramètres de sortie qui doivent être transmis à la tâche B en aval.

Au cours de ce processus, le système lit les journaux et remplace les variables de paramètres selon les besoins.


Récupération de l'ID de tâche

  • Processus locaux : l’ID du processus (PID) est récupéré.
  • Tâches distantes : l'ID de la tâche distante (par exemple, l'ID de tâche AWS EMR) est récupéré.

La conservation de ces identifiants de tâches permet d'effectuer d'autres requêtes de données et des opérations de tâches à distance. Par exemple, lorsqu'un flux de travail est arrêté, l'API d'annulation correspondante peut être appelée à l'aide de l'ID de tâche pour mettre fin à la tâche en cours d'exécution.


Gestion de la tolérance aux pannes

  • Processus locaux : si un nœud Worker échoue, le processus local n'en sera pas conscient, ce qui nécessitera que la tâche soit soumise à nouveau.
  • Tâches distantes : si la tâche est exécutée sur un cluster distant (par exemple, AWS), l'état de la tâche peut être vérifié à l'aide de l'ID de tâche et une tentative de reprise de la tâche peut être effectuée. En cas de réussite, il n'est pas nécessaire de soumettre à nouveau la tâche, ce qui permet de gagner du temps.

Achèvement de l'exécution de la tâche

Une fois qu'une tâche a été exécutée, plusieurs actions d'achèvement sont nécessaires :

  • Vérification de l'achèvement de la tâche : le système vérifie si une alerte doit être envoyée. Par exemple, pour une tâche SQL, si les résultats de la requête déclenchent une alerte, le système interagira avec le service d'alerte via RPC pour envoyer le message d'alerte.

  • Commentaires sur l'événement : le travailleur renvoie l'événement d'achèvement de la tâche (événement de fin) au maître. Le maître met à jour le statut de la tâche dans la base de données et procède à la transition du statut DAG.

  • Nettoyage du contexte : Le Worker supprime de la mémoire le contexte de la tâche créé au début de la tâche. Il nettoie également les chemins de fichiers générés pendant l'exécution de la tâche. En mode débogage (mode développement), ces fichiers ne seront pas nettoyés, ce qui permet de résoudre les problèmes liés aux tâches ayant échoué.


Grâce à ces étapes, l’ensemble du processus d’exécution d’une instance de tâche est terminé.

Contribution communautaire

Si vous êtes intéressé par Apache DolphinScheduler et souhaitez contribuer à la communauté open source, vous êtes invités à vous référer à nos directives de contribution.


La communauté encourage les contributions actives, y compris, mais sans s'y limiter :

  • Signaler les problèmes rencontrés lors de l'utilisation.
  • Soumission de la documentation et des PR de code.
  • Ajout de tests unitaires (UT).
  • Ajout de commentaires de code.
  • Correction de bugs ou ajout de nouvelles fonctionnalités.
  • Rédiger des articles techniques ou participer à des Meetups.

Guide pour les nouveaux contributeurs

Pour les nouveaux contributeurs, vous pouvez rechercher des problèmes étiquetés comme étant good first issue dans les problèmes GitHub de la communauté. Ces problèmes sont généralement plus simples et adaptés aux utilisateurs qui apportent leur première contribution.


En résumé, nous avons appris la conception globale d’Apache DolphinScheduler et le processus d’exécution détaillé des tâches Worker.

J'espère que ce contenu vous aidera à mieux comprendre et utiliser Apache DolphinScheduler. Si vous avez des questions, n'hésitez pas à me contacter dans la section commentaires.