皆さん、こんにちは。私は Cai Shunfeng です。WhaleOps のシニア データ エンジニアであり、Apache DolphinScheduler コミュニティのコミッター兼 PMC メンバーでもあります。今日は、Apache DolphinScheduler の Worker タスクの仕組みについて説明します。
この説明は 3 つのセクションに分かれています。
Apache DolphinScheduler は、分散型で簡単に拡張できるビジュアル ワークフロー スケジューリング オープン ソース システムであり、エンタープライズ レベルのシナリオに適しています。
以下の主要機能を提供し、視覚的な操作を通じてワークフローとタスクのライフサイクル全体のデータ処理ソリューションを提供します。
使いやすい
視覚的な DAG 操作: ユーザーはページ上でコンポーネントをドラッグ アンド ドロップして、DAG (有向非巡回グラフ) に配置できます。
プラグイン システム: タスク プラグイン、データ ソース プラグイン、アラート プラグイン、ストレージ プラグイン、レジストリ センター プラグイン、cron ジョブ プラグインなどが含まれます。ユーザーは、ビジネス要件を満たすために必要に応じてプラグインを簡単に拡張できます。
豊富な使用シナリオ
静的構成: ワークフローのスケジュール設定、オンラインおよびオフライン操作、バージョン管理、バックフィル機能が含まれます。
ランタイム操作: 一時停止、停止、再開、パラメータ置換などの機能を提供します。
依存関係の種類: 豊富な依存関係オプションと戦略をサポートし、より多くのシナリオに適応します。
パラメータの受け渡し: ワークフロー レベルでの起動パラメータ、グローバル パラメータ、タスク レベルでのローカル パラメータ、および動的パラメータの受け渡しをサポートします。
高い信頼性
分散設計: すべてのサービスはステートレスであり、水平方向にスケーリングしてシステム スループットを向上させることができます。
過負荷保護とインスタンスフォールトトレランス:
過負荷保護: 動作中、マスターとワーカーは自身の CPU とメモリの使用状況、およびタスク量を監視します。過負荷になると、現在のワークフロー/タスク処理を一時停止し、回復後に再開します。
インスタンスのフォールト トレランス: マスター/ワーカー ノードに障害が発生すると、レジストリ センターはサービス ノードがオフラインであることを検出し、ワークフローまたはタスク インスタンスのフォールト トレランスを実行して、システムの自己回復機能を最大限に確保します。
次に、全体的な設計背景を紹介しましょう。以下は公式サイトで提供されている設計アーキテクチャ図です。
アーキテクチャ図から、Apache DolphinScheduler はいくつかの主要コンポーネントで構成されていることがわかります。
API コンポーネント: API サービスは主にメタデータを管理し、API サービスを介して UI と対話したり、API インターフェイスを呼び出してワークフロー タスクやワークフローに必要なさまざまなリソースを作成したりします。
マスター コンポーネント: マスターはワークフロー インスタンスのコントローラーであり、コマンドの使用、コマンドのワークフロー インスタンスへの変換、DAG 分割の実行、タスクの順序どおりの送信、およびタスクのワーカーへの配布を担当します。
ワーカー コンポーネント: ワーカーは特定のタスクの実行者です。タスクを受け取った後、さまざまなタスク タイプに従ってタスクを処理し、マスターと対話し、タスクの状態を報告します。特に、ワーカー サービスはデータベースと対話しません。データベースと対話するのは、API、マスター、およびアラート サービスのみです。
アラート サービス: アラート サービスは、さまざまなアラート プラグインを通じてアラートを送信します。これらのサービスはレジストリ センターに登録され、マスターとワーカーは定期的にハートビートと現在のステータスを報告して、タスクを正常に受信できることを確認します。
マスターとワーカー間の相互作用プロセスは次のとおりです。
タスクの送信: マスターは DAG 分割を完了すると、タスクをデータベースに送信し、さまざまな分散戦略に基づいて適切なワーカー グループを選択してタスクを分散します。
タスクの受信: ワーカーはタスクを受け取った後、その状態に基づいてタスクを受け入れるかどうかを決定します。受け入れが成功したかどうかに応じてフィードバックが提供されます。
タスク実行: ワーカーはタスクを処理し、ステータスを実行中に更新して、マスターにフィードバックします。マスターは、データベース内のタスクのステータスと開始時刻情報を更新します。
タスク完了: タスクが完了すると、ワーカーはマスターに終了イベント通知を送信し、マスターは ACK 確認を返します。ACK が受信されない場合、ワーカーはタスク イベントが失われないように再試行を続けます。
ワーカーがタスクを受け取ると、次の操作が実行されます。
ワーカーは過負荷かどうかをチェックし、過負荷の場合はタスクを拒否します。タスク分散失敗のフィードバックを受け取った後、マスターは分散戦略に基づいてタスク分散用に別のワーカーを選択し続けます。
ワーカー タスクの具体的な実行プロセスには、次の手順が含まれます。
次に、具体的なタスク実行プロセスについて詳しく説明します。
タスクの実行が始まる前に、まずコンテキストが初期化されます。この時点で、タスクの開始時刻が設定されます。タスクの精度を確保するには、時間のずれを避けるためにマスターとワーカーの間で時間を同期する必要があります。
その後、タスクのステータスが実行中に設定され、タスクの実行が開始されたことを通知するためにマスターにフィードバックされます。
ほとんどのタスクは Linux オペレーティング システム上で実行されるため、テナントおよびファイルの処理が必要です。
テナントを処理した後、ワーカーは特定の実行ディレクトリを作成します。実行ディレクトリのルート ディレクトリは構成可能であり、適切な承認が必要です。デフォルトでは、ディレクトリのアクセス許可は 755 に設定されています。
タスクの実行中、AWS S3 または HDFS クラスターからのファイルの取得など、さまざまなリソース ファイルが必要になる場合があります。システムは、後続のタスクで使用するためにこれらのファイルをワーカーの一時ディレクトリにダウンロードします。
Apache DolphinScheduler では、パラメータ変数を置き換えることができます。主なカテゴリは次のとおりです。
上記の手順により、タスクの実行環境と必要なリソースが準備され、タスクの実行を正式に開始できるようになります。
Apache DolphinScheduler では、さまざまなタイプのタスクがサポートされており、それぞれ異なるシナリオと要件に適用できます。以下では、いくつかの主要なタスク タイプとその特定のコンポーネントを紹介します。
これらのコンポーネントは、さまざまなスクリプト言語やプロトコルに適したスクリプト ファイルを実行するためによく使用されます。
商用版 (WhaleScheduler) では、JAR パッケージを実行することで Java アプリケーションの実行もサポートされます。
これらのコンポーネントは、論理制御とワークフロー管理を実装するために使用されます。
これらのコンポーネントは主にビッグデータの処理と分析に使用されます。
これらのコンポーネントは、コンテナ環境でタスクを実行するために使用されます。
データの品質を確保するために使用されます:
これらのコンポーネントは、データ サイエンスおよび機械学習環境と対話するために使用されます。
これらのコンポーネントは、機械学習タスクの管理と実行に使用されます。
全体として、Apache DolphinScheduler は、スクリプト実行、ビッグデータ処理、機械学習などの領域をカバーする 30 ~ 40 のコンポーネントをサポートしています。詳細については、公式 Web サイトにアクセスして詳細なドキュメントを参照してください。
Apache DolphinScheduler では、さまざまなランタイム環境とニーズに合わせて、タスク タイプが複数の処理モードに抽象化されます。
以下では、タスクタイプの抽象化と実行プロセスを詳しく紹介します。
ワーカーは、サーバー上にデプロイされた JVM サービスです。一部のスクリプト コンポーネント (Shell や Python など) やローカルで実行されるタスク (Spark Local など) では、別のプロセスが開始されて実行されます。
この時点で、ワーカーはプロセス ID (PID) を通じてこれらのタスクと対話します。
データ ソースが異なれば、適応も異なります。SQL およびストアド プロシージャ タスクについては、MySQL、PostgreSQL、AWS Redshift などのさまざまなデータ ソースの処理を抽象化しました。この抽象化により、さまざまなデータベース タイプを柔軟に適応および拡張できます。
リモート タスクとは、AWS EMR、SeaTunnel クラスター、Kubernetes クラスターなどのリモート クラスターで実行されるタスクを指します。ワーカーはこれらのタスクをローカルで実行するのではなく、リモート クラスターに送信し、そのステータスとメッセージを監視します。このモードは、スケーラビリティが求められるクラウド環境に特に適しています。
ログ収集
プラグインによって使用する処理モードが異なるため、ログの収集もそれに応じて異なります。
ローカル プロセス: プロセス出力を監視することでログが記録されます。
リモート タスク: リモート クラスター (AWS EMR など) からのタスクのステータスと出力を定期的にチェックし、ローカル タスク ログに記録することでログが収集されます。
パラメータ変数の置換
システムはタスク ログをスキャンして、動的に置き換える必要があるパラメータ変数を特定します。たとえば、DAG 内のタスク A は、下流のタスク B に渡す必要がある出力パラメータを生成する場合があります。
このプロセス中に、システムはログを読み取り、必要に応じてパラメータ変数を置き換えます。
タスクIDを取得しています
これらのタスク ID を保持することで、さらにデータ クエリやリモート タスク操作が可能になります。たとえば、ワークフローが停止すると、タスク ID を使用して対応するキャンセル API を呼び出して、実行中のタスクを終了できます。
フォールトトレランス処理
タスクが実行された後、いくつかの完了アクションが必要になります。
タスク完了チェック: システムはアラートを送信する必要があるかどうかを確認します。たとえば、SQL タスクの場合、クエリ結果によってアラートがトリガーされると、システムは RPC を介してアラート サービスと対話し、アラート メッセージを送信します。
イベント フィードバック: ワーカーはタスク完了イベント (終了イベント) をマスターに返します。マスターはデータベース内のタスク ステータスを更新し、DAG ステータスの遷移を続行します。
コンテキストのクリーンアップ: ワーカーは、タスクの開始時に作成されたタスク コンテキストをメモリから削除します。また、タスク実行中に生成されたファイル パスもクリーンアップします。デバッグ モード (開発モード) の場合、これらのファイルはクリーンアップされないので、失敗したタスクのトラブルシューティングが可能になります。
これらの手順により、タスク インスタンスの実行プロセス全体が完了します。
Apache DolphinScheduler に興味があり、オープンソース コミュニティに貢献したい場合は、貢献ガイドラインを参照してください。
コミュニティでは、次のような積極的な貢献を奨励しています (ただし、これらに限定されません)。
新しい貢献者の場合は、コミュニティの GitHub の問題でgood first issue
というラベルの付いた問題を検索できます。これらの問題は一般的に単純で、初めて貢献するユーザーに適しています。
要約すると、Apache DolphinScheduler の全体的な設計と Worker タスクの詳細な実行プロセスについて学習しました。
このコンテンツが、Apache DolphinScheduler の理解と使用に役立つことを願っています。ご質問がある場合は、コメント セクションからお気軽にお問い合わせください。