paint-brush
详解 Apache DolphinScheduler 中的 Worker 任务执行经过@williamguo
117 讀數

详解 Apache DolphinScheduler 中的 Worker 任务执行

经过 William Guo9m2024/08/23
Read on Terminal Reader

太長; 讀書

Apache DolphinScheduler 是一个开源工作流调度系统,以其可视化 DAG 操作和可扩展插件而闻名。本文探讨了 Worker 任务从任务初始化到完成的详细执行过程,重点介绍了系统的架构、任务类型和容错机制。这些内容对于了解如何使用 DolphinScheduler 有效地管理和优化工作流至关重要。
featured image - 详解 Apache DolphinScheduler 中的 Worker 任务执行
William Guo HackerNoon profile picture
0-item
1-item


大家好,我是蔡顺峰,WhaleOps 高级数据工程师,Apache DolphinScheduler 社区 Committer 和 PMC 成员。今天我来为大家讲解一下 Apache DolphinScheduler 的 Worker 任务是如何运作的。

本讲解将分为三个部分:


  1. Apache DolphinScheduler 简介
  2. Apache DolphinScheduler 总体设计概述
  3. Worker任务执行详细流程

项目简介

Apache DolphinScheduler是一个分布式、易扩展、可视化的工作流调度开源系统,适用于企业级场景。



它提供以下关键功能,通过可视化操作为工作流和任务提供全生命周期的数据处理解决方案。

主要特点

  • 便于使用

  • 可视化 DAG 操作:用户可以在页面上拖放组件,将它们排列成 DAG(有向无环图)。

  • 插件系统:包含任务插件、数据源插件、提醒插件、存储插件、注册中心插件、计划任务插件等,用户可以根据需要轻松扩展插件,满足业务需求。


  • 丰富的使用场景

  • 静态配置:包括工作流调度、在线离线操作、版本管理、回填功能。

  • 运行时操作:提供暂停、停止、恢复和参数替换等功能。

  • 依赖类型:支持丰富的依赖选项和策略,适应更多的场景。

  • 参数传递:支持工作流级别的启动参数、全局参数、任务级别的局部参数、动态参数传递。


  • 高可靠性

  • 去中心化设计:所有服务都是无状态的,可以水平扩展以增加系统吞吐量。

  • 过载保护和实例容错:

  • 过载保护:Master 和 Worker 在运行过程中,会监控自身的 CPU 和内存使用情况,以及任务量,若超载则暂停当前工作流/任务处理,待恢复后再继续运行。

  • 实例容错:当主/工作节点发生故障时,注册中心可以感知服务节点离线,并对工作流或任务实例进行容错处理,尽可能保证系统的自我恢复能力。

总体设计

项目架构

接下来介绍一下整体的设计背景,下面是官网提供的设计架构图。


从架构图中我们可以看到Apache DolphinScheduler由几个主要组件组成:

  • API组件:API服务主要管理元数据,通过API服务与UI交互,或者调用API接口创建工作流任务以及工作流所需的各种资源。


  • Master 组件:Master是工作流实例的控制者,负责消费命令、转换成工作流实例、进行DAG拆分、按顺序提交任务、将任务分发给Worker。


  • Worker 组件:Worker 是具体任务的执行者,接收到任务后根据不同的任务类型进行处理,并与 Master 交互,汇报任务状态。需要注意的是,Worker 服务不与数据库交互,只有 API、Master 和告警服务与数据库交互。


  • 告警服务:告警服务通过不同的告警插件发送告警,这些服务向注册中心注册,Master和Worker定期汇报心跳和当前状态,保证能正常接收任务。

主从交互过程

Master与Worker交互过程如下:

  • 任务提交:Master完成DAG分裂后,向数据库提交任务,并根据不同的分发策略选择合适的Worker Group进行任务分发。


  • 任务接收:worker接收到任务后,根据任务情况判断是否接收,接收成功与否进行反馈。


  • 任务执行:worker处理任务,更新状态为running,并反馈给master,master在数据库中更新任务状态和开始时间信息。


  • 任务完成:任务完成后,worker 会向 master 发送完成事件通知,master 返回 ACK 确认,若没有收到 ACK,worker 会不断重试,保证任务事件不丢失。

