paint-brush
Why We Said ‘No’ to Apache Airflow and Built Our Own Workflow Engineby@jaktenstid111
126 reads

Why We Said ‘No’ to Apache Airflow and Built Our Own Workflow Engine

by IgorDecember 4th, 2024
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

Was it worth implementing our own engine? I’d say yes.
featured image - Why We Said ‘No’ to Apache Airflow and Built Our Own Workflow Engine
Igor HackerNoon profile picture

Hey there, I'm Igor!


During the past time in BasedLabs, we've been working on the MVP of the NoLabs workflow engine. NoLabs empowers bioinformaticians to create pipelines and workflows for their lab experiments. While numerous workflow engines exist, we had compelling reasons to develop our own.

The problem of choice or framework of decision making

Decision-making is hard, and selecting among numerous solutions for the core of your product is especially challenging. The problem with choosing an incorrect solution is that, in the initial stages, a framework, platform, or library you have selected might seem like a good decision until you encounter unavoidable use cases in your business requirements or environment. A small detail or missing feature that went unnoticed in the early stages becomes a huge obstacle when you have 100,000 lines of code in your solution. As wise people say, "Understanding a problem clearly is the first step to solving it."


Firstly, our specific requirements necessitated a custom solution. Existing platforms like Apache Airflow, Prefect, Dagster and others either meet only a subset of our needs or have critical shortcomings. For instance, they may lack forked task force-kill functionality, suffer from crucial bugs like tasks hanging in the CANCELLING state (and you cannot recover from it apart from shutting down worker completely), or lack a user interface and I/O component validation. Many also don't support dynamic task execution, which is essential for our use case. Secondly, the complexity of some systems, particularly Apache Airflow, posed significant challenges.


NoLabs workflow engine UI

A few words about Apache Airflow

Simple mechanisms tend to be more reliable and easier to understand. An old car from the 1980s is simpler to fix and maintain (our grands did it by hand) than a brand-new Tesla. A system made of a few parts is easier to comprehend than one with thousands of moving components.


As Terry Davis, the creator of TempleOS, once said, "Genius admires simplicity, fool admires complexity". There's a significant difference between "easy" and "simple," though. "Easy" implies "without effort," whereas "simple" means "without complication." From my experience—which, while not vast, has been quite broad—good software is a simple software. Simple software can still be built from complex parts if they are abstracted well enough, but in my humble opinion - every software developer should be in a constant thoughts of how could I simplify my solution, reduce moving parts in it, while preserving a good architectural practices and functionality. With each additional gear in your system you add complexity to it. In some cases, a vertical slices architecture can provide more advantages than a traditional layered approach, and a simple monolith can sometimes outperform SOA by avoiding unnecessary overhead. Airflow is extremely extensible and feature-rich and complex at the same time. Having just two devs (including me) in the team you have to keep these borders of how much data and concepts around you solution you keep in head at the same time (that’s why microservices tend to be per-team component). Also you must maintain a separate infrastructure for it, making it more suitable for corporations with dedicated IT departments and DevOps teams, but not for the small startups.

Our decision on Celery

At a high level, Celery is just a task queue—but an extremely powerful one and an excellent framework for distributed task execution. It supports a good subset of features we needed, also we realized we could easily test it using the "threads" task pool executor mode without spinning up a separate Docker container or similar infrastructure. Since it's essentially a task queue—you can represent a node in a graph as a task—it's perfect for dynamic task execution. Using a worker with a "prefork" pool allows for easy force-killing of tasks, and monitoring workers is straightforward with the Celery API.

Key Challenges in Building Scalable and Distributed workflow engines

At a high level, one might assume that distributing tasks across workers can immediately yield a functional workflow engine. However, this simplistic view breaks down when faced with real-world complexities. Designing a truly robust and scalable asynchronous system requires careful consideration of numerous edge cases, such as:


  • What happens when worker is down in a middle of task execution? - tasks may become orphaned, requiring mechanisms to detect incomplete tasks and reschedule them.
  • What happens when worker did not take a task at all? - task dispatching must ensure that tasks are not only queued but actively monitored for execution status.
  • What happens when multiple workers attempt to update the same task’s state simultaneously? - this can lead to race conditions, data corruption, or undefined states if not handled with proper concurrency controls.
  • How errors should be propagated from the worker to user?
  • What happens if you started the task, but redis failed trying to save task idempotency id for futher monitoring?
  • And many more!


