Apache DolphinScheduler is an open-source distributed, scalable, and visual DAG-based workflow scheduling system for enterprise-level scenarios. It provides a visual solution for task operation, workflow management, and the full lifecycle of data processing.
The official definition of a Directed Acyclic Graph (DAG) is as follows:
A graph is formed by vertices and edges connecting pairs of vertices, where the vertices can be any kind of object that is connected in pairs by edges. In a directed graph, each edge has an orientation, from one vertex to another. A path in a directed graph is a sequence of edges where the ending vertex of each edge is the starting vertex of the next one. If the starting vertex of the first edge equals the ending vertex of the last, the path forms a cycle.
A directed acyclic graph (DAG) is a directed graph without cycles.
A vertex in a directed graph is said to be reachable from another vertex if a path exists between them. If a vertex can reach itself via a nontrivial path, then the graph contains a cycle. A DAG is a graph where no vertex can reach itself via a nontrivial path.
In DolphinScheduler, the DAG structure is represented by the following data structure:
public class DAG<Node, NodeInfo, EdgeInfo> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * node map, key is node, value is node information
     */
    private final Map<Node, NodeInfo> nodesMap;

    /**
     * edge map, key is origin node, value is map with key for destination node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    /**
     * reversed edge set, key is destination node, value is map with key for origin node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}
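A minimal usage sketch of this structure is shown below. The addNode and addEdge calls are assumptions about the DAG API made purely for illustration; the actual method signatures may differ.

// Hedged sketch: assumes DAG exposes addNode(node, nodeInfo) and addEdge(from, to).
DAG<String, String, String> dag = new DAG<>();

// Register three task nodes, using a short description as the NodeInfo.
dag.addNode("extract", "pull raw data");
dag.addNode("transform", "clean and aggregate");
dag.addNode("load", "write to the warehouse");

// extract -> transform -> load: edgesMap stores the forward direction,
// reverseEdgesMap the same edges keyed by the destination node.
dag.addEdge("extract", "transform");
dag.addEdge("transform", "load");

With this layout, edgesMap answers "which tasks depend on X?" while reverseEdgesMap answers "which tasks must finish before X?".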
In enterprise data warehouse construction, tasks are typically layered, and because business logic is widely distributed and data lives in diverse storage systems, there are many dependencies between tasks. This makes a DAG a natural structure for storing task dependencies and scheduling information.
In a DAG, each node represents a specific scheduling task, and the edges represent dependencies between tasks. Traversing the DAG structure corresponds to executing the data warehouse tasks.
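To make this concrete, the following self-contained sketch performs a Kahn-style topological traversal over adjacency maps shaped like edgesMap; it is illustrative only, not DolphinScheduler's actual scheduling code:

import java.util.*;

// Self-contained sketch (not DolphinScheduler code): a Kahn-style topological
// traversal over adjacency maps shaped like edgesMap above.
public class DagExecutionOrderSketch {

    /**
     * Returns an order in which the tasks can run: a task appears only after
     * all of its upstream tasks. "edges" maps each task to its downstream tasks.
     */
    static <T> List<T> executionOrder(Set<T> nodes, Map<T, Set<T>> edges) {
        Map<T, Integer> inDegree = new HashMap<>();
        nodes.forEach(n -> inDegree.put(n, 0));
        edges.forEach((from, tos) -> tos.forEach(to -> inDegree.merge(to, 1, Integer::sum)));

        Deque<T> ready = new ArrayDeque<>();
        inDegree.forEach((n, d) -> { if (d == 0) ready.add(n); });

        List<T> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            T task = ready.poll();
            order.add(task);                                    // "execute" the task here
            for (T next : edges.getOrDefault(task, Set.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);                            // all dependencies satisfied
                }
            }
        }
        if (order.size() != nodes.size()) {
            throw new IllegalStateException("the graph contains a cycle");
        }
        return order;
    }
}

A task is released only once all of its upstream tasks have completed; a leftover remainder signals a cycle and therefore an invalid workflow definition.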
Apache DolphinScheduler has two core roles: MasterServer and WorkerServer. Following a modular design, the Master focuses on DAG task splitting and task submission, while the Worker handles task execution and log services.
The core task execution flow is as follows (based on the official documentation):
Because task scheduling is complex, a large process can be broken down into smaller sub-processes, and besides the main process there are auxiliary sub-processes. The following analysis walks through the execution and scheduling process step by step to make it easier to understand.
Commands are distributed asynchronously across Master servers.
The API server encapsulates the user's HTTP request to run a workflow into command data and inserts it into the t_ds_command table. Here's an example of a command to start a workflow instance:
{
  "commandType": "START_PROCESS",
  "processDefinitionCode": 14285512555584,
  "executorId": 1,
  "commandParam": "{}",
  "taskDependType": "TASK_POST",
  "failureStrategy": "CONTINUE",
  "warningType": "NONE",
  "startTime": 1723444881372,
  "processInstancePriority": "MEDIUM",
  "updateTime": 1723444881372,
  "workerGroup": "default",
  "tenantCode": "default",
  "environmentCode": -1,
  "dryRun": 0,
  "processInstanceId": 0,
  "processDefinitionVersion": 1,
  "testFlag": 0
}
In the Master server, the MasterSchedulerBootstrap loop runs continuously. It assigns itself a slot via ZooKeeper and selects the commands that belong to that slot from the t_ds_command table using the following query:
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
    select *
    from t_ds_command
    where id % #{masterCount} = #{thisMasterSlot}
    order by process_instance_priority, id asc
    limit #{limit}
</select>
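In other words, each Master only picks up the commands whose id modulo the number of registered masters equals its own ZooKeeper-assigned slot. The following sketch expresses that predicate in plain Java; it is illustrative, not the actual MasterSchedulerBootstrap code:

import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the "id % masterCount = thisMasterSlot" predicate
// from the query above, expressed outside of SQL.
public class SlotFilterSketch {

    /** Keeps only the command ids that belong to this master's slot. */
    static List<Long> commandsForSlot(List<Long> commandIds, int masterCount, int thisMasterSlot) {
        return commandIds.stream()
                .filter(id -> id % masterCount == thisMasterSlot)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With 3 masters, slot 1 owns command ids 1, 4, 7, ...
        System.out.println(commandsForSlot(List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L), 3, 1));
    }
}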
The MasterSchedulerBootstrap loop polls for pending commands, builds a ProcessInstance from the command and the master host, and inserts the ProcessInstance into the t_ds_process_instance table.
It also generates an executable task, workflowExecuteRunnable, that contains all the required context information. This workflowExecuteRunnable is cached locally in the processInstanceExecCacheManager. At the same time, a WorkflowEventType.START_WORKFLOW event for the ProcessInstance is produced and sent to the workflowEventQueue.
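This hand-off can be pictured with the following self-contained sketch, in which the event type, cache map, and blocking queue are simplified stand-ins for the real processInstanceExecCacheManager and workflowEventQueue, not the Master's actual classes:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for the cache and event queue described above.
public class StartWorkflowHandOffSketch {

    enum WorkflowEventType { START_WORKFLOW }

    record WorkflowEvent(WorkflowEventType type, int processInstanceId) { }

    // local cache: processInstanceId -> executable workflow (here just a Runnable)
    private final Map<Integer, Runnable> execCache = new ConcurrentHashMap<>();
    private final BlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>();

    /** Caches the executable workflow, then publishes a START_WORKFLOW event for it. */
    void submit(int processInstanceId, Runnable workflowExecuteRunnable) {
        execCache.put(processInstanceId, workflowExecuteRunnable);
        workflowEventQueue.offer(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstanceId));
    }
}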
The cache is implemented via ProcessInstanceExecCacheManagerImpl and provides the following core functionalities:
public interface ProcessInstanceExecCacheManager {

    /**
     * get WorkflowExecuteThread by process instance id
     *
     * @param processInstanceId processInstanceId
     * @return WorkflowExecuteThread
     */
    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);

    /**
     * judge the process instance does it exist
     *
     * @param processInstanceId processInstanceId
     * @return true - if process instance id exists in cache
     */
    boolean contains(int processInstanceId);

    /**
     * remove cache by process instance id
     *
     * @param processInstanceId processInstanceId
     */
    void removeByProcessInstanceId(int processInstanceId);

    /**
     * cache
     *
     * @param processInstanceId processInstanceId
     * @param workflowExecuteThread if it is null, will not be cached
     */
    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);

    /**
     * get all WorkflowExecuteThread from cache
     *
     * @return all WorkflowExecuteThread in cache
     */
    Collection<WorkflowExecuteRunnable> getAll();

    void clearCache();
}
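For intuition, a minimal sketch of an implementation backed by a ConcurrentHashMap might look like the following (assuming the interface above and the WorkflowExecuteRunnable type are on the classpath); the real ProcessInstanceExecCacheManagerImpl may differ in detail:

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch, not the actual ProcessInstanceExecCacheManagerImpl.
public class SimpleProcessInstanceExecCacheManager implements ProcessInstanceExecCacheManager {

    private final Map<Integer, WorkflowExecuteRunnable> cache = new ConcurrentHashMap<>();

    @Override
    public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
        return cache.get(processInstanceId);
    }

    @Override
    public boolean contains(int processInstanceId) {
        return cache.containsKey(processInstanceId);
    }

    @Override
    public void removeByProcessInstanceId(int processInstanceId) {
        cache.remove(processInstanceId);
    }

    @Override
    public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
        Objects.requireNonNull(workflowExecuteThread, "workflowExecuteThread must not be null");
        cache.put(processInstanceId, workflowExecuteThread);
    }

    @Override
    public Collection<WorkflowExecuteRunnable> getAll() {
        return cache.values();
    }

    @Override
    public void clearCache() {
        cache.clear();
    }
}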
The MasterSchedulerBootstrap loop transforms commands into tasks with the necessary context and submits them for execution.
The EventExecuteService adds tasks without dependencies to the standByTaskInstancePriorityQueue, from which they are dispatched in priority order to the globalTaskDispatchWaitingQueue.
Once tasks reach the globalTaskDispatchWaitingQueue, they have already become the smallest executable task units.
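The priority ordering between these two queues can be sketched with standard JDK priority queues; TaskInstanceStub and the queue setup below are hypothetical stand-ins, not the Master's actual classes:

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Self-contained sketch of priority-ordered dispatch between the two queues named above.
public class PriorityDispatchSketch {

    record TaskInstanceStub(int id, int priority) { }   // lower value = higher priority

    public static void main(String[] args) {
        PriorityBlockingQueue<TaskInstanceStub> standByTaskInstancePriorityQueue =
                new PriorityBlockingQueue<>(16, Comparator.comparingInt(TaskInstanceStub::priority));
        PriorityBlockingQueue<TaskInstanceStub> globalTaskDispatchWaitingQueue =
                new PriorityBlockingQueue<>(16, Comparator.comparingInt(TaskInstanceStub::priority));

        // Tasks with no unmet dependencies are parked in the standby queue ...
        standByTaskInstancePriorityQueue.add(new TaskInstanceStub(2, 5));
        standByTaskInstancePriorityQueue.add(new TaskInstanceStub(1, 1));

        // ... and moved to the dispatch-waiting queue in priority order.
        TaskInstanceStub next;
        while ((next = standByTaskInstancePriorityQueue.poll()) != null) {
            globalTaskDispatchWaitingQueue.offer(next);
        }
    }
}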
The EventExecuteService traverses the DAG using a breadth-first search to submit tasks to the globalTaskDispatchWaitingQueue.
The consumer is the GlobalTaskDispatchWaitingQueueLooper, which takes tasks waiting to be dispatched from the globalTaskDispatchWaitingQueue. Tasks are dispatched over an RPC interface according to the task type. Currently, there are two types of dispatchers:
For the WorkerTaskDispatcher, once the Worker's RPC server receives the request, the task is submitted to the workerTaskExecutorThreadPool for execution. Task processing is therefore asynchronous, so the Master does not block at this point; execution progress is tracked through callbacks at key points in the process.
When a task is dispatched to the Worker, it is asynchronously submitted to the thread pool for execution. During various stages of asynchronous task execution, the Worker communicates the task status back to the Master by calling the RPC interface.
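The asynchronous pattern on the Worker side can be sketched as follows; the thread pool and callback names are illustrative stand-ins rather than the actual WorkerTaskDispatcher implementation:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Self-contained sketch: the RPC handler returns immediately after submitting the
// task to a pool, and progress is reported back through a callback. Illustrative only.
public class AsyncWorkerExecutionSketch {

    interface ExecutionCallback {
        void onRunning(int taskInstanceId);
        void onFinished(int taskInstanceId, boolean success);
    }

    private final ExecutorService workerTaskExecutorThreadPool = Executors.newFixedThreadPool(4);

    /** Called by the RPC handler; returns as soon as the task is queued. */
    void dispatch(int taskInstanceId, Runnable taskBody, ExecutionCallback callback) {
        workerTaskExecutorThreadPool.submit(() -> {
            callback.onRunning(taskInstanceId);                // report "running" back to the Master
            boolean success = true;
            try {
                taskBody.run();                                // actual task logic
            } catch (RuntimeException e) {
                success = false;
            }
            callback.onFinished(taskInstanceId, success);      // report the final state
        });
    }
}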
During asynchronous execution on the Worker, the task execution status callback covers four possible states:
Note: In the official event flowchart, the direction of the ACK was incorrect. The ACK is not sent from the Worker to the Master; rather, the Master notifies the Worker that it has finished processing this event status.
After making this correction, the overall process can be summarized as shown in the following diagram:
On the Master node, the ITaskInstanceExecutionEventListener service receives these RPC requests and adds the corresponding task events to the TaskEventService event queue for further processing.
The buffer queue is the TaskEventService eventQueue on the Master node.
There can be multiple producers, including:
The consumer is the TaskInstanceListenerImpl service on the Master node. It transforms each TaskEvent into a TaskExecuteRunnable and submits it to the thread pool for execution, tracked in the taskExecuteThreadMap; the task's execution status is updated within the thread pool.