工人任务接收

当worker收到任务后,会进行如下操作:

  • 填写其主机信息。
  • 在工作机器上生成日志路径。
  • 生成WorkerTaskExecutor,提交到线程池中执行。


Worker检查自己是否超载,若是,则拒绝该任务。Master收到任务分发失败反馈后,继续按照分发策略选择其他Worker进行任务分发。

工人执行过程

Worker任务的具体执行过程包括以下步骤:

  1. 任务初始化:初始化任务所需的环境和依赖项。
  2. 任务执行:执行具体的任务逻辑。
  3. 任务完成:任务执行完成后,向主节点汇报任务执行结果。


接下来我们来细说一下具体的任务执行流程。


在任务开始执行之前,首先会初始化一个context,此时会设置任务的开始时间,为了保证任务的准确性,需要master和worker进行时间同步,避免出现时间漂移。


随后将任务状态设置为运行中,并反馈给master通知任务已经开始运行。


由于大多数任务都在Linux操作系统上运行,因此需要进行租户和文件处理:

  • 租户处理:首先检查租户是否存在,若不存在则根据配置决定是否自动创建租户,这需要部署用户有 sudo 权限,才能在任务执行过程中切换到指定的租户。
  • 特定用户:某些场景下,不需要切换租户,直接用特定用户执行任务即可,系统也支持此功能。

Worker处理完tenant后,会创建具体的执行目录,执行目录的根目录是可配置的,需要适当的授权,默认情况下,目录权限设置为755。


在任务执行过程中,可能需要各种资源文件,比如从AWS S3或者HDFS集群中获取文件,系统会将这些文件下载到worker的临时目录中,以供后续任务使用。


在Apache DolphinScheduler中,参数变量是可以替换的,主要包括以下类别:

  • 内置参数:主要涉及时间和日期相关参数的替换。
  • 用户定义参数:用户在工作流或任务中设置的参数变量也将被相应替换。

通过以上步骤,任务的执行环境和所需要的资源已经准备好,任务可以正式开始执行了。

不同类型的任务

在 Apache DolphinScheduler 中,支持多种类型的任务,每种类型适用于不同的场景和需求。下面我们介绍几种主要的任务类型及其具体组件。


这些组件通常用于执行脚本文件,适用于各种脚本语言和协议:

  • Shell:执行shell脚本。
  • Python:执行 Python 脚本。
  • SQL:执行 SQL 语句。
  • 存储过程:执行数据库存储过程。
  • HTTP:执行 HTTP 请求。

商业版本(WhaleScheduler)也支持通过执行JAR包来运行Java应用程序。

逻辑任务组件

这些组件用于实现逻辑控制和工作流管理:

  • 开关:条件控制任务。
  • Dependent:依赖任务。
  • SubProcess:子任务。
  • NextLoop(商业版):适合金融场景的循环控制任务。
  • 触发组件:监视文件或数据是否存在。

大数据组件

这些组件主要用于大数据处理和分析:

  • SeaTunnel:对应WhaleTunnel的商业版本,用于大数据的整合与处理。
  • AWS EMR:Amazon EMR 集成。
  • HiveCli:Hive 命令行任务。
  • Spark:Spark任务。
  • Flink:Flink 任务。
  • DataX:数据同步任务。

容器组件

这些组件用于在容器环境中运行任务:

  • K8S:Kubernetes 任务。

数据质量组件

用于确保数据质量:

  • DataQuality:数据质量检查任务。

交互式组件

这些组件用于与数据科学和机器学习环境交互:

  • Jupyter:Jupyter Notebook 任务。
  • Zeppelin:Zeppelin Notebook任务。

机器学习组件

这些组件用于机器学习任务的管理和执行:

  • Kubeflow:Kubeflow 任务。
  • MlFlow:MlFlow任务。
  • Dvc:数据版本控制任务。