To tackle these challenges effectively, the following strategies can be employed:


  1. Distributed state managing - leverage state management systems to track task statuses. Using distributed key-value stores like Redis or etcd can help manage the state centrally while allowing workers to access and update task progress in a synchronized manner.
  2. Distributed locks - implement distributed locking mechanisms (e.g., Redlock for Redis (which we used) or Zookeeper locks) to ensure that only one worker modifies node’s state at a given time, avoiding race conditions.
  3. Task scheduling and orphaned tasks detection - periodic monitoring tasks, often referred to as heartbeat checkers, can detect unresponsive workers. For incomplete tasks, use Celery’s task retries or implement custom task recovery mechanisms that reschedule tasks safely.
  4. Actor-based models - while Celery provides a task-queue-based model, actor-based frameworks like Ray.io (great framework, but it does not allow to use different python versions across workers, which was critical for us) or .NET Orleans (although restrictive in some scenarios) are worth exploring. These allow fine-grained control over task execution and state management through distributed actors, offering a more decentralized paradigm. Personally, I like the idea of actor-based programming in distributed systems. Part of this paradigm is used in NoLabs - every execution node of the graph does not have in-memory state and node state manipulations can be done on each worker without race conditions\data race considerations.

NoLabs workflow schema high-level overview

  • Workflow - a graph of component nodes.
  • Component node - a single execution node of a workflow (node of the graph). Also a graph of jobs.
  • Job - a single execution node of a component node graph (yes, it is graph itself as well). Why do we need concept of jobs? Imagine that you have a thousand .fasta files representing amino acid sequence. You wouldn’t want to make a thousand of components, but just a single component with thousand automatically generated jobs. Component is considered completed when at least one job is in SUCCESS state.

Workflow schematic overview.

Requirements

As everybody knows every software starts with a set of functional and nonfunctional requirements, or use-cases. For distributed and scalable workflow engine there are plenty of ones. Some can be covered by celery, some must be implemented from the scratch.


There I leave just a couple of such requirements:

Requirement

Priority

User can create a workflow from components

High

User can execute entire workflow

High

User can execute individual component

Low

User can connect components IO with each other

High


Component must fail if worker executing component’s task is down

Medium

Component must fail if property validation error occurs

High

User can cancel component execution

Medium

User can add new component to graph

Medium

Task must be taken by free worker

High

State management

There are multiple workflow architectural styles, as described in the Microsoft Architecture Guide.


We have chosen the State Machine style because, due to the dynamic nature of the workflow, there is uncertainty regarding the number of active nodes in the graph. Additionally, there is no predefined workflow structure that can be described in code. Each component is aware only of its predecessors. By traversing this graph and checking the states of the nodes and their dependencies, the graph scheduler can determine the current state of the workflow and identify the necessary state transitions or actions for those components.


Each node of the workflow (graph, component, job and ‘steps’ inside them) has a state:


class ControlStates(str, Enum):
    SCHEDULED = "SCHEDULED" # Ready for execution
    FAILURE = "FAILURE"
    STARTED = "STARTED"
    CANCELLING = "CANCELLING" # In a process of cleaning up\shutting down tasks, etc
    SUCCESS = "SUCCESS"
    CANCELLED = "CANCELLED"
    UNKNOWN = "UNKNOWN" # Initial state


There must also be a state transition matrix to disallow incorrect states transition (remember - no data or failure is better than invalid data!)


state_transitions = {
    ControlStates.UNKNOWN: [ ],
    ControlStates.CANCELLING: [ControlStates.CANCELLED],
    ControlStates.SCHEDULED: [ ControlStates.SCHEDULED, ControlStates.STARTED, ...],
    ControlStates.STARTED: [ ControlStates.CANCELLING, ControlStates.FAILURE, ...],
    ...
}


There are also PROGRESS_STATES also, that define a set of state that must be ‘controlled’ by the scheduler. Each node must support methods for handling these progress states. Inside such methods we determine should we move to the next state or remain in the current (for example if celery task is still running on worker, or task is terminating).


PROGRESS_STATES = [ControlStates.STARTED, ControlStates.CANCELLING]


Importance of small steps or divide and conquer

You cannot get a million dollars when yesterday you were unemployed a broke or become a jiu-jitsu black belt from the first day of training. Every big goal can be solved basically by divide and conquer algorithm, which is used successfully in software development and life both. If the problem is too big to comprehend you must divide it into smaller manageable pieces and conquer them all. This concept lays in the root of software engineering itself - modules, microservices, recursion algorithms, scalability - all of this allows you to tackle a big problem as a pieces of smaller and manageable ones, then recombine these pieces into a single working system.


Divide and conquer algorithm overview.


