Background In DolphinScheduler for YARN tasks such as MapReduce (MR), Spark, Flink, and even Shell tasks, the initial approach was to determine the task status based on the application ID when a YARN task was detected. This means that instead of relying solely on the client process status, DolphinScheduler would also consider the YARN status to decide on the task state. Later, the community refactored this process (which was a step in the right direction but is still incomplete), leading to some issues. For instance, in the Flink Stream Application mode, where the client is detached, the client Shell exits immediately, causing DolphinScheduler to mark the task as successful. However, the task on YARN is still running, and DolphinScheduler can no longer track its status on YARN. So, how can we implement tracking for YARN task status in DolphinScheduler? Note: This example is based on version 3.2.1. Worker Task Relationship Diagram First, let’s look at the relationship principle of Worker Tasks in DolphinScheduler. AbstractTask: Mainly defines the basic lifecycle interface of a task, such as init, handle, and cancel. AbstractRemoteTask: Implements the handle method, demonstrating the template method design pattern, and extracting three core interface methods: submitApplication, trackApplicationStatus, and cancelApplication. AbstractYarnTask: For YARN tasks, AbstractYarnTask is abstracted, and submitApplication, trackApplicationStatus, and cancelApplication directly access the YARN API. Implementing YARN Status Tracking in AbstractYarnTask The AbstractYarnTask can implement YARN status tracking. Refer to the full code in org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask: public abstract class AbstractYarnTask extends AbstractRemoteTask { private static final int MAX_RETRY_ATTEMPTS = 3; private ShellCommandExecutor shellCommandExecutor; public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } @Override public void submitApplication() throws TaskException { try { IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() .properties(getProperties()) .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator())); TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, null); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.info("The current yarn task has been interrupted", ex); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); throw new TaskException("The current yarn task has been interrupted", ex); } catch (Exception e) { log.error("yarn process failure", e); exitStatusCode = -1; throw new TaskException("Execute task failed", e); } } @Override public void trackApplicationStatus() throws TaskException { if (StringUtils.isEmpty(appIds)) { return; } List<String> appIdList = Arrays.asList(appIds.split(",")); boolean continueTracking = true; while (continueTracking) { Map<String, YarnState> yarnStateMap = new HashMap<>(); for (String appId : appIdList) { if (StringUtils.isEmpty(appId)) { continue; } boolean hadoopSecurityAuthStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); String yarnStateJson = fetchYarnStateJsonWithRetry(appId, hadoopSecurityAuthStartupState); if (StringUtils.isNotEmpty(yarnStateJson)) { String appJson = JSONUtils.getNodeString(yarnStateJson, "app"); YarnTask yarnTask = JSONUtils.parseObject(appJson, YarnTask.class); log.info("yarnTask : {}", yarnTask); yarnStateMap.put(yarnTask.getId(), YarnState.of(yarnTask.getState())); } } YarnState yarnTaskOverallStatus = YarnTaskStatusChecker.getYarnTaskOverallStatus(yarnStateMap); if (yarnTaskOverallStatus.isFinalState()) { handleFinalState(yarnTaskOverallStatus); continueTracking = false; } else { try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS * 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } } } private String fetchYarnStateJsonWithRetry(String appId, boolean hadoopSecurityAuthStartupState) throws TaskException { int retryCount = 0; while (retryCount < MAX_RETRY_ATTEMPTS) { try { return fetchYarnStateJson(appId, hadoopSecurityAuthStartupState); } catch (Exception e) { retryCount++; log.error("Failed to fetch or parse Yarn state for appId: {}. Attempt: {}/{}", appId, retryCount, MAX_RETRY_ATTEMPTS, e); if (retryCount >= MAX_RETRY_ATTEMPTS) { throw new TaskException("Failed to fetch Yarn state after " + MAX_RETRY_ATTEMPTS + " attempts for appId: " + appId, e); } try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } } } return null; } private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } private String fetchYarnStateJson(String appId, boolean hadoopSecurityAuthStartupState) throws Exception { return hadoopSecurityAuthStartupState ? KerberosHttpClient.get(getApplicationUrl(appId)) : HttpUtils.get(getApplicationUrl(appId)); } static class YarnTaskStatusChecker { public static YarnState getYarnTaskOverallStatus(Map<String, YarnState> yarnTaskMap) { boolean hasKilled = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.KILLED); if (hasKilled) { return YarnState.KILLED; } boolean hasFailed = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.FAILED); if (hasFailed) { return YarnState.FAILED; } boolean allFINISHED = yarnTaskMap.values().stream() .allMatch(state -> state == YarnState.FINISHED); if (allFINISHED) { return YarnState.FINISHED; } boolean hasRunning = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.RUNNING); if (hasRunning) { return YarnState.RUNNING; } boolean hasSubmitting = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.NEW || state == YarnState.NEW_SAVING || state == YarnState.SUBMITTED || state == YarnState.ACCEPTED); if (hasSubmitting) { return YarnState.SUBMITTING; } return YarnState.UNKNOWN; } } } Here, the core logic is that instead of overriding the handle method directly, YARN tasks only need to implement two core interfaces: submitApplicationand trackApplicationStatus. The cancelApplication method ideally should be delegated to YarnApplicationManager (this integration is currently missing but does not impact functionality). Displaying ApplicationId for Streaming Tasks on the Frontend File: dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts Wrapping ApplicationId as YARN URL on the Backend File: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java File: dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java File: dolphinscheduler-common/src/main/resources/common.properties File: dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java File: dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageProperties.java Final UI display: Note: You will need to manually paste the URL; the above code does not include this functionality. Problem Tracking There is an issue here regarding the state. There are three states: FINISHED, FAILED, and KILLED. However, within the FINISHED state, there is also a FinalStatus, and being “finished” doesn’t necessarily mean success. Under FINISHED, there are actually SUCCEEDED, FAILED, and KILLED statuses. Essentially, FINISHED cannot be treated as the final state in DolphinScheduler, and further evaluation is needed. In the code for org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask#handleFinalState: private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } Using HTTP to Kill a Task curl -X PUT -d '{"state":"KILLED"}' \ -H "Content-Type: application/json" \ http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs Note: You must specifyuser.name, otherwise, the task might not be killed successfully. Background In DolphinScheduler for YARN tasks such as MapReduce (MR), Spark, Flink, and even Shell tasks, the initial approach was to determine the task status based on the application ID when a YARN task was detected. This means that instead of relying solely on the client process status, DolphinScheduler would also consider the YARN status to decide on the task state. Later, the community refactored this process (which was a step in the right direction but is still incomplete), leading to some issues. For instance, in the Flink Stream Application mode, where the client is detached, the client Shell exits immediately, causing DolphinScheduler to mark the task as successful. However, the task on YARN is still running, and DolphinScheduler can no longer track its status on YARN. So, how can we implement tracking for YARN task status in DolphinScheduler? Note: This example is based on version 3.2.1. Note: This example is based on version 3.2.1. Note: This example is based on version 3.2.1. Worker Task Relationship Diagram First, let’s look at the relationship principle of Worker Tasks in DolphinScheduler. AbstractTask: Mainly defines the basic lifecycle interface of a task, such as init, handle, and cancel. AbstractTask: Mainly defines the basic lifecycle interface of a task, such as init, handle, and cancel. AbstractRemoteTask: Implements the handle method, demonstrating the template method design pattern, and extracting three core interface methods: submitApplication, trackApplicationStatus, and cancelApplication. AbstractRemoteTask: Implements the handle method, demonstrating the template method design pattern, and extracting three core interface methods: submitApplication , trackApplicationStatus , and cancelApplication . submitApplication trackApplicationStatus cancelApplication AbstractYarnTask: For YARN tasks, AbstractYarnTask is abstracted, and submitApplication, trackApplicationStatus, and cancelApplication directly access the YARN API. AbstractYarnTask: For YARN tasks, AbstractYarnTask is abstracted, and submitApplication , trackApplicationStatus , and cancelApplication directly access the YARN API. AbstractYarnTask submitApplication trackApplicationStatus cancelApplication Implementing YARN Status Tracking in AbstractYarnTask The AbstractYarnTask can implement YARN status tracking. Refer to the full code in org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask : AbstractYarnTask org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask public abstract class AbstractYarnTask extends AbstractRemoteTask { private static final int MAX_RETRY_ATTEMPTS = 3; private ShellCommandExecutor shellCommandExecutor; public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } @Override public void submitApplication() throws TaskException { try { IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() .properties(getProperties()) .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator())); TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, null); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.info("The current yarn task has been interrupted", ex); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); throw new TaskException("The current yarn task has been interrupted", ex); } catch (Exception e) { log.error("yarn process failure", e); exitStatusCode = -1; throw new TaskException("Execute task failed", e); } } @Override public void trackApplicationStatus() throws TaskException { if (StringUtils.isEmpty(appIds)) { return; } List<String> appIdList = Arrays.asList(appIds.split(",")); boolean continueTracking = true; while (continueTracking) { Map<String, YarnState> yarnStateMap = new HashMap<>(); for (String appId : appIdList) { if (StringUtils.isEmpty(appId)) { continue; } boolean hadoopSecurityAuthStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); String yarnStateJson = fetchYarnStateJsonWithRetry(appId, hadoopSecurityAuthStartupState); if (StringUtils.isNotEmpty(yarnStateJson)) { String appJson = JSONUtils.getNodeString(yarnStateJson, "app"); YarnTask yarnTask = JSONUtils.parseObject(appJson, YarnTask.class); log.info("yarnTask : {}", yarnTask); yarnStateMap.put(yarnTask.getId(), YarnState.of(yarnTask.getState())); } } YarnState yarnTaskOverallStatus = YarnTaskStatusChecker.getYarnTaskOverallStatus(yarnStateMap); if (yarnTaskOverallStatus.isFinalState()) { handleFinalState(yarnTaskOverallStatus); continueTracking = false; } else { try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS * 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } } } private String fetchYarnStateJsonWithRetry(String appId, boolean hadoopSecurityAuthStartupState) throws TaskException { int retryCount = 0; while (retryCount < MAX_RETRY_ATTEMPTS) { try { return fetchYarnStateJson(appId, hadoopSecurityAuthStartupState); } catch (Exception e) { retryCount++; log.error("Failed to fetch or parse Yarn state for appId: {}. Attempt: {}/{}", appId, retryCount, MAX_RETRY_ATTEMPTS, e); if (retryCount >= MAX_RETRY_ATTEMPTS) { throw new TaskException("Failed to fetch Yarn state after " + MAX_RETRY_ATTEMPTS + " attempts for appId: " + appId, e); } try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } } } return null; } private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } private String fetchYarnStateJson(String appId, boolean hadoopSecurityAuthStartupState) throws Exception { return hadoopSecurityAuthStartupState ? KerberosHttpClient.get(getApplicationUrl(appId)) : HttpUtils.get(getApplicationUrl(appId)); } static class YarnTaskStatusChecker { public static YarnState getYarnTaskOverallStatus(Map<String, YarnState> yarnTaskMap) { boolean hasKilled = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.KILLED); if (hasKilled) { return YarnState.KILLED; } boolean hasFailed = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.FAILED); if (hasFailed) { return YarnState.FAILED; } boolean allFINISHED = yarnTaskMap.values().stream() .allMatch(state -> state == YarnState.FINISHED); if (allFINISHED) { return YarnState.FINISHED; } boolean hasRunning = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.RUNNING); if (hasRunning) { return YarnState.RUNNING; } boolean hasSubmitting = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.NEW || state == YarnState.NEW_SAVING || state == YarnState.SUBMITTED || state == YarnState.ACCEPTED); if (hasSubmitting) { return YarnState.SUBMITTING; } return YarnState.UNKNOWN; } } } public abstract class AbstractYarnTask extends AbstractRemoteTask { private static final int MAX_RETRY_ATTEMPTS = 3; private ShellCommandExecutor shellCommandExecutor; public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } @Override public void submitApplication() throws TaskException { try { IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() .properties(getProperties()) .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator())); TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, null); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.info("The current yarn task has been interrupted", ex); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); throw new TaskException("The current yarn task has been interrupted", ex); } catch (Exception e) { log.error("yarn process failure", e); exitStatusCode = -1; throw new TaskException("Execute task failed", e); } } @Override public void trackApplicationStatus() throws TaskException { if (StringUtils.isEmpty(appIds)) { return; } List<String> appIdList = Arrays.asList(appIds.split(",")); boolean continueTracking = true; while (continueTracking) { Map<String, YarnState> yarnStateMap = new HashMap<>(); for (String appId : appIdList) { if (StringUtils.isEmpty(appId)) { continue; } boolean hadoopSecurityAuthStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); String yarnStateJson = fetchYarnStateJsonWithRetry(appId, hadoopSecurityAuthStartupState); if (StringUtils.isNotEmpty(yarnStateJson)) { String appJson = JSONUtils.getNodeString(yarnStateJson, "app"); YarnTask yarnTask = JSONUtils.parseObject(appJson, YarnTask.class); log.info("yarnTask : {}", yarnTask); yarnStateMap.put(yarnTask.getId(), YarnState.of(yarnTask.getState())); } } YarnState yarnTaskOverallStatus = YarnTaskStatusChecker.getYarnTaskOverallStatus(yarnStateMap); if (yarnTaskOverallStatus.isFinalState()) { handleFinalState(yarnTaskOverallStatus); continueTracking = false; } else { try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS * 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } } } private String fetchYarnStateJsonWithRetry(String appId, boolean hadoopSecurityAuthStartupState) throws TaskException { int retryCount = 0; while (retryCount < MAX_RETRY_ATTEMPTS) { try { return fetchYarnStateJson(appId, hadoopSecurityAuthStartupState); } catch (Exception e) { retryCount++; log.error("Failed to fetch or parse Yarn state for appId: {}. Attempt: {}/{}", appId, retryCount, MAX_RETRY_ATTEMPTS, e); if (retryCount >= MAX_RETRY_ATTEMPTS) { throw new TaskException("Failed to fetch Yarn state after " + MAX_RETRY_ATTEMPTS + " attempts for appId: " + appId, e); } try { TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MILLIS); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException(ie); } } } return null; } private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } private String fetchYarnStateJson(String appId, boolean hadoopSecurityAuthStartupState) throws Exception { return hadoopSecurityAuthStartupState ? KerberosHttpClient.get(getApplicationUrl(appId)) : HttpUtils.get(getApplicationUrl(appId)); } static class YarnTaskStatusChecker { public static YarnState getYarnTaskOverallStatus(Map<String, YarnState> yarnTaskMap) { boolean hasKilled = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.KILLED); if (hasKilled) { return YarnState.KILLED; } boolean hasFailed = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.FAILED); if (hasFailed) { return YarnState.FAILED; } boolean allFINISHED = yarnTaskMap.values().stream() .allMatch(state -> state == YarnState.FINISHED); if (allFINISHED) { return YarnState.FINISHED; } boolean hasRunning = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.RUNNING); if (hasRunning) { return YarnState.RUNNING; } boolean hasSubmitting = yarnTaskMap.values().stream() .anyMatch(state -> state == YarnState.NEW || state == YarnState.NEW_SAVING || state == YarnState.SUBMITTED || state == YarnState.ACCEPTED); if (hasSubmitting) { return YarnState.SUBMITTING; } return YarnState.UNKNOWN; } } } Here, the core logic is that instead of overriding the handle method directly, YARN tasks only need to implement two core interfaces: submitApplication and trackApplicationStatus . The cancelApplication method ideally should be delegated to YarnApplicationManager (this integration is currently missing but does not impact functionality). handle submitApplication trackApplicationStatus cancelApplication YarnApplicationManager Displaying ApplicationId for Streaming Tasks on the Frontend File: dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts Wrapping ApplicationId as YARN URL on the Backend File: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java File: dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java File: dolphinscheduler-common/src/main/resources/common.properties dolphinscheduler-common/src/main/resources/common.properties File: dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java File: dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageProperties.java dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageProperties.java Final UI display: Note: You will need to manually paste the URL; the above code does not include this functionality. Note: You will need to manually paste the URL; the above code does not include this functionality. Note: You will need to manually paste the URL; the above code does not include this functionality. Problem Tracking There is an issue here regarding the state. There are three states: FINISHED, FAILED, and KILLED. However, within the FINISHED state, there is also a FinalStatus, and being “finished” doesn’t necessarily mean success. Under FINISHED, there are actually SUCCEEDED, FAILED, and KILLED statuses. Essentially, FINISHED cannot be treated as the final state in DolphinScheduler, and further evaluation is needed. In the code for org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask#handleFinalState : org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask#handleFinalState private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } private void handleFinalState(YarnState yarnState) { switch (yarnState) { case FINISHED: setExitStatusCode(EXIT_CODE_SUCCESS); break; case KILLED: setExitStatusCode(EXIT_CODE_KILL); break; default: setExitStatusCode(EXIT_CODE_FAILURE); break; } } Using HTTP to Kill a Task curl -X PUT -d '{"state":"KILLED"}' \ -H "Content-Type: application/json" \ http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs curl -X PUT -d '{"state":"KILLED"}' \ -H "Content-Type: application/json" \ http://xx.xx.xx.xx:8088/ws/v1/cluster/apps/application_1694766249884_1098/state?user.name=hdfs Note: You must specify user.name , otherwise, the task might not be killed successfully. user.name