When I batch-generated DolphinScheduler tasks and tried to import them, I found that tasks could only be imported one at a time, so using the API is clearly more convenient.

DolphinScheduler API Documentation

DolphinScheduler has API documentation available at:

http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn

However, the documentation is fairly brief, so expect to work out some details yourself.

Token: All APIs require a token. Create one under Security Center - Token Management and keep it at hand, as it is needed for every subsequent API call.

Header: Construct the request header using the token above:

    import requests

    token = ''  # paste the token created above
    headers = {
        'Accept': 'application/json',
        'token': token
    }

Project ID: project_id can be found in the URL when viewing the project's workflows. In the URLs below, replace IP with your server address and {project_id} with this value.

DolphinScheduler Task Import API

The API for importing tasks is:

    import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'

With the endpoint known, the import itself is short:

    def import_job(file_path):
        # Open the workflow file in binary mode and upload it as multipart form data
        with open(file_path, 'rb') as file:
            files = {'file': file}
            # Import the workflow
            response = requests.post(import_url, headers=headers, files=files)
        print(response.status_code)
        if response.status_code != 200:
            print('Upload failed ' + file_path)

Note that the file must be uploaded as binary data, hence the 'rb' mode. file_path is the workflow file; to see the expected format, export an existing workflow and use it as a reference. Calling this function once per file gives you batch import.

Workflow Deployment

After the batch upload, one problem remains: deploying each workflow by hand is still a lot of work, so we keep using the API. It turns out that deploying a workflow requires first obtaining the workflow's schedule ID.
The full sequence is:

Get Workflow List -> Get Workflow Code -> Get Schedule IDs for All Workflows -> Deploy Workflows

Get Workflow List: The API address is:

    jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'

This endpoint is paginated, which is slightly inconvenient:

    def get_jobs_list():
        # Pagination parameters
        pageNo = 1
        pageSize = 10
        # Accumulates the workflows from every page
        all_items = list()
        while True:
            # Construct the complete URL for the current page
            url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                # Request failed, print error information and stop
                print('Request failed:', response.status_code, response.text)
                break
            # Request succeeded, parse the body once
            data = response.json()["data"]
            total = data["total"]
            items = data["totalList"]
            # Add the current page's data to the result list
            all_items.extend(items)
            # Stop on an empty page or once every workflow has been fetched
            if not items or pageNo * pageSize >= total:
                break
            pageNo += 1
        return all_items

all_items holds the full details of every workflow; only the codes are needed:

    all_jobs = get_jobs_list()
    job_codes = [job['code'] for job in all_jobs]

This gives you the codes for all workflows.

Get Schedule ID: The API for schedule IDs is as follows.
To avoid pagination, the page size is simply set to 1000:

    # {project_id} must be filled in by hand; the workflow code is appended by schedule_id()
    schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='

Using this API, you can look up the schedule ID of each workflow:

    def schedule_id(job_code):
        url = schedules_url + str(job_code)
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            return None
        schedules = response.json()['data']['totalList']
        # Only return schedules that are not online yet
        if len(schedules) > 0 and schedules[0]['releaseState'] == 'OFFLINE':
            return schedules[0]['id']
        return None

The function returns an ID only when the schedule's releaseState is OFFLINE, so schedules that are already deployed are filtered out (None is returned for them).

Deploy: Everything is ready, and deployment can finally proceed:

    online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'

Specific implementation:

    def online_job(scheduler_id):
        # Assumes {project_id} in online_url was already replaced by hand;
        # otherwise str.format() raises KeyError
        url = online_url.format(scheduler_id=scheduler_id)
        response = requests.post(url, headers=headers)
        if response.status_code == 200:
            print('success')
        else:
            print('online job failed')

At this point, both the import and the batch deployment are fully automated.
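The individual steps still have to be chained together. The following is a minimal sketch of that chaining, not the original implementation: the names collect_all_pages, pick_offline_schedule_id and deploy_all are hypothetical, and the three callables passed in (fetch_page, fetch_schedules, bring_online) are assumed to be thin wrappers around the HTTP calls shown earlier. They are injected as parameters so that the control flow can be tried out without a running server.

```python
from typing import Callable, Dict, List, Optional


def collect_all_pages(fetch_page: Callable[[int, int], Dict],
                      page_size: int = 100) -> List[Dict]:
    """Walk a paginated endpoint and return every item.

    fetch_page(page_no, page_size) is a hypothetical callable that must
    return the parsed "data" object of one response, i.e. a dict with the
    keys "total" and "totalList".
    """
    items: List[Dict] = []
    page_no = 1
    while True:
        data = fetch_page(page_no, page_size)
        page = data.get("totalList") or []
        items.extend(page)
        # Stop on an empty page or once "total" items have been covered
        if not page or page_no * page_size >= data.get("total", 0):
            break
        page_no += 1
    return items


def pick_offline_schedule_id(schedules: List[Dict]) -> Optional[int]:
    """Return the ID of the first schedule if it is OFFLINE, else None."""
    if schedules and schedules[0].get("releaseState") == "OFFLINE":
        return schedules[0].get("id")
    return None


def deploy_all(fetch_page: Callable[[int, int], Dict],
               fetch_schedules: Callable[[int], List[Dict]],
               bring_online: Callable[[int], bool]) -> List[int]:
    """List workflows, look up their schedules, bring OFFLINE ones online.

    Returns the codes of the workflows whose schedule was brought online.
    """
    deployed = []
    for job in collect_all_pages(fetch_page):
        schedule_id = pick_offline_schedule_id(fetch_schedules(job["code"]))
        if schedule_id is not None and bring_online(schedule_id):
            deployed.append(job["code"])
    return deployed
```

In practice, fetch_page would wrap the requests.get call from get_jobs_list and return the "data" object of the response, fetch_schedules would return the totalList from the schedules endpoint, and bring_online would be a variant of online_job that returns True on HTTP 200. Like schedule_id above, the sketch looks only at the first schedule of each workflow.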