Starting from the bottom

Each execution unit of the graph is a separate execution node class which exposes necessary methods for each moving part of the workflow. The logic of this approach is that each bit of application that has a distributed state should be an instance of ExecutionNode that scheduler can track effectively.


class ExecutionNode(ABC):
    """
    Represents single execution node in scheduler.
    """
    async def get_state(self) -> Optional[ControlStates]:
        '''Get state. In case of composite node (like graph) can be a composite state of inner nodes.'''
        ...

    async def set_state(self, state: ControlStates, pipe: Optional[Pipeline] = None):
        ...

    @abstractmethod
    async def sync_started(self):
        '''Is executed by the scheduler when component in STARTED state'''
        ...

    @abstractmethod
    async def start(self, **kwargs):
        ...


Some execution nodes are not atomic, but contain necessary steps. Component is a composite of it’s pre-execute node, all jobs nodes and post-execution node. Composite nodes are in control of managing states of each child node. So schematically the logic of i.e component node is following:


Overview of execution graph


By adopting this composite execution node approach, you simplify each node's logic by dividing it into smaller, more manageable units, allowing the scheduler to control and monitor them more effectively.


For example method for syncing the started state of the ComponentExecutionNode:


async def sync_started(self):
    state = await self.get_state()
    if state != ControlStates.STARTED:
        return

    await self.sync_main_task()

    if await self.get_state() in TERMINAL_STATES:
        return

    any_job_succeeded = False
    if await self.main_task.get_state() == ControlStates.SUCCESS:
        any_job_succeeded = await self.sync_jobs()
        if await self.get_state() in TERMINAL_STATES:
            return

    if await self.get_state() in TERMINAL_STATES:
        return

    if (
        await self.main_task.get_state() == ControlStates.SUCCESS
        and any_job_succeeded
    ):
        await self.sync_complete_task()

    if await self.get_state() in TERMINAL_STATES:
        return


There is a special case of execution node - celery execution node, whose logic resides within a Celery task. When you invoke the start method, it runs a celery task. The smallest components of any node are celery execution nodes essentially, allowing you to track Celery tasks and the execution of each node using the Celery API.


class CeleryExecutionNode(ExecutionNode, ABC):
    @abstractmethod
    async def start(self, **kwargs):
        ...

    async def sync_started(self) -> ControlStates:
        """
        Executed by scheduler every n seconds if task state == STARTED.
        """
        ...

        # Get information about inner celery task
        async_result = AsyncResult(id=task_id, app=self.celery)
        celery_state = async_result.state
        new_state = celery_to_internal_mapping[celery_state]
        ...

        return new_state

    async def _prepare_for_start(self, queue: str, pipe: Optional[Pipeline] = None) -> str:
        """
        Creates and saves the task id for further tracking before task is executed.
        :returns: celery_task_id
        """
        task_id = str(uuid.uuid4())
        timestamp = datetime.datetime.now(datetime.UTC).timestamp()
        (pipe or rd).hset(self.celery_task_id_cid, mapping={
            'task_id': task_id,
            'queue': queue,
            'timestamp': timestamp
        })

        # Will discuss this later
        OrphanedTasksTracker().track(task_id=task_id,
                                     queue=queue,
                                     timestamp=timestamp)

        return task_id


So what should be written inside start method?


@abstractmethod
async def start(self, **kwargs):
   ...


It depends. For component node, which is a composite node it is just:


class ComponentExecutionNode(CeleryExecutionNode):   
   async def start(self, **kwargs):
        await self.set_state(state=ControlStates.STARTED)


For celery execution nodes it is more complex


class JobMainTaskExecutionNode(CeleryExecutionNode):
    async def start(self):
        pipe = get_redis_pipe() # Pipe allows transactional behavior in redis
        queue = settings.workflow_queue
        # Create a celery task id and save it so monitoring process can check it later
        celery_task_id = await self._prepare_for_start(queue=queue, pipe=pipe)
        self.celery.send_task( # Create task in celery.
            name=Tasks.job_main_task,
            ...
        )
        await self.set_state(ControlStates.STARTED, pipe=pipe)
        pipe.execute() # 'Commit'


Adding your functionality to nodes

So as I described components and jobs are multi-stepped and they consist of celery tasks that do some work. They have pre and post execution actions, jobs also can have some long running task (which I’ll describe later). To support this functionality class ComponentFlowHandler exists, that provides methods to be overridden and used to extend functionality of nodes.


