Background Note: In DolphinScheduler, offline (T+1) tasks have a complete lifecycle — they can be stopped, paused, resumed from pause, rerun, and so on — and all of them are organized as a DAG (Directed Acyclic Graph). DolphinScheduler DAG Implementation: org.apache.dolphinscheduler.common.graph.DAG. The DAG relies on three important data structures: // Vertex information private final Map<Node, NodeInfo> nodesMap; // Edge association information, which records the relationship between vertices and edges, allowing to find leaf nodes and downstream nodes private final Map<Node, Map<Node, EdgeInfo>> edgesMap; // Reverse edge association information, which allows for quick finding of nodes with an in-degree of 0 (starting nodes), and also to obtain upstream nodes private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap; Example below: DAG<String, String, String> graph = new DAG<>(); graph.addNode("A", "A"); graph.addNode("B", "B"); graph.addNode("C", "C"); // Add an edge from B to C, A is still floating graph.addEdge("B", "C"); // If you add A -> B, it actually starts from B and checks if there is a connectable line to A. If there is, it means the A -> B edge cannot be added because it would form a cycle; otherwise, it can be added.
graph.addEdge("A", "B"); Source code analysis: org.apache.dolphinscheduler.common.graph.DAG#addEdge public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) { lock.writeLock().lock(); try { // TODO Whether the edge can be added if (!isLegalAddEdge(fromNode, toNode, createNode)) { log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode); return false; } // TODO Add nodes addNodeIfAbsent(fromNode, null); addNodeIfAbsent(toNode, null); // TODO Add edges addEdge(fromNode, toNode, edge, edgesMap); addEdge(toNode, fromNode, edge, reverseEdgesMap); return true; } finally { lock.writeLock().unlock(); } } private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) { // TODO If fromNode and toNode are the same vertex, this edge cannot be added if (fromNode.equals(toNode)) { log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode); return false; } // TODO If not creating a node, meaning fromNode and toNode must be existing vertices if (!createNode) { if (!containsNode(fromNode) || !containsNode(toNode)) { log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode); return false; } } // Whether an edge can be successfully added(fromNode -> toNode), need to determine whether the // DAG has a cycle! // TODO Get the number of nodes int verticesCount = getNodesCount(); Queue<Node> queue = new LinkedList<>(); // TODO Put toNode into the queue queue.add(toNode); // If DAG doesn't find fromNode, it's not a cycle! // TODO When the queue is not empty, it is definitely not empty here while (!queue.isEmpty() && (--verticesCount > 0)) { // TODO Get the element in the queue Node key = queue.poll(); for (Node subsequentNode : getSubsequentNodes(key)) { // TODO Actually, it is judged that if A -> B has a connection in the DAG, and the node B is passed in, to see if B's edge has A. If there is A, it means there is already a B -> A association, and it cannot be added. 
If, for example, B's downstream node is A -> B -> C, then B's downstream node is C, and C needs to be put into the queue // TODO The core idea is to find the connection of the target node to be added, whether there is a connection from the target node to the source node (to judge whether there is a cycle) if (subsequentNode.equals(fromNode)) { return false; } queue.add(subsequentNode); } } return true; } DolphinScheduler DagHelper Explanation: The DAG class is a general-purpose DAG utility class, whereas DagHelper is a business-level utility class that assembles task definitions, and the relationships between them, into a DAG. DagHelper is called from org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph: public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception { // TODO Here is actually to get the number of tasks and their relationships corresponding to the process instance List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode( workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); // TODO Get the corresponding task definition log List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); // TODO Get TaskNode List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); // generate process to get DAG info // TODO Here is to parse whether the start node list is manually specified, which is not by default List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam()); // TODO If the default startNodeNameList is empty List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam()); // TODO Build a ProcessDag object instance ProcessDag processDag = DagHelper.generateFlowDag( taskNodeList, startNodeNameList, recoveryTaskNodeCodeList, workflowInstance.getTaskDependType()); if (processDag == null) {
log.error("ProcessDag is null"); throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null"); } // TODO Generate DAG DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag); log.debug("Build dag success, dag: {}", dagGraph); // TODO Use WorkflowGraph to encapsulate the task node list and dagGraph return new WorkflowGraph(taskNodeList, dagGraph); } org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag public static ProcessDag generateFlowDag( List<TaskNode> totalTaskNodeList, List<Long> startNodeNameList, List<Long> recoveryNodeCodeList, TaskDependType depNodeType) throws Exception { // TODO Actually, it is to get all nodes List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode( totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; } // TODO Get the relationship between task nodes List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList); // TODO Actually, it is to instantiate a ProcessDag ProcessDag processDag = new ProcessDag(); // TODO Set the edges of DAG processDag.setEdges(taskNodeRelations); // TODO Set the vertices of DAG processDag.setNodes(destTaskNodeList); return processDag; } In short, generateFlowDag stores destTaskNodeList as the vertices and taskNodeRelations as the edges of the ProcessDag, which buildDagGraph then converts into a DAG: org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) { DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>(); // TODO Add vertices if (CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { dag.addNode(node.getCode(), node); } } // TODO Add edges if (CollectionUtils.isNotEmpty(processDag.getEdges())) { for (TaskNodeRelation edge : processDag.getEdges()) { dag.addEdge(edge.getStartNode(), edge.getEndNode()); } } return dag; } Background Note: In DolphinScheduler, offline tasks have a complete lifecycle, such as
stopping, pausing, resuming from pause, and rerunning, and all of these T+1 offline tasks are organized as a DAG (Directed Acyclic Graph). DolphinScheduler DAG Implementation: org.apache.dolphinscheduler.common.graph.DAG. The DAG relies on three important data structures: // Vertex information private final Map<Node, NodeInfo> nodesMap; // Edge association information, which records the relationship between vertices and edges, allowing to find leaf nodes and downstream nodes private final Map<Node, Map<Node, EdgeInfo>> edgesMap; // Reverse edge association information, which allows for quick finding of nodes with an in-degree of 0 (starting nodes), and also to obtain upstream nodes private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap; // Vertex information private final Map<Node, NodeInfo> nodesMap; // Edge association information, which records the relationship between vertices and edges, allowing to find leaf nodes and downstream nodes private final Map<Node, Map<Node, EdgeInfo>> edgesMap; // Reverse edge association information, which allows for quick finding of nodes with an in-degree of 0 (starting nodes), and also to obtain upstream nodes private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap; Example below: DAG<String, String, String> graph = new DAG<>(); graph.addNode("A", "A"); graph.addNode("B", "B"); graph.addNode("C", "C"); // Add an edge from B to C, A is still floating graph.addEdge("B", "C"); // If you add A -> B, it actually starts from B and checks if there is a connectable line to A. If there is, it means the A -> B edge cannot be added because it would form a cycle; otherwise, it can be added.
graph.addEdge("A", "B"); DAG<String, String, String> graph = new DAG<>(); graph.addNode("A", "A"); graph.addNode("B", "B"); graph.addNode("C", "C"); // Add an edge from B to C, A is still floating graph.addEdge("B", "C"); // If you add A -> B, it actually starts from B and checks if there is a connectable line to A. If there is, it means the A -> B edge cannot be added because it would form a cycle; otherwise, it can be added. graph.addEdge("A", "B"); Source code analysis: org.apache.dolphinscheduler.common.graph.DAG#addEdge public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) { lock.writeLock().lock(); try { // TODO Whether the edge can be added if (!isLegalAddEdge(fromNode, toNode, createNode)) { log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode); return false; } // TODO Add nodes addNodeIfAbsent(fromNode, null); addNodeIfAbsent(toNode, null); // TODO Add edges addEdge(fromNode, toNode, edge, edgesMap); addEdge(toNode, fromNode, edge, reverseEdgesMap); return true; } finally { lock.writeLock().unlock(); } } private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) { // TODO If fromNode and toNode are the same vertex, this edge cannot be added if (fromNode.equals(toNode)) { log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode); return false; } // TODO If not creating a node, meaning fromNode and toNode must be existing vertices if (!createNode) { if (!containsNode(fromNode) || !containsNode(toNode)) { log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode); return false; } } // Whether an edge can be successfully added(fromNode -> toNode), need to determine whether the // DAG has a cycle! // TODO Get the number of nodes int verticesCount = getNodesCount(); Queue<Node> queue = new LinkedList<>(); // TODO Put toNode into the queue queue.add(toNode); // If DAG doesn't find fromNode, it's not a cycle! 
// TODO When the queue is not empty, it is definitely not empty here while (!queue.isEmpty() && (--verticesCount > 0)) { // TODO Get the element in the queue Node key = queue.poll(); for (Node subsequentNode : getSubsequentNodes(key)) { // TODO Actually, it is judged that if A -> B has a connection in the DAG, and the node B is passed in, to see if B's edge has A. If there is A, it means there is already a B -> A association, and it cannot be added. If, for example, B's downstream node is A -> B -> C, then B's downstream node is C, and C needs to be put into the queue // TODO The core idea is to find the connection of the target node to be added, whether there is a connection from the target node to the source node (to judge whether there is a cycle) if (subsequentNode.equals(fromNode)) { return false; } queue.add(subsequentNode); } } return true; } public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) { lock.writeLock().lock(); try { // TODO Whether the edge can be added if (!isLegalAddEdge(fromNode, toNode, createNode)) { log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode); return false; } // TODO Add nodes addNodeIfAbsent(fromNode, null); addNodeIfAbsent(toNode, null); // TODO Add edges addEdge(fromNode, toNode, edge, edgesMap); addEdge(toNode, fromNode, edge, reverseEdgesMap); return true; } finally { lock.writeLock().unlock(); } } private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) { // TODO If fromNode and toNode are the same vertex, this edge cannot be added if (fromNode.equals(toNode)) { log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode); return false; } // TODO If not creating a node, meaning fromNode and toNode must be existing vertices if (!createNode) { if (!containsNode(fromNode) || !containsNode(toNode)) { log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode); return false; } } // Whether an edge can be 
successfully added(fromNode -> toNode), need to determine whether the // DAG has a cycle! // TODO Get the number of nodes int verticesCount = getNodesCount(); Queue<Node> queue = new LinkedList<>(); // TODO Put toNode into the queue queue.add(toNode); // If DAG doesn't find fromNode, it's not a cycle! // TODO When the queue is not empty, it is definitely not empty here while (!queue.isEmpty() && (--verticesCount > 0)) { // TODO Get the element in the queue Node key = queue.poll(); for (Node subsequentNode : getSubsequentNodes(key)) { // TODO Actually, it is judged that if A -> B has a connection in the DAG, and the node B is passed in, to see if B's edge has A. If there is A, it means there is already a B -> A association, and it cannot be added. If, for example, B's downstream node is A -> B -> C, then B's downstream node is C, and C needs to be put into the queue // TODO The core idea is to find the connection of the target node to be added, whether there is a connection from the target node to the source node (to judge whether there is a cycle) if (subsequentNode.equals(fromNode)) { return false; } queue.add(subsequentNode); } } return true; } DolphinScheduler DagHelper Explanation: The DAG class is a general-purpose DAG utility class, whereas DagHelper is a business-level utility class that assembles task definitions, and the relationships between them, into a DAG.
org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception { // TODO Here is actually to get the number of tasks and their relationships corresponding to the process instance List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode( workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); // TODO Get the corresponding task definition log List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); // TODO Get TaskNode List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); // generate process to get DAG info // TODO Here is to parse whether the start node list is manually specified, which is not by default List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam()); // TODO If the default startNodeNameList is empty List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam()); // TODO Build a ProcessDag object instance ProcessDag processDag = DagHelper.generateFlowDag( taskNodeList, startNodeNameList, recoveryTaskNodeCodeList, workflowInstance.getTaskDependType()); if (processDag == null) { log.error("ProcessDag is null"); throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null"); } // TODO Generate DAG DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag); log.debug("Build dag success, dag: {}", dagGraph); // TODO Use WorkflowGraph to encapsulate the task node list and dagGraph return new WorkflowGraph(taskNodeList, dagGraph); } public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception { // TODO Here is actually to get the number of tasks and their relationships corresponding to the process instance List<ProcessTaskRelation> 
processTaskRelations = processService.findRelationByCode( workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); // TODO Get the corresponding task definition log List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); // TODO Get TaskNode List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); // generate process to get DAG info // TODO Here is to parse whether the start node list is manually specified, which is not by default List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam()); // TODO If the default startNodeNameList is empty List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam()); // TODO Build a ProcessDag object instance ProcessDag processDag = DagHelper.generateFlowDag( taskNodeList, startNodeNameList, recoveryTaskNodeCodeList, workflowInstance.getTaskDependType()); if (processDag == null) { log.error("ProcessDag is null"); throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null"); } // TODO Generate DAG DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag); log.debug("Build dag success, dag: {}", dagGraph); // TODO Use WorkflowGraph to encapsulate the task node list and dagGraph return new WorkflowGraph(taskNodeList, dagGraph); } org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag public static ProcessDag generateFlowDag( List<TaskNode> totalTaskNodeList, List<Long> startNodeNameList, List<Long> recoveryNodeCodeList, TaskDependType depNodeType) throws Exception { // TODO Actually, it is to get all nodes List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode( totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; } // TODO Get the relationship between task nodes List<TaskNodeRelation> 
taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList); // TODO Actually, it is to instantiate a ProcessDag ProcessDag processDag = new ProcessDag(); // TODO Set the edges of DAG processDag.setEdges(taskNodeRelations); // TODO Set the vertices of DAG processDag.setNodes(destTaskNodeList); return processDag; } public static ProcessDag generateFlowDag( List<TaskNode> totalTaskNodeList, List<Long> startNodeNameList, List<Long> recoveryNodeCodeList, TaskDependType depNodeType) throws Exception { // TODO Actually, it is to get all nodes List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode( totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; } // TODO Get the relationship between task nodes List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList); // TODO Actually, it is to instantiate a ProcessDag ProcessDag processDag = new ProcessDag(); // TODO Set the edges of DAG processDag.setEdges(taskNodeRelations); // TODO Set the vertices of DAG processDag.setNodes(destTaskNodeList); return processDag; } In short, generateFlowDag stores destTaskNodeList as the vertices and taskNodeRelations as the edges of the ProcessDag, which buildDagGraph then converts into a DAG: org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) { DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>(); // TODO Add vertices if (CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { dag.addNode(node.getCode(), node); } } // TODO Add edges if (CollectionUtils.isNotEmpty(processDag.getEdges())) { for (TaskNodeRelation edge : processDag.getEdges()) { dag.addEdge(edge.getStartNode(), edge.getEndNode()); } } return dag; } public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) { DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>(); // TODO Add vertices if
(CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { dag.addNode(node.getCode(), node); } } // TODO Add edges if (CollectionUtils.isNotEmpty(processDag.getEdges())) { for (TaskNodeRelation edge : processDag.getEdges()) { dag.addEdge(edge.getStartNode(), edge.getEndNode()); } } return dag; }