皆さん、こんにちは。私は Cai Shunfeng です。WhaleOps のシニア データ エンジニアであり、Apache DolphinScheduler コミュニティのコミッター兼 PMC メンバーでもあります。今日は、Apache DolphinScheduler の Worker タスクの仕組みについて説明します。 この説明は 3 つのセクションに分かれています。 Apache DolphinScheduler の紹介 Apache DolphinScheduler の全体設計の概要 ワーカータスクの詳細な実行プロセス プロジェクト紹介 、分散型で簡単に拡張できるビジュアル ワークフロー スケジューリング オープン ソース システムであり、エンタープライズ レベルのシナリオに適しています。 Apache DolphinScheduler は 以下の主要機能を提供し、視覚的な操作を通じてワークフローとタスクのライフサイクル全体のデータ処理ソリューションを提供します。 主な特徴 使いやすい 視覚的な DAG 操作: ユーザーはページ上でコンポーネントをドラッグ アンド ドロップして、DAG (有向非巡回グラフ) に配置できます。 プラグイン システム: タスク プラグイン、データ ソース プラグイン、アラート プラグイン、ストレージ プラグイン、レジストリ センター プラグイン、cron ジョブ プラグインなどが含まれます。ユーザーは、ビジネス要件を満たすために必要に応じてプラグインを簡単に拡張できます。 豊富な使用シナリオ 静的構成: ワークフローのスケジュール設定、オンラインおよびオフライン操作、バージョン管理、バックフィル機能が含まれます。 ランタイム操作: 一時停止、停止、再開、パラメータ置換などの機能を提供します。 依存関係の種類: 豊富な依存関係オプションと戦略をサポートし、より多くのシナリオに適応します。 パラメータの受け渡し: ワークフロー レベルでの起動パラメータ、グローバル パラメータ、タスク レベルでのローカル パラメータ、および動的パラメータの受け渡しをサポートします。 高い信頼性 分散設計: すべてのサービスはステートレスであり、水平方向にスケーリングしてシステム スループットを向上させることができます。 過負荷保護とインスタンスフォールトトレランス: 過負荷保護: 動作中、マスターとワーカーは自身の CPU とメモリの使用状況、およびタスク量を監視します。過負荷になると、現在のワークフロー/タスク処理を一時停止し、回復後に再開します。 インスタンスのフォールト トレランス: マスター/ワーカー ノードに障害が発生すると、レジストリ センターはサービス ノードがオフラインであることを検出し、ワークフローまたはタスク インスタンスのフォールト トレランスを実行して、システムの自己回復機能を最大限に確保します。 全体デザイン プロジェクトアーキテクチャ 次に、全体的な設計背景を紹介しましょう。以下は公式サイトで提供されている設計アーキテクチャ図です。 アーキテクチャ図から、Apache DolphinScheduler はいくつかの主要コンポーネントで構成されていることがわかります。 API コンポーネント: API サービスは主にメタデータを管理し、API サービスを介して UI と対話したり、API インターフェイスを呼び出してワークフロー タスクやワークフローに必要なさまざまなリソースを作成したりします。 マスター コンポーネント: マスターはワークフロー インスタンスのコントローラーであり、コマンドの使用、コマンドのワークフロー インスタンスへの変換、DAG 分割の実行、タスクの順序どおりの送信、およびタスクのワーカーへの配布を担当します。 ワーカー コンポーネント: ワーカーは特定のタスクの実行者です。タスクを受け取った後、さまざまなタスク タイプに従ってタスクを処理し、マスターと対話し、タスクの状態を報告します。特に、ワーカー サービスはデータベースと対話しません。データベースと対話するのは、API、マスター、およびアラート サービスのみです。 アラート サービス: アラート サービスは、さまざまなアラート プラグインを通じてアラートを送信します。これらのサービスはレジストリ センターに登録され、マスターとワーカーは定期的にハートビートと現在のステータスを報告して、タスクを正常に受信できることを確認します。 マスターとワーカーの相互作用プロセス マスターとワーカー間の相互作用プロセスは次のとおりです。 タスクの送信: マスターは DAG 分割を完了すると、タスクをデータベースに送信し、さまざまな分散戦略に基づいて適切なワーカー グループを選択してタスクを分散します。 タスクの受信: ワーカーはタスクを受け取った後、その状態に基づいてタスクを受け入れるかどうかを決定します。受け入れが成功したかどうかに応じてフィードバックが提供されます。 タスク実行: ワーカーはタスクを処理し、ステータスを実行中に更新して、マスターにフィードバックします。マスターは、データベース内のタスクのステータスと開始時刻情報を更新します。 タスク完了: タスクが完了すると、ワーカーはマスターに終了イベント通知を送信し、マスターは ACK 確認を返します。ACK が受信されない場合、ワーカーはタスク イベントが失われないように再試行を続けます。 ワーカータスク受付 ワーカーがタスクを受け取ると、次の操作が実行されます。 ホスト情報を入力します。 ワーカー マシン上のログ パスを生成します。 実行のためにスレッド プールに送信されるワーカー タスク エグゼキューターを生成します。 ワーカーは過負荷かどうかをチェックし、過負荷の場合はタスクを拒否します。タスク分散失敗のフィードバックを受け取った後、マスターは 分散戦略に基づいてタスク分散用に別のワーカーを選択し続けます。 ワーカー実行プロセス ワーカー タスクの具体的な実行プロセスには、次の手順が含まれます。 タスクの初期化: タスクに必要な環境と依存関係を初期化します。 タスク実行: 特定のタスク ロジックを実行します。 タスク完了: タスクの実行が完了すると、タスク実行結果をマスターノードに報告します。 次に、具体的なタスク実行プロセスについて詳しく説明します。 タスクの実行が始まる前に、まずコンテキストが初期化されます。この時点で、タスクの開始時刻が設定されます。タスクの精度を確保するには、時間のずれを避けるためにマスターとワーカーの間で時間を同期する必要があります。 その後、タスクのステータスが実行中に設定され、タスクの実行が開始されたことを通知するためにマスターにフィードバックされます。 ほとんどのタスクは Linux オペレーティング システム上で実行されるため、テナントおよびファイルの処理が必要です。 まず、テナントが存在するかどうかを確認します。存在しない場合は、構成に基づいてテナントを自動的に作成するかどうかを決定します。これには、タスク実行中に指定されたテナントに切り替えるための sudo 権限がデプロイメント ユーザーに必要です。 テナント処理: : シナリオによっては、テナントを切り替える必要はなく、特定のユーザーを使用してタスクを実行するだけです。これもシステムでサポートされています。 特定のユーザー テナントを処理した後、ワーカーは特定の実行ディレクトリを作成します。実行ディレクトリのルート ディレクトリは構成可能であり、適切な承認が必要です。デフォルトでは、 ディレクトリのアクセス許可は 755 に設定されています。 タスクの実行中、AWS S3 または HDFS クラスターからのファイルの取得など、さまざまなリソース ファイルが必要になる場合があります。システムは、後続のタスクで使用するためにこれらのファイルをワーカーの一時ディレクトリにダウンロードします。 Apache DolphinScheduler では、パラメータ変数を置き換えることができます。主なカテゴリは次のとおりです。 主に、時間と日付に関連するパラメータの置き換えを行います。 組み込みパラメータ: ワークフローまたはタスクでユーザーが設定したパラメータ変数もそれに応じて置き換えられます。 ユーザー定義パラメータ: 上記の手順により、タスクの実行環境と必要なリソースが準備され、タスクの実行を正式に開始できるようになります。 さまざまな種類のタスク Apache DolphinScheduler では、さまざまなタイプのタスクがサポートされており、それぞれ異なるシナリオと要件に適用できます。以下では、いくつかの主要なタスク タイプとその特定のコンポーネントを紹介します。 これらのコンポーネントは、さまざまなスクリプト言語やプロトコルに適したスクリプト ファイルを実行するためによく使用されます。 シェル: シェル スクリプトを実行します。 Python: Python スクリプトを実行します。 SQL: SQL ステートメントを実行します。 ストアド プロシージャ: データベースのストアド プロシージャを実行します。 HTTP: HTTP リクエストを実行します。 商用版 (WhaleScheduler) では、JAR パッケージを実行することで Java アプリケーションの実行もサポートされます。 ロジックタスクコンポーネント これらのコンポーネントは、論理制御とワークフロー管理を実装するために使用されます。 スイッチ: 条件制御タスク。 依存: 依存タスク。 サブプロセス: サブタスク。 NextLoop (商用バージョン): 金融シナリオに適したループ制御タスク。 トリガー コンポーネント: ファイルまたはデータが存在するかどうかを監視します。 ビッグデータコンポーネント これらのコンポーネントは主にビッグデータの処理と分析に使用されます。 SeaTunnel: ビッグデータの統合と処理に使用される WhaleTunnel の商用バージョンに相当します。 AWS EMR: Amazon EMR 統合。 HiveCli: Hive コマンドライン タスク。 Spark: Spark タスク。 Flink: Flink タスク。 DataX: データ同期タスク。 コンテナコンポーネント これらのコンポーネントは、コンテナ環境でタスクを実行するために使用されます。 K8S: Kubernetes タスク。 データ品質コンポーネント データの品質を確保するために使用されます: DataQuality: データ品質チェックタスク。 インタラクティブコンポーネント これらのコンポーネントは、データ サイエンスおよび機械学習環境と対話するために使用されます。 Jupyter: Jupyter Notebook タスク。 Zeppelin: Zeppelin Notebook タスク。 機械学習コンポーネント これらのコンポーネントは、機械学習タスクの管理と実行に使用されます。 Kubeflow: Kubeflow タスク。 MlFlow: MlFlow タスク。 Dvc: データ バージョン管理タスク。 全体として、Apache DolphinScheduler は、スクリプト実行、ビッグデータ処理、機械学習などの領域をカバーする 30 ~ 40 のコンポーネントをサポートしています。詳細については、 にアクセスして詳細なドキュメントを参照してください。 公式 Web サイト タスクタイプの抽象化 Apache DolphinScheduler では、さまざまなランタイム環境とニーズに合わせて、タスク タイプが複数の処理モードに抽象化されます。 以下では、タスクタイプの抽象化と実行プロセスを詳しく紹介します。 ワーカーは、サーバー上にデプロイされた JVM サービスです。一部のスクリプト コンポーネント (Shell や Python など) やローカルで実行されるタスク (Spark Local など) では、別のプロセスが開始されて実行されます。 この時点で、ワーカーはプロセス ID (PID) を通じてこれらのタスクと対話します。 データ ソースが異なれば、適応も異なります。SQL およびストアド プロシージャ タスクについては、MySQL、PostgreSQL、AWS Redshift などのさまざまなデータ ソースの処理を抽象化しました。この抽象化により、さまざまなデータベース タイプを柔軟に適応および拡張できます。 、AWS EMR、SeaTunnel クラスター、Kubernetes クラスターなどのリモート クラスターで実行されるタスクを指します。ワーカーはこれらのタスクをローカルで実行するのではなく、リモート クラスターに送信し、そのステータスとメッセージを監視します。このモードは、スケーラビリティが求められるクラウド環境に特に適しています。 リモート タスクとは タスク実行 ログ収集 プラグインによって使用する処理モードが異なるため、ログの収集もそれに応じて異なります。 ローカル プロセス: プロセス出力を監視することでログが記録されます。 リモート タスク: リモート クラスター (AWS EMR など) からのタスクのステータスと出力を定期的にチェックし、ローカル タスク ログに記録することでログが収集されます。 パラメータ変数の置換 システムはタスク ログをスキャンして、動的に置き換える必要があるパラメータ変数を特定します。たとえば、DAG 内のタスク A は、下流のタスク B に渡す必要がある出力パラメータを生成する場合があります。 このプロセス中に、システムはログを読み取り、必要に応じてパラメータ変数を置き換えます。 タスクIDを取得しています ローカル プロセス: プロセス ID (PID) が取得されます。 リモートタスク: リモートタスクの ID (例: AWS EMR タスク ID) が取得されます。 これらのタスク ID を保持することで、さらにデータ クエリやリモート タスク操作が可能になります。たとえば、ワークフローが停止すると、タスク ID を使用して対応するキャンセル API を呼び出して、実行中のタスクを終了できます。 フォールトトレランス処理 ローカル プロセス: ワーカー ノードに障害が発生した場合、ローカル プロセスはそれを認識できないため、タスクを再送信する必要があります。 リモート タスク: タスクがリモート クラスター (AWS など) で実行されている場合、タスク ID を使用してタスクのステータスを確認し、タスクの引き継ぎを試みることができます。成功した場合は、タスクを再送信する必要がないため、時間を節約できます。 タスク実行完了 タスクが実行された後、いくつかの完了アクションが必要になります。 タスク完了チェック: システムはアラートを送信する必要があるかどうかを確認します。たとえば、SQL タスクの場合、クエリ結果によってアラートがトリガーされると、システムは RPC を介してアラート サービスと対話し、アラート メッセージを送信します。 イベント フィードバック: ワーカーはタスク完了イベント (終了イベント) をマスターに返します。マスターはデータベース内のタスク ステータスを更新し、DAG ステータスの遷移を続行します。 コンテキストのクリーンアップ: ワーカーは、タスクの開始時に作成されたタスク コンテキストをメモリから削除します。また、タスク実行中に生成されたファイル パスもクリーンアップします。デバッグ モード (開発モード) の場合、これらのファイルはクリーンアップされないので、失敗したタスクのトラブルシューティングが可能になります。 これらの手順により、タスク インスタンスの実行プロセス全体が完了します。 コミュニティへの貢献 Apache DolphinScheduler に興味があり、オープンソース コミュニティに貢献したい場合は、貢献ガイドラインを参照してください。 コミュニティでは、次のような積極的な貢献を奨励しています (ただし、これらに限定されません)。 使用中に発生した問題を報告します。 ドキュメントとコードの PR を送信します。 ユニットテスト (UT) を追加します。 コードコメントを追加します。 バグを修正したり、新しい機能を追加したりします。 技術記事を書いたり、Meetups に参加したりします。 新規貢献者向けガイド 新しい貢献者の場合は、コミュニティの GitHub の問題で というラベルの付いた問題を検索できます。これらの問題は一般的に単純で、初めて貢献するユーザーに適しています。 good first issue 要約すると、Apache DolphinScheduler の全体的な設計と Worker タスクの詳細な実行プロセスについて学習しました。 このコンテンツが、Apache DolphinScheduler の理解と使用に役立つことを願っています。ご質問がある場合は、コメント セクションからお気軽にお問い合わせください。