class ComponentFlowHandler(Generic[TInput, TOutput]):
    """
    Represents component flow and available client handlers
    """
    async def on_start(self, inp: TInput) -> List[uuid.UUID]:
        """
        Must return IDs of jobs. Called from inside ComponentMainTaskExecutionNode.
        """
        return []

    async def on_finish(self, inp: TInput, job_ids: List[uuid.UUID]) -> Optional[TOutput]:
        """Is called from inside celery task of ComponentCompleteTaskExecutionNode"""
        ....

    async def on_job_start(self, job_id: uuid.UUID):
        """Is called from inside celery task of JobMainTaskExecutionNode"""
        ...

    async def on_job_finish(self, job_id: uuid.UUID, long_running_output: Optional[Dict[str, Any]]):
        """Is called from inside celery task of JobCompleteTaskExecutionNode"""
        ...

    async def cancel_job(self, job_id: uuid.UUID, reason: Optional[str] = None):
        node = JobExecutionNode(
            experiment_id=self.experiment_id,
            component_id=self.component_id,
            job_id=job_id,
        )
        if await node.can_cancel():
            await node.cancel(message=reason)

    async def schedule(
            self,
            job_id: uuid.UUID,
            celery_task_name: str,
            celery_queue: str,
            input: Optional[Union[BaseModel, Dict[str, Any]]] = None
    ):
        """Schedule long running jobs"""
        ...

Long running tasks

In the biotech world, some experiments can run for days or even weeks. Imagine fine-tuning a model that designs small molecules for a specific target—which can take hours to days—or running a BLAST search that takes minutes. Firstly, it is a bad design principle to make any synchronous communication around such tasks. Suppose a user clicks the "run experiment" button on the web UI; how should the main application worker communicate with another service that fine-tunes the model? Should it send a message and wait for the response? What if the main application worker goes down during this time? This means that the long-running task will be orphaned. In software design, if any task is running, all communication with it must always be asynchronous; you must not spend any thread or process resources waiting for the results of such a task. In NoLabs, we provide a special API that developers can leverage to run long-running tasks on different services, such as those running AI models.


class ComponentFlowHandler(Generic[TInput, TOutput]):
    async def schedule(
            self,
            job_id: uuid.UUID,
            celery_task_name: str, # any celery task name
            celery_queue: str, # any celery queue
            input: Optional[Union[BaseModel, Dict[str, Any]]] = None
    ):
        """Schedule long running jobs"""
        node = JobLongRunningTaskExecutionNode(
            experiment_id=self.experiment_id,
            component_id=self.component_id,
            job_id=job_id,
        )
        can_schedule = await node.can_schedule()
        if can_schedule:
            arguments = {}

            if isinstance(input, dict):
                arguments = input
            elif input:
                arguments = input.model_dump()

            await node.schedule(
                celery_task_name=celery_task_name,
                arguments=arguments,
                celery_queue=celery_queue,
            )

High level workflow api (aka Graph class) overview

While other nodes are internal to the package, the Graph class offers a high-level API for workflow manipulation. It includes three methods that encapsulate the core execution logic.


 class Graph:   
     async def set_components_graph(self, components: List[Component]):
        """
        Sets the components (nodes of the graph). 
        If any of the component is running - we must prohibit the operation,
        since it can lead to incorrect internal state.
        We must apply distributed lock to these methods to prevent incorrect internal state.
        """
        lock = redlock(key=self._id, blocking=True)
        lock.acquire()

        try:
            # Set graph nodes.
        finally:
            if lock.owned():
                lock.release()

    async def schedule(self, component_ids: List[uuid.UUID]):
        """
        Schedule all components as soon as possible.
        """
        lock = redlock(key=self._id, blocking=True)

        try:
            # Schedule graph nodes.
        finally:
            if lock.owned():
                lock.release()

    async def sync(self):
        """
        Synchronize the graph and move the states of internal nodes.
        """
        lock = redlock(key=self._id, blocking=False, auto_release_time=10.0)

        if not lock.acquire():
            return

        try:
            # Synchronize graph nodes which are in progress states like 'STARTED' or 'CANCELLING'.
            # Call sync_started or sync_cancelling of components.
        finally:
            if lock.owned():
                lock.release()


Worker entry point

To run the scheduler periodically, the Celery scheduler comes into play. It synchronizes all graphs every n seconds, advancing the entire state machine to the next state at each step.


@celery.task(name=Tasks.sync_graphs_task,
             bind=True,
             queue=settings.workflow_queue)