总体来说,Apache DolphinScheduler 支持三四十个组件,涵盖脚本执行、大数据处理、机器学习等领域。更多信息请关注官网查看详细文档。

任务类型抽象

在Apache DolphinScheduler中,任务类型被抽象为多种处理模式,以适应各种运行环境和需求。

下面我们详细介绍任务类型的抽象和执行过程。


Worker 是部署在服务器上的一个 JVM 服务,对于一些脚本组件(比如 Shell、Python)以及本地运行的任务(比如 Spark Local)都会启动一个单独的进程来运行。


此时,worker通过进程ID(PID)与这些任务进行交互。


不同的数据源可能需要不同的适配,对于 SQL 和存储过程任务,我们抽象了针对不同数据源的处理,比如 MySQL、PostgreSQL、AWS Redshift 等,这种抽象可以灵活适配和扩展不同的数据库类型。


远程任务是指在远程集群上执行的任务,例如 AWS EMR、SeaTunnel 集群、Kubernetes 集群等。Worker 不会在本地执行这些任务,而是将其提交到远程集群并监控其状态和消息。此模式特别适合需要可扩展性的云环境。

任务执行

日志收集

不同的插件采用不同的处理方式,因此日志的采集方式也有所不同:

  • 本地进程:通过监控进程输出来记录日志。

  • 远程任务:通过定期检查远程集群(例如 AWS EMR)的任务状态和输出并将其记录在本地任务日志中来收集日志。


参数变量替换

系统扫描任务日志,识别需要动态替换的参数变量,例如DAG中的Task A可能会生成一些输出参数,需要传递给下游的Task B。

在此过程中,系统读取日志并根据需要替换参数变量。


检索任务 ID

  • 本地进程:检索进程 ID (PID)。
  • 远程任务:检索远程任务的 ID(例如,AWS EMR 任务 ID)。

保存这些任务 ID 可以用于进一步的数据查询和远程任务操作。例如,当工作流停止时,可以使用任务 ID 调用相应的取消 API 来终止正在运行的任务。


容错处理

  • 本地进程:如果 Worker 节点发生故障,本地进程将不会感知到,需要重新提交任务。
  • 远程任务:如果任务在远程集群(例如 AWS)上运行,则可以使用任务 ID 检查任务状态,并尝试接管任务。如果成功,则无需重新提交任务,从而节省时间。

任务执行完成

执行任务后,需要执行几个完成操作:

  • 任务完成检查:系统会检查是否需要发送告警,例如对于一个SQL任务,如果查询结果触发了告警,系统会通过RPC与告警服务进行交互,发送告警消息。

  • 事件反馈:Worker 会向 Master 回发任务完成事件(finish event),Master 更新数据库中的任务状态,并继续进行 DAG 状态转换。

  • 上下文清理:Worker 会从内存中删除任务开始时创建的任务上下文,还会清理任务执行过程中生成的文件路径,如果处于调试模式(开发模式),这些文件不会被清理,从而可以对失败的任务进行故障排除。


通过以上步骤,一个任务实例的整个执行过程就完成了。

社区贡献

如果你对 Apache DolphinScheduler 感兴趣,并希望为开源社区做出贡献,欢迎参考我们的贡献指南。


社区鼓励积极贡献,包括但不限于:

  • 报告使用过程中遇到的问题。
  • 提交文档和代码 PR。
  • 添加单元测试(UT)。
  • 添加代码注释。
  • 修复错误或添加新功能。
  • 撰写技术文章或参加 Meetups。

新贡献者指南

对于新贡献者,您可以在社区的 GitHub 问题中搜索标记为good first issue问题。这些问题通常比较简单,适合首次贡献的用户。


综上所述,我们了解了Apache DolphinScheduler的整体设计以及Worker任务的详细执行流程。

希望本内容能帮助您更好地理解和使用 Apache DolphinScheduler。如果您有任何疑问,请随时在评论部分与我联系。