Moving Logic Out of Pods: Extending the Argo Workflows Controller

Written by antonpechenin | Published 2025/12/19
Tech Story Tags: distributed-systems | argo-workflows | kubernetes | argo | kubeflow | ai-infrastructure | dag-orchestration | hackernoon-top-story

TL;DR: Argo Workflows typically runs every step as its own Kubernetes pod, which is great for isolation but expensive for fast, lightweight tasks. This article explains the Executor Plugin mechanism, an HTTP contract that lets you extend the Argo controller without maintaining a fork, by moving step execution into a reusable agent pod. It walks through enabling plugins in the controller, implementing a minimal plugin server (with examples), registering plugins via ConfigMaps, and tightening security with token checks, ServiceAccounts, and RBAC (including WorkflowTaskSet). The result: fewer pods, lower scheduler load, and a clean extension path for custom workflow behavior.

In this article, I'll show how the Argo Workflows Executor Plugin lets you extend the Argo Workflows controller without maintaining your own fork—simply by implementing a small HTTP server in any language. As a bonus, this same mechanism reduces the number of extra pods in your DAGs and lightens the load on the Kubernetes scheduler. If you're new to Argo, I'll briefly cover the architecture and where plugins fit in. We'll finish with practical examples and key configuration details.

Motivation

I decided to write this article about the Argo Workflows Executor Plugin while working on a Kubernetes Enhancement Proposal (KEP) for Kubeflow Pipelines, where I needed a deeper understanding of its security model, capabilities, and limitations.

TL;DR: Argo Workflows

Before diving into Executor Plugins, let’s quickly recap what Argo Workflows is and how it executes workflows.


What is a Workflow?


A workflow is a sequence of tasks executed in a defined order and typically described in YAML. It can be a simple linear chain or a complex DAG with parallel steps and dependencies. Workflows are common in CI/CD, data processing, ML pipelines, and any scenario that requires coordinating multiple tasks reliably.


Argo Workflows is a Kubernetes-native engine that takes these YAML-defined workflows and executes each task in its own pod. This provides parallelism, explicit dependencies, and precise resource control, while leveraging Kubernetes scaling, scheduling, and reliability. With this model, Argo can orchestrate anything from simple automation to full CI/CD and ML pipelines.

Inside Argo Workflows: Architecture Overview

Let’s look at a simple three-step YAML from the official docs to see how Argo Workflows is structured and how its components interact.

Interactions with Argo Workflows typically start via the Argo CLI.


For example, if I’ve downloaded a workflow YAML definition called hello.yaml, I can create an Argo Workflow from it by running:

argo submit hello.yaml

Under the hood, the CLI sends the YAML definition to the Kubernetes API, which creates an Argo Workflow Custom Resource (CR).
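Since a Workflow is just a Custom Resource, you can inspect it with kubectl once it has been submitted. For example (assuming the argo namespace used throughout this article):

kubectl -n argo get workflows
kubectl -n argo get workflow <workflow-name> -o yaml   # full CR, including its live status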

Once Kubernetes creates a Workflow CR, the Workflow Controller starts managing its lifecycle, as shown below.


At a high level, the WorkflowController in Argo Workflows v3.7.3 operates with two informers: one for Workflow CRs and one for the Pods created during workflow execution. Both informers push keys into the workflow queue for reconciliation.


  • WorkflowInformer → WorkflowQueue: the controller processes Workflow changes and creates or manages the necessary Pods.
  • PodInformer → WorkflowQueue: the controller watches the workflow Pods; Pod status changes enqueue the owning Workflow and trigger another reconciliation cycle.

Every Workflow Step Is a Kubernetes Pod (until you introduce an Executor Plugin)

This design comes with a lot of benefits. Since each step runs in its own pod, you can fully leverage the Kubernetes scheduler, pre-allocate or limit resources for individual tasks, and take advantage of pod restart for reliability. You also have full access to Kubernetes primitives, such as volumes, networking, and other resources, giving you complete control over workflow execution. On top of that, you get out-of-the-box access to logs, since each pod’s logs can be inspected directly.


This setup is especially ideal for heavy or relatively long-running tasks, where visibility, resource control, and fault isolation are crucial for keeping the cluster stable and preventing interference with other workloads.


But how does this work in practice? In a workflow, each step is based on a template. Let’s look at an example from the official documentation:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and print-message
  templates:
    - name: hello-hello-hello
      # Instead of just running a container
      # This template has a sequence of steps
      steps:
        - - name: hello1            # hello1 is run before the following steps
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"
        - - name: hello2a           # double dash => run after previous step
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2a"
          - name: hello2b           # single dash => run in parallel with previous step
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2b"

    # This is the same template as from the previous example
    - name: print-message
      inputs:
        parameters:
          - name: message

      container:
        image: busybox
        command: [echo]
        args: ["{{inputs.parameters.message}}"]


Here we can see a container template, which means a separate pod is created for each step, as shown below.

kubectl -n argo get po
NAME                                   READY   STATUS      RESTARTS   AGE
steps-6zjdt-print-message-227836356    0/2     Completed   0          60s
steps-6zjdt-print-message-3558628268   0/2     Completed   0          50s
steps-6zjdt-print-message-3608961125   0/2     Completed   0          50s


Pods Come with Overhead

However, if some of our tasks are lightweight or execute very quickly, the “one pod per task” approach has drawbacks.

  • Pod Overhead – Each pod consumes additional resources beyond the container itself, including networking, cgroups, and filesystem overhead
  • Startup Latency – For very fast tasks, a pod may take longer to start than the task itself, introducing noticeable time overhead
  • Scheduler Load – Many pods increase scheduler load, and resources may not be available immediately, causing scheduling delays
  • Resource Inefficiency – Pods reserve resources even for short tasks, which can lead to over-provisioning


In practice, this means you want separate pods for long‑running or heavy steps, while lightweight tasks are better off sharing a long‑lived pod that can be reused across many steps.

So, How Do We Reduce Pod Creation Overhead?

Executor Plugins were introduced to address exactly this class of problems: they let you move step execution out of per-step pods into a reusable agent pod, without modifying the core controller or maintaining a fork.


At a high level, instead of creating a new pod for each step with template: container, a template: plugin creates a single agent pod that is reused for all steps using that plugin while the workflow is running.

Why run user code in a separate agent pod instead of the Workflow Controller?

Running user code in a separate agent pod has several benefits:


  • Protecting the Controller: The workflow controller is a critical component. Isolating user tasks prevents crashes, hangs, or resource spikes in user code from affecting the controller itself.
  • Flexible extensibility: This setup allows implementing additional behavior in any programming language. You can extend it simply by providing a container image that runs a small server responding to /api/v1/template.execute.

Prerequisites for the Executor Plugin

Simply changing a template from container to plugin isn’t enough - some preparation is required beforehand.


Let’s look at a high-level overview of how the Executor Plugin works. Components specific to the Executor Plugin are highlighted in green on the diagram:

To get your plugin up and running, you need to follow a few key high-level steps. Details for each step are provided below:

  • Configure the Argo Workflow Controller and enable the Executor Plugin by setting the environment variable ARGO_EXECUTOR_PLUGINS=true
  • Implement a server (in any programming language) that exposes the /api/v1/template.execute endpoint - this is the component that will implement your plugin’s logic - and package it as a Docker image.
  • Create a ConfigMap that stores the plugin’s settings by its name - for example, specifying the Docker image you built in the previous step.
  • Add additional RBAC permissions (such as create/patch for WorkflowTaskSet) to the ServiceAccount that Argo uses to run your Workflow.
  • Secure your plugin - for example, by restricting access to the agent pod from anything outside the Argo Workflow Controller.
  • Add additional ServiceAccount permissions if needed - for example, granting access to the Kubernetes API.


First, here’s a link to the repository where I implemented all these steps in Argo Workflows so the plugin can be used. I’ll be using examples from this repo in the sections below.

Configure the Argo Workflow Controller

By default, the Argo Workflow Executor Plugin is disabled in the Workflow Controller.


As a result, the Workflow Controller ignores any plugin configurations that were added to the cluster. To enable it, simply add the environment variable ARGO_EXECUTOR_PLUGINS=true.


In my demo repository, I do this by patching the Workflow Controller as shown below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-controller
  namespace: argo
spec:
  template:
    spec:
      containers:
        - name: workflow-controller
          env:
            # Enable the Executor Plugin.
            # The Executor Plugin is disabled by default.
            # Without this environment variable, the workflow-controller
            # will not register any executor plugins.
            - name: ARGO_EXECUTOR_PLUGINS
              value: "true"


Once enabled, the Workflow Controller uses a special informer to watch all new or updated ConfigMaps containing plugin settings.

It automatically registers any Executor Plugins defined in these ConfigMaps, based on the following label:

kubectl get cm -l workflows.argoproj.io/configmap-type=ExecutorPlugin

Implement a server

A plugin’s logic is implemented in a server; the plugin is the extension point for the Argo Workflow Controller.


The Argo Workflow Controller communicates with the plugin through a small HTTP API contract.


Here is a brief explanation of the contract.

  • The server must respond to the /api/v1/template.execute endpoint.
  • The server should respond with a JSON object like this:
{
  "node": {
    "phase": "Succeeded", // Use "Failed" to mark the step as failed
    // other parameters, e.g., outputs
  }
}


Important: To mark a step as failed, the server should return "phase": "Failed". HTTP errors are treated as unexpected issues by the controller and split into transient (retried) and non-transient categories. For more details, refer to the documentation.
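For example, a failed step could be reported with a response roughly like this (a sketch based on the same node object shown above; the optional message field surfaces in the node status):

{
  "node": {
    "phase": "Failed",
    "message": "upstream service returned an error"
  }
}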


Here’s an example of a simple server implementation written in Python:

from fastapi import FastAPI, Request, HTTPException
import uvicorn

app = FastAPI()

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()

    # Only handle templates that reference our plugin; returning None lets
    # the controller try the next registered plugin.
    print_message_plugin_request = data.get("template", {}).get("plugin", {}).get("print-message-plugin")
    if print_message_plugin_request is None:
        return None

    if print_message_plugin_request.get('args') is None:
        raise HTTPException(status_code=400, detail=f"invalid request body. "
                                                    f"Expecting template.plugin.print-message-plugin.args, found: {data}")
    message = print_message_plugin_request.get('args')

    print(f"PRINT: {message}" if message else "No message")

    return {
        "node": {
            "phase": "Succeeded",
            "outputs": {
                "parameters": [{"name": "result", "value": f'{message} processed by print-message-plugin'}]
            }
        }
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)


It simply prints a message and returns a string in the output, which can be used as an input parameter for the next steps or DAG tasks. This is essentially our extension point.
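Because the plugin returns an output parameter named result, a later step can consume it with the usual Argo parameter syntax. A small sketch (hello1 matches the workflow shown later in this article; hello2 is illustrative):

        - - name: hello2
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "{{steps.hello1.outputs.parameters.result}}"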


About the Request Parameter

If you submit the following Argo Workflow:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-plugin-
spec:
  entrypoint: hello-hello-hello

  templates:
    - name: hello-hello-hello
      steps:
        - - name: hello1
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"

    - name: print-message
      inputs:
        parameters:
          - name: message
      plugin:
        print-message-plugin:
          args: ["{{inputs.parameters.message}}"]


The request body sent to the plugin according to the template will be:

template.plugin.print-message-plugin.args: <value>


Explanation:

  • template.plugin – constant part of the request structure.
  • print-message-plugin – the name of the plugin being called.
  • args – the parameter(s) defined in the template.
  • value – can be a simple type (string, number) or a JSON object.
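Putting this together, the body delivered to /api/v1/template.execute for the hello1 step looks roughly like this (a simplified sketch; the real payload also carries the full workflow metadata and other template fields):

{
  "workflow": {
    "metadata": { "name": "steps-plugin-xxxxx", "namespace": "argo" }
  },
  "template": {
    "name": "print-message",
    "inputs": {
      "parameters": [{ "name": "message", "value": "hello1" }]
    },
    "plugin": {
      "print-message-plugin": { "args": ["hello1"] }
    }
  }
}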

Inject the Plugin Server Implementation into an Argo Workflow

Here, everything is simple and follows the standard Argo Workflow approach:


  • Build a Docker image from your plugin server. A demo example is provided in this repository.
  • Create a ConfigMap in the cluster specifying this image and other plugin parameters (user, ServiceAccount, resources, etc.).


According to the documentation, you can build the ConfigMap as follows:

  • Create a plugin.yaml file.


Sample for the print-message-plugin:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: print
spec:
  sidecar:
    container:
      image: print-message-plugin:latest
      name: print-message-plugin
      ports:
        - containerPort: 8080
      securityContext:
        runAsNonRoot: false
        runAsUser: 65534 # nobody
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"
  • Build the ConfigMap using the Argo CLI (make sure it is installed):

argo executor-plugin build .

  • This command creates a ConfigMap and a README explaining how to apply it.
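The generated ConfigMap embeds the sidecar spec from plugin.yaml and carries the label the controller’s informer selects on. It looks roughly like this (a sketch; always generate yours with the CLI rather than writing it by hand, since key names can vary between Argo versions):

apiVersion: v1
kind: ConfigMap
metadata:
  name: print-executor-plugin
  labels:
    workflows.argoproj.io/configmap-type: ExecutorPlugin
data:
  sidecar.container: |
    image: print-message-plugin:latest
    name: print-message-plugin
    ports:
      - containerPort: 8080
    # ...resources and securityContext carried over from plugin.yaml...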


The Workflow Controller will pick up the plugin automatically. When an Argo Workflow runs, it will execute our plugin image inside a dedicated agent pod.

Add additional RBAC permissions for WorkflowTaskSet

The agent pod receives its work and reports step results through WorkflowTaskSet resources, so the ServiceAccount that runs the workflow needs permissions on them; without these, step and workflow statuses cannot be updated.
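A sketch of the kind of Role you would bind to that ServiceAccount (modeled on the upstream agent role; check the Argo docs for the exact verbs your version expects):

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: executor-plugins-taskset   # illustrative name
  namespace: argo
rules:
  - apiGroups: ["argoproj.io"]
    resources: ["workflowtasksets"]
    verbs: ["list", "watch"]
  - apiGroups: ["argoproj.io"]
    resources: ["workflowtasksets/status"]
    verbs: ["patch"]

Bind it with a RoleBinding to the ServiceAccount referenced by your Workflow (default if none is set).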

Secure the plugin to only accept requests from the Argo Workflow Controller

As noted, our plugin runs as a sidecar container inside a dedicated agent pod and exposes an HTTP endpoint on the pod network.

This means that, in theory, any workload in the cluster could send HTTP requests to the plugin sidecar, so it is important to add basic access controls instead of relying on defaults.


The minimal change we need to make in plugin.yaml:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: print
spec:
  sidecar:
    container:
      #...
      securityContext:
        #...
        runAsUser: 1000  # A specific user is required to read the token mounted into the sidecar container; the nobody user from the earlier example no longer works here.
      #...


By default (without extra customization):

  • The Argo Workflow Controller mounts the /var/run/argo volume into the agent pod (which hosts the plugin sidecar). This volume contains a token.
  • The Argo Workflow Controller includes the same token in the Authorization: Bearer header of each execution request.


Inside the plugin, you need to read the token from /var/run/argo/token and compare it with the token from the request header, for example:

with open("/var/run/argo/token") as f:
  token = f.read().strip()
def do_POST(self):
  if self.headers.get("Authorization") != "Bearer " + token:
    self.forbidden()              


Mount the ServiceAccount to the agent pod

Kubernetes uses a default-deny model for API access. If your plugin needs to call the Kubernetes API (for example, to list pods or create resources), you must mount a ServiceAccount into the sidecar container and grant it the minimal required RBAC permissions.


Let’s walk through a simple example: writing a plugin that calls the Kubernetes API and prints the list of pods in the namespace specified in the request.


Here is an example server implementation.

from fastapi import FastAPI, Request, HTTPException
import uvicorn

import os

from kubernetes import config, client

app = FastAPI()

config.load_incluster_config()
k8s = client.CoreV1Api()

def list_pods_in_namespace(namespace: str):
    # This is where we query the Kubernetes API to list pods.
    # The ServiceAccount must be mounted into the pod and have the required RBAC permissions
    # to list pods in the specified namespace.
    pods = k8s.list_namespaced_pod(namespace=namespace)
    return [p.metadata.name for p in pods.items]

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()

    plugin_request = data.get("template", {}).get("plugin", {}).get("list-pods-plugin")
    if plugin_request is None:
        return None
    if plugin_request.get('namespace') is None:
        raise HTTPException(400, 'namespace parameter is missing')
    namespace = plugin_request.get('namespace')

    print(f'namespace: {namespace}')

    pod_list = list_pods_in_namespace(namespace)

    return {
        "node": {
            "phase": "Succeeded",
            "outputs": {
                "parameters": [{"name": "result", "value":  f'pods in ns {namespace}: {",".join(pod_list)}'}]
            }
        }
    }


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8082)


To mount the ServiceAccount token into the executor plugin sidecar, automountServiceAccountToken must be set to true.


Here is an example YAML showing how to mount the ServiceAccount token into the executor plugin sidecar:

apiVersion: argoproj.io/v1alpha1
kind: ExecutorPlugin
metadata:
  name: list-pods-plugin
spec:
  sidecar:
    automountServiceAccountToken: true 
    container:
      image: list-pods-plugin:latest
      name: list-pods-plugin
      ports:
        - containerPort: 8082
      securityContext:
        runAsNonRoot: false
        runAsUser: 1001
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"


Reminder: to obtain the ConfigMap required to add the plugin to the cluster, you need to run the following command.

argo executor-plugin build .


Afterwards, apply the created ConfigMap to the cluster.


The executor plugin then runs with its own dedicated ServiceAccount, separate from the one used by the Argo Workflow. This approach improves security by allowing fine-grained control over the permissions granted to the executor plugin. The ServiceAccount name is predefined and cannot be customized. It is always set to: <pluginName>-executor-plugin (for example, list-pods-plugin-executor-plugin in this case).


With the token mounted, the executor plugin can authenticate with the Kubernetes API and perform actions allowed by its ServiceAccount.
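For the list-pods example, the minimum is a Role allowing pod reads in the target namespace, bound to that dedicated ServiceAccount. A sketch (names are illustrative; the subject follows the <pluginName>-executor-plugin convention described above):

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: list-pods-plugin-reader
  namespace: argo
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: list-pods-plugin-reader
  namespace: argo
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: list-pods-plugin-reader
subjects:
  - kind: ServiceAccount
    name: list-pods-plugin-executor-plugin
    namespace: argo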

Full RBAC manifests are not reproduced here; everything needed to try out the plugins, with instructions, is in my demo repo linked above.

Handling Multiple Plugins

When multiple Executor Plugins are installed in the cluster (for example, from different teams), the Argo Workflow Controller calls them sequentially until one handles the task.


Always check the plugin name in your server:

@app.post("/api/v1/template.execute")
async def execute(request: Request):
    data = await request.json()
    
    # Check if this is OUR plugin
    my_plugin_request = data.get("template", {}).get("plugin", {}).get("print-message-plugin")
    if my_plugin_request is None:
        return None  # ← Pass to the next plugin
    
    # Execute logic only for our plugin name
    # ...

Why this matters:

  • Without the check, your plugin might unexpectedly process tasks meant for others
  • return None tells the controller: "not my request, try the next plugin"
  • The first plugin returning phase: "Succeeded" or phase: "Failed" completes execution

See the discussion here.

Final Example: One Pod Instead of Three

Now that the plugin is properly installed and configured, let's run the same steps workflow - but using our plugin.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-plugin-
spec:
  entrypoint: hello-hello-hello

  templates:
    - name: hello-hello-hello
      steps:
        - - name: hello1
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello1"
        - - name: hello2a
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2a"
          - name: hello2b
            template: print-message
            arguments:
              parameters:
                - name: message
                  value: "hello2b"

    - name: print-message
      inputs:
        parameters:
          - name: message
      plugin:
        print-message-plugin:
          args: ["{{inputs.parameters.message}}"]


Before (standard template: container):

kubectl -n argo get po
NAME                                   READY   STATUS      RESTARTS   AGE
steps-6zjdt-print-message-227836356    0/2     Completed   0          60s
steps-6zjdt-print-message-3558628268   0/2     Completed   0          50s
steps-6zjdt-print-message-3608961125   0/2     Completed   0          50s


After (using template: plugin):

kubectl -n argo get po
NAME                                  READY   STATUS    RESTARTS   AGE
steps-plugin-7fkgk-1340600742-agent   4/4     Running   0          19s

The same workflow now runs all three steps on a single reusable agent pod, reducing pod creation overhead and scheduler load.
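Since all three steps now execute inside the shared agent pod, the example plugin’s output ends up in that pod’s logs rather than in per-step pods. A quick way to confirm (grepping for the PRINT prefix used by the example server):

kubectl -n argo logs steps-plugin-7fkgk-1340600742-agent --all-containers | grep PRINT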

HTTP Template (Bonus Alternative)

There's also the http template - a simpler alternative to full plugins when you just need to make an HTTP request and use the full response.

It works much like an Executor Plugin, but without any custom server code:

  • Single agent pod handles all template: http steps/tasks
  • No plugin implementation needed - just point to any HTTP endpoint
  • Full response (JSON, status, headers) flows back as workflow outputs
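A minimal http template looks roughly like this (a sketch based on the upstream http template; see the Argo docs for the full field list):

    - name: ping-service
      http:
        url: "https://example.com/api/v1/health"
        method: "GET"
        successCondition: "response.statusCode == 200"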

When NOT to Use Executor Plugins

  • Executor Plugins aren't always needed. Stick with standard pods for the classic Argo Workflows use case: long-running or resource-heavy tasks where you want Kubernetes to precisely control scheduling, resource allocation, and isolation per step.
  • Plugins are also unnecessary for tasks Argo already handles natively, such as notifications.

Real-World Use Cases from the Community

  • Proposing an "http" step type in Argo to delegate task execution via HTTP instead of spinning up a container, improving efficiency and enabling integration with systems like Brigade.
  • Official Argo Plugin Directory
  • In my KEP, the plugin checks a cache first and either reuses stored outputs or executes the step and caches the result. This is ideal for millisecond-fast tasks such as schema validation, compared with 5-10 seconds of pod startup.

Conclusion

Executor Plugins extend Argo Workflows without forking the controller, enable integration with external systems over plain HTTP, and eliminate per-step pod churn for lightweight tasks.


Featured image: Photo by Ian Taylor on Unsplash





Written by antonpechenin | I believe that artificial intelligence will help humanity reach distant galaxies.
Published by HackerNoon on 2025/12/19