def sync_graphs(bind):
    async def _():
        # Prevent more than one worker going into this method.
        lock = redlock(key=Tasks.sync_graphs_task, auto_release_time=10.0)

        if not lock.acquire():
            return

        try:
            experiments = Experiment.objects.only("id")

            for experiment in experiments:
                graph = Graph(experiment_id=experiment.id)
                await graph.sync()
        finally:
            if lock.owned():
                lock.release()

    async_to_sync(_)()


Dealing with orphaned tasks

Orphaned tasks can easily lead to Celery queue starvation, negatively impacting the overall user experience. For instance, consider a scenario where you run a task with a timeout of one day. If you lose track of whether the task is still executing or if the corresponding worker is down, it can result in prolonged delays and inefficiencies. To mitigate this issue, we assign a unique task_id to every task and ensure it is written to Redis before sending the task to Celery. This approach helps track tasks more reliably and prevents them from being orphaned.


The following code snippet demonstrates how we handle orphaned tasks by monitoring the Celery queue and removing tasks that no longer have an active worker or queue. The remove_orphaned_tasks task leverages Redis-based locking and Celery introspection features to manage task states effectively


@celery.task(
    name=Tasks.remove_orphaned_tasks, queue=settings.workflow_queue, max_retries=0
)
def remove_orphaned_tasks():
    lock = redlock(key=Tasks.remove_orphaned_tasks, auto_release_time=10.0)

    if not lock.acquire():
        return

    try:
        celery = get_celery_app()
        inspect = celery.control.inspect()
        tasks = list(OrphanedTasksTracker.get_task_data())

        if not tasks:
            return

        active_workers = inspect.active() or {}
        active_queues = inspect.active_queues()
        workers = set(active_workers.keys())
        queues = set()
        for l in active_queues.values():
            for queue in l:
                queues.add(queue['name'])

        for task in tasks:
            task_id = task['task_id']
            task_queue = task['queue']
            timestamp = task['timestamp']
            async_result = AsyncResult(id=task_id, app=celery)

            if async_result.state in READY_STATES:
                OrphanedTasksTracker.remove_task(task_id)
                continue

            if datetime.datetime.now(datetime.UTC).timestamp() - timestamp < 60.0:
                continue

            if async_result.info:
                worker_name = async_result.info.get('hostname')
                if worker_name and worker_name not in workers:
                    celery.backend.store_result(
                        task_id, Exception("Task worker is down (failed in progress)"), state=FAILURE
                    )
                    OrphanedTasksTracker.remove_task(task_id)
                    continue

            if task_queue not in queues:
                celery.backend.store_result(
                    task_id, Exception("Task worker is down (not started)"), state=FAILURE
                )
                OrphanedTasksTracker.remove_task(task_id)
                continue
    finally:
        if lock.owned():
            lock.release()


Component properties validation

In the workflow engine, properties validation plays a crucial role in ensuring that the components within a workflow are correctly configured and interconnected. Each component has input and output schemas defined using Pydantic models, which provide a structured way to represent the expected data formats and types. The Parameter, Property, and Component classes are central to this validation process.


The Parameter class represents the schema of a component's inputs or outputs, detailing the expected properties, their types, and any required fields. When components are connected within a workflow, the try_map_property method is used to map output properties from one component to the input properties of another. This method performs validation to ensure that the types and formats of the properties are compatible. If there is a mismatch or incompatibility, it raises a PropertyValidationError, preventing the workflow from proceeding with invalid configurations.


Moreover, the Component class utilizes the input_errors and output_errors methods to validate the input and output values against their respective schemas. Before a workflow is executed, these validations ensure that all required properties are provided, correctly typed, and that any defaults or mappings are appropriately set. This rigorous validation mechanism helps catch configuration issues early, reducing runtime errors and enhancing the reliability of the workflow execution.

Conclusion

So, what do we have in the end? Was it worth implementing our own engine? I’d say yes. First and foremost, we have full control over our code. We understand its drawbacks and benefits thoroughly, which means no unexpected surprises during development or future extensions. Additionally, instead of managing multiple additional services (like a worker and PostgreSQL for Airflow), we can minimize the deployment components, making our setup more streamlined.


Another significant advantage is the intellectual property and uniqueness of our solution. As I mentioned earlier in this article, there are no other completely open-source workflow engines that offer this level of extensibility and dynamic visual user interaction.


Overall, it has been a challenging journey. I explored numerous solutions and encountered many obstacles and edge cases during development. However, I ensured robustness by adding integration tests, which are available in our repository. The resulting workflow engine is highly extensible and straightforward to enhance. For instance, implementing a node-canceling feature took me only about 30 minutes, including the integration tests.


So thank you for reading my dev history and see you!