Hey there, I'm Igor!
Over the past months at BasedLabs, we've been working on the MVP of the NoLabs workflow engine. NoLabs empowers bioinformaticians to create pipelines and workflows for their lab experiments. While numerous workflow engines already exist, we had compelling reasons to develop our own.
Decision-making is hard, and selecting among numerous solutions for the core of your product is especially challenging. The problem with choosing the wrong one is that, in the initial stages, a framework, platform, or library might seem like a good decision until you encounter unavoidable use cases in your business requirements or environment. A small detail or missing feature that went unnoticed early on becomes a huge obstacle once you have 100,000 lines of code built on top of it. As wise people say, "Understanding a problem clearly is the first step to solving it."
Firstly, our specific requirements necessitated a custom solution. Existing platforms like Apache Airflow, Prefect, Dagster and others either meet only a subset of our needs or have critical shortcomings. For instance, they may lack the ability to force-kill forked tasks, suffer from crucial bugs like tasks hanging in the CANCELLING state (which you cannot recover from without shutting the worker down completely), or lack a user interface and I/O component validation. Many also don't support dynamic task execution, which is essential for our use case. Secondly, the complexity of some systems, particularly Apache Airflow, posed significant challenges.
Simple mechanisms tend to be more reliable and easier to understand. An old car from the 1980s is simpler to fix and maintain (our grandparents did it by hand) than a brand-new Tesla. A system made of a few parts is easier to comprehend than one with thousands of moving components.
As Terry Davis, the creator of TempleOS, once said, "Genius admires simplicity, fool admires complexity." There's a significant difference between "easy" and "simple," though. "Easy" implies "without effort," whereas "simple" means "without complication." From my experience (which, while not vast, has been quite broad), good software is simple software. Simple software can still be built from complex parts if they are abstracted well enough, but in my humble opinion every software developer should constantly ask how to simplify their solution and reduce its moving parts while preserving good architectural practices and functionality. Each additional gear in your system adds complexity to it. In some cases, a vertical slices architecture provides more advantages than a traditional layered approach, and a simple monolith can sometimes outperform SOA by avoiding unnecessary overhead.

Airflow is extremely extensible and feature-rich, and complex at the same time. With just two developers on the team (including me), you have to limit how many concepts and how much data about your solution you keep in your head at once (that's why microservices tend to be a per-team component). You also have to maintain separate infrastructure for Airflow, which makes it a better fit for corporations with dedicated IT departments and DevOps teams than for small startups.
At a high level, Celery is just a task queue, but an extremely powerful one and an excellent framework for distributed task execution. It supports a good subset of the features we needed, and we realized we could easily test it using the "threads" task pool without spinning up a separate Docker container or similar infrastructure. Since it's essentially a task queue, you can represent a node in a graph as a task, which makes it perfect for dynamic task execution. Using a worker with a "prefork" pool allows for easy force-killing of tasks, and monitoring workers is straightforward with the Celery API.
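For illustration, this is roughly how the two pool modes come into play (plain Celery APIs, not NoLabs code; the app name and broker URL are made up):

```python
from celery import Celery

app = Celery("workflow", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

# In tests a worker can run with the lightweight thread pool:
#   celery -A workflow worker --pool=threads --concurrency=4
# In production a prefork pool lets us terminate a task's child process outright:
#   celery -A workflow worker --pool=prefork --concurrency=8


def force_kill(task_id: str) -> None:
    # revoke(terminate=True) kills the child process executing the task;
    # this only works with the prefork pool.
    app.control.revoke(task_id, terminate=True, signal="SIGKILL")
```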
At a high level, one might assume that distributing tasks across workers immediately yields a functional workflow engine. However, this simplistic view breaks down when faced with real-world complexities. Designing a truly robust and scalable asynchronous system requires careful consideration of numerous edge cases: a worker can die mid-task and leave it orphaned, a cancellation can hang halfway through cleanup, a long-running task can starve the queue, and a node can end up reporting a state (say, SUCCESS) that no longer matches what actually happened on the worker. The rest of this article walks through the strategies we employed to tackle these challenges.
As everybody knows, every piece of software starts with a set of functional and non-functional requirements, or use cases. For a distributed and scalable workflow engine there are plenty of them. Some can be covered by Celery; some must be implemented from scratch.
Here are just a few of those requirements:
| Requirement | Priority |
|---|---|
| User can create a workflow from components | High |
| User can execute entire workflow | High |
| User can execute individual component | Low |
| User can connect components IO with each other | High |
| … | |
| Component must fail if worker executing component's task is down | Medium |
| Component must fail if property validation error occurs | High |
| User can cancel component execution | Medium |
| User can add new component to graph | Medium |
| Task must be taken by free worker | High |
There are multiple workflow architectural styles, as described in the Microsoft Architecture Guide.
We have chosen the State Machine style because, due to the dynamic nature of the workflow, there is uncertainty regarding the number of active nodes in the graph. Additionally, there is no predefined workflow structure that can be described in code. Each component is aware only of its predecessors. By traversing this graph and checking the states of the nodes and their dependencies, the graph scheduler can determine the current state of the workflow and identify the necessary state transitions or actions for those components.
Each node of the workflow (the graph, its components, their jobs, and the 'steps' inside them) has a state:
from enum import Enum


class ControlStates(str, Enum):
    SCHEDULED = "SCHEDULED"    # Ready for execution
    FAILURE = "FAILURE"
    STARTED = "STARTED"
    CANCELLING = "CANCELLING"  # In the process of cleaning up / shutting down tasks, etc.
    SUCCESS = "SUCCESS"
    CANCELLED = "CANCELLED"
    UNKNOWN = "UNKNOWN"        # Initial state
There must also be a state transition matrix to disallow incorrect state transitions (remember: no data or a failure is better than invalid data!):
state_transitions = {
ControlStates.UNKNOWN: [ ],
ControlStates.CANCELLING: [ControlStates.CANCELLED],
ControlStates.SCHEDULED: [ ControlStates.SCHEDULED, ControlStates.STARTED, ...],
ControlStates.STARTED: [ ControlStates.CANCELLING, ControlStates.FAILURE, ...],
...
}
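A guard along these lines (a minimal sketch, assuming the matrix above; not the exact NoLabs code) is enough to reject illegal transitions before any state is persisted:

```python
class IllegalStateTransition(Exception):
    """Raised when a node tries to move to a state the matrix does not allow."""


def ensure_can_transition(current: ControlStates, new: ControlStates) -> None:
    allowed = state_transitions.get(current, [])
    if new not in allowed:
        raise IllegalStateTransition(f"Cannot move from {current} to {new}")


# Inside ExecutionNode.set_state one would call it before writing to Redis:
#   ensure_can_transition(await self.get_state(), state)
```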
There are also `PROGRESS_STATES`, which define the set of states that must be 'controlled' by the scheduler. Each node must support methods for handling these progress states. Inside such methods we decide whether to move to the next state or remain in the current one (for example, if the Celery task is still running on a worker, or the task is still terminating).
PROGRESS_STATES = [ControlStates.STARTED, ControlStates.CANCELLING]
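Conceptually, for each node in a progress state the scheduler does something like this (a simplified sketch; the node API is described in the next section):

```python
async def sync_node(node) -> None:
    # Only nodes in a progress state need attention from the scheduler.
    state = await node.get_state()
    if state == ControlStates.STARTED:
        await node.sync_started()
    elif state == ControlStates.CANCELLING:
        await node.sync_cancelling()
```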
You cannot get a million dollars when yesterday you were unemployed and broke, or become a jiu-jitsu black belt on the first day of training. Every big goal can basically be reached by divide and conquer, an approach used successfully in both software development and life. If the problem is too big to comprehend, you must divide it into smaller manageable pieces and conquer them all. This concept lies at the root of software engineering itself: modules, microservices, recursive algorithms, scalability. All of these let you tackle a big problem as a set of smaller, manageable ones and then recombine the pieces into a single working system.
Each execution unit of the graph is a separate execution node class, which exposes the necessary methods for each moving part of the workflow. The logic of this approach is that every bit of the application that has distributed state should be an instance of `ExecutionNode` that the scheduler can track effectively.
from abc import ABC, abstractmethod
from typing import Optional

from redis.client import Pipeline


class ExecutionNode(ABC):
    """
    Represents a single execution node tracked by the scheduler.
    """

    async def get_state(self) -> Optional[ControlStates]:
        """Get state. For a composite node (like the graph) this can be a composite state of the inner nodes."""
        ...

    async def set_state(self, state: ControlStates, pipe: Optional[Pipeline] = None):
        ...

    @abstractmethod
    async def sync_started(self):
        """Executed by the scheduler while the node is in the STARTED state."""
        ...

    @abstractmethod
    async def start(self, **kwargs):
        ...
Some execution nodes are not atomic but consist of several steps. A component is a composite of its pre-execution node, all of its job nodes, and its post-execution node. Composite nodes are in charge of managing the states of each child node. Schematically, a component node runs its main (pre-execution) task, then its jobs, then its complete (post-execution) task.
By adopting this composite execution node approach, you simplify each node's logic by dividing it into smaller, more manageable units, allowing the scheduler to control and monitor them more effectively.
For example, here is the method that syncs the `STARTED` state of the `ComponentExecutionNode`:
async def sync_started(self):
    """Executed by the scheduler while the component is in the STARTED state."""
    state = await self.get_state()
    if state != ControlStates.STARTED:
        return

    # Advance the pre-execution (main) task first.
    await self.sync_main_task()
    if await self.get_state() in TERMINAL_STATES:
        return

    # Once the main task has succeeded, advance the job nodes.
    any_job_succeeded = False
    if await self.main_task.get_state() == ControlStates.SUCCESS:
        any_job_succeeded = await self.sync_jobs()
        if await self.get_state() in TERMINAL_STATES:
            return

    # Finally, advance the post-execution (complete) task.
    if (
        await self.main_task.get_state() == ControlStates.SUCCESS
        and any_job_succeeded
    ):
        await self.sync_complete_task()
        if await self.get_state() in TERMINAL_STATES:
            return
There is a special case of execution node, the Celery execution node, whose logic resides within a Celery task. When you invoke its `start` method, it runs a Celery task. The smallest parts of any node are essentially Celery execution nodes, which lets you track each node's execution through the Celery API.
class CeleryExecutionNode(ExecutionNode, ABC):
@abstractmethod
async def start(self, **kwargs):
...
async def sync_started(self) -> ControlStates:
"""
Executed by scheduler every n seconds if task state == STARTED.
"""
...
# Get information about inner celery task
async_result = AsyncResult(id=task_id, app=self.celery)
celery_state = async_result.state
new_state = celery_to_internal_mapping[celery_state]
...
return new_state
async def _prepare_for_start(self, queue: str, pipe: Optional[Pipeline] = None) -> str:
"""
Creates and saves the task id for further tracking before task is executed.
:returns: celery_task_id
"""
task_id = str(uuid.uuid4())
timestamp = datetime.datetime.now(datetime.UTC).timestamp()
(pipe or rd).hset(self.celery_task_id_cid, mapping={
'task_id': task_id,
'queue': queue,
'timestamp': timestamp
})
# Will discuss this later
OrphanedTasksTracker().track(task_id=task_id,
queue=queue,
timestamp=timestamp)
return task_id
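The `celery_to_internal_mapping` used in `sync_started` is not shown above; it could plausibly look like this (an assumption on my part, based on Celery's built-in task states):

```python
from celery import states

# Hypothetical mapping from Celery task states to our internal ControlStates.
celery_to_internal_mapping = {
    states.PENDING: ControlStates.STARTED,   # task was sent but not picked up yet
    states.RECEIVED: ControlStates.STARTED,
    states.STARTED: ControlStates.STARTED,
    states.RETRY: ControlStates.STARTED,
    states.SUCCESS: ControlStates.SUCCESS,
    states.FAILURE: ControlStates.FAILURE,
    states.REVOKED: ControlStates.CANCELLED,
}
```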
So what should be written inside the `start` method?
@abstractmethod
async def start(self, **kwargs):
...
It depends. For the component node, which is a composite node, it is just:
class ComponentExecutionNode(CeleryExecutionNode):
async def start(self, **kwargs):
await self.set_state(state=ControlStates.STARTED)
For Celery execution nodes it is more complex:
class JobMainTaskExecutionNode(CeleryExecutionNode):
async def start(self):
pipe = get_redis_pipe() # Pipe allows transactional behavior in redis
queue = settings.workflow_queue
# Create a celery task id and save it so monitoring process can check it later
celery_task_id = await self._prepare_for_start(queue=queue, pipe=pipe)
self.celery.send_task( # Create task in celery.
name=Tasks.job_main_task,
...
)
await self.set_state(ControlStates.STARTED, pipe=pipe)
pipe.execute() # 'Commit'
So, as I described, components and jobs are multi-step: they consist of Celery tasks that do some work. They have pre- and post-execution actions, and jobs can also have a long-running task (which I'll describe later). To support this functionality there is the `ComponentFlowHandler` class, which provides methods to be overridden in order to extend the functionality of nodes.
class ComponentFlowHandler(Generic[TInput, TOutput]):
"""
Represents component flow and available client handlers
"""
async def on_start(self, inp: TInput) -> List[uuid.UUID]:
"""
Must return IDs of jobs. Called from inside ComponentMainTaskExecutionNode.
"""
return []
async def on_finish(self, inp: TInput, job_ids: List[uuid.UUID]) -> Optional[TOutput]:
"""Is called from inside celery task of ComponentCompleteTaskExecutionNode"""
...
async def on_job_start(self, job_id: uuid.UUID):
"""Is called from inside celery task of JobMainTaskExecutionNode"""
...
async def on_job_finish(self, job_id: uuid.UUID, long_running_output: Optional[Dict[str, Any]]):
"""Is called from inside celery task of JobCompleteTaskExecutionNode"""
...
async def cancel_job(self, job_id: uuid.UUID, reason: Optional[str] = None):
node = JobExecutionNode(
experiment_id=self.experiment_id,
component_id=self.component_id,
job_id=job_id,
)
if await node.can_cancel():
await node.cancel(message=reason)
async def schedule(
self,
job_id: uuid.UUID,
celery_task_name: str,
celery_queue: str,
input: Optional[Union[BaseModel, Dict[str, Any]]] = None
):
"""Schedule long running jobs"""
...
In the biotech world, some experiments can run for days or even weeks. Imagine fine-tuning a model that designs small molecules for a specific target (which can take hours to days) or running a BLAST search that takes minutes. It is bad design to build any synchronous communication around such tasks. Suppose a user clicks the "run experiment" button in the web UI; how should the main application worker communicate with another service that fine-tunes the model? Should it send a message and wait for the response? What if the main application worker goes down during this time? The long-running task would be orphaned. If a task is long-running, all communication with it must be asynchronous; you must not burn thread or process resources waiting for its results. In NoLabs, we provide a special API that developers can use to run long-running tasks on different services, such as those hosting AI models.
class ComponentFlowHandler(Generic[TInput, TOutput]):
async def schedule(
self,
job_id: uuid.UUID,
celery_task_name: str, # any celery task name
celery_queue: str, # any celery queue
input: Optional[Union[BaseModel, Dict[str, Any]]] = None
):
"""Schedule long running jobs"""
node = JobLongRunningTaskExecutionNode(
experiment_id=self.experiment_id,
component_id=self.component_id,
job_id=job_id,
)
can_schedule = await node.can_schedule()
if can_schedule:
arguments = {}
if isinstance(input, dict):
arguments = input
elif input:
arguments = input.model_dump()
await node.schedule(
celery_task_name=celery_task_name,
arguments=arguments,
celery_queue=celery_queue,
)
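To make this concrete, here is what a component's flow handler might look like from a developer's point of view (a hypothetical example: the component, schemas, task name, and queue are made up and not part of NoLabs):

```python
import uuid
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class BlastInput(BaseModel):
    sequences: List[str]


class BlastOutput(BaseModel):
    hits: List[str]


class BlastComponentFlowHandler(ComponentFlowHandler[BlastInput, BlastOutput]):
    async def on_start(self, inp: BlastInput) -> List[uuid.UUID]:
        # One job per input sequence; the engine drives each job's lifecycle.
        return [uuid.uuid4() for _ in inp.sequences]

    async def on_job_start(self, job_id: uuid.UUID):
        # Hand the heavy work off to a separate service through its Celery queue.
        await self.schedule(
            job_id=job_id,
            celery_task_name="blast.search",  # hypothetical task on the BLAST service
            celery_queue="blast",
            input={"job_id": str(job_id)},
        )

    async def on_job_finish(self, job_id: uuid.UUID, long_running_output: Optional[Dict[str, Any]]):
        # Persist whatever the long-running task returned.
        ...
```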
While the other nodes are internal to the package, the `Graph` class offers a high-level API for workflow manipulation. It includes three methods that encapsulate the core execution logic.
class Graph:
    async def set_components_graph(self, components: List[Component]):
        """
        Sets the components (the nodes of the graph).
        If any of the components is running, we must prohibit the operation,
        since it can lead to an incorrect internal state, so we take a
        distributed lock around this method.
        """
        lock = redlock(key=self._id, blocking=True)
        lock.acquire()
        try:
            ...  # Set graph nodes.
        finally:
            if lock.owned():
                lock.release()

    async def schedule(self, component_ids: List[uuid.UUID]):
        """
        Schedule all components as soon as possible.
        """
        lock = redlock(key=self._id, blocking=True)
        lock.acquire()
        try:
            ...  # Schedule graph nodes.
        finally:
            if lock.owned():
                lock.release()

    async def sync(self):
        """
        Synchronize the graph and advance the states of the internal nodes.
        """
        lock = redlock(key=self._id, blocking=False, auto_release_time=10.0)
        if not lock.acquire():
            return
        try:
            # Synchronize graph nodes which are in progress states like STARTED or CANCELLING:
            # call sync_started or sync_cancelling on the components.
            ...
        finally:
            if lock.owned():
                lock.release()
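From the caller's perspective (for example, the web API layer), usage looks roughly like this (a sketch; the components here are hypothetical and the construction of `Component` instances is omitted):

```python
graph = Graph(experiment_id=experiment.id)

# Replace the graph's nodes with a new set of components,
# then ask the scheduler to run two of them as soon as possible.
await graph.set_components_graph(components=[folding_component, blast_component])
await graph.schedule(component_ids=[folding_component.id, blast_component.id])
```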
To run the scheduler periodically, Celery's own periodic scheduler (beat) comes into play. It synchronizes all graphs every n seconds, advancing the entire state machine one step at a time.
@celery.task(name=Tasks.sync_graphs_task,
             bind=True,
             queue=settings.workflow_queue)
def sync_graphs(self):
    async def _():
        # Prevent more than one worker from entering this method at the same time.
        lock = redlock(key=Tasks.sync_graphs_task, auto_release_time=10.0)
        if not lock.acquire():
            return
        try:
            experiments = Experiment.objects.only("id")
            for experiment in experiments:
                graph = Graph(experiment_id=experiment.id)
                await graph.sync()
        finally:
            if lock.owned():
                lock.release()

    async_to_sync(_)()
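Wiring this task into Celery beat could look roughly like this (a sketch of standard Celery configuration; the 10-second interval is an assumption):

```python
celery.conf.beat_schedule = {
    "sync-graphs-every-10-seconds": {
        "task": Tasks.sync_graphs_task,
        "schedule": 10.0,  # run every n seconds
        "options": {"queue": settings.workflow_queue},
    },
}
```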
Orphaned tasks can easily lead to Celery queue starvation, negatively impacting the overall user experience. For instance, consider a scenario where you run a task with a timeout of one day. If you lose track of whether the task is still executing or if the corresponding worker is down, it can result in prolonged delays and inefficiencies. To mitigate this issue, we assign a unique task_id to every task and ensure it is written to Redis before sending the task to Celery. This approach helps track tasks more reliably and prevents them from being orphaned.
The following code snippet demonstrates how we handle orphaned tasks by monitoring the Celery workers and queues and failing tasks that no longer have an active worker or queue. The `remove_orphaned_tasks` task leverages Redis-based locking and Celery introspection to manage task states effectively.
@celery.task(
name=Tasks.remove_orphaned_tasks, queue=settings.workflow_queue, max_retries=0
)
def remove_orphaned_tasks():
lock = redlock(key=Tasks.remove_orphaned_tasks, auto_release_time=10.0)
if not lock.acquire():
return
try:
celery = get_celery_app()
inspect = celery.control.inspect()
tasks = list(OrphanedTasksTracker.get_task_data())
if not tasks:
return
        # Workers and queues that are currently alive according to Celery's inspect API.
        active_workers = inspect.active() or {}
        active_queues = inspect.active_queues() or {}
        workers = set(active_workers.keys())
        queues = set()
        for queue_list in active_queues.values():
            for queue in queue_list:
                queues.add(queue['name'])
for task in tasks:
task_id = task['task_id']
task_queue = task['queue']
timestamp = task['timestamp']
async_result = AsyncResult(id=task_id, app=celery)
if async_result.state in READY_STATES:
OrphanedTasksTracker.remove_task(task_id)
continue
if datetime.datetime.now(datetime.UTC).timestamp() - timestamp < 60.0:
continue
if async_result.info:
worker_name = async_result.info.get('hostname')
if worker_name and worker_name not in workers:
celery.backend.store_result(
task_id, Exception("Task worker is down (failed in progress)"), state=FAILURE
)
OrphanedTasksTracker.remove_task(task_id)
continue
if task_queue not in queues:
celery.backend.store_result(
task_id, Exception("Task worker is down (not started)"), state=FAILURE
)
OrphanedTasksTracker.remove_task(task_id)
continue
finally:
if lock.owned():
lock.release()
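The `OrphanedTasksTracker` used throughout is not shown in this article. A minimal Redis-backed version (an assumption on my part, sketching only the interface used above and reusing the same module-level `rd` client) could look like this:

```python
import json


class OrphanedTasksTracker:
    """Remembers every dispatched Celery task so the cleanup job can inspect it later."""

    _key = "orphaned_tasks_tracker"

    def track(self, task_id: str, queue: str, timestamp: float) -> None:
        rd.hset(self._key, task_id, json.dumps(
            {"task_id": task_id, "queue": queue, "timestamp": timestamp}))

    @classmethod
    def get_task_data(cls):
        return (json.loads(raw) for raw in rd.hvals(cls._key))

    @classmethod
    def remove_task(cls, task_id: str) -> None:
        rd.hdel(cls._key, task_id)
```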
In the workflow engine, property validation plays a crucial role in ensuring that the components within a workflow are correctly configured and interconnected. Each component has input and output schemas defined using Pydantic models, which provide a structured way to represent the expected data formats and types. The `Parameter`, `Property`, and `Component` classes are central to this validation process.

The `Parameter` class represents the schema of a component's inputs or outputs, detailing the expected properties, their types, and any required fields. When components are connected within a workflow, the `try_map_property` method is used to map output properties from one component to the input properties of another. This method performs validation to ensure that the types and formats of the properties are compatible. If there is a mismatch or incompatibility, it raises a `PropertyValidationError`, preventing the workflow from proceeding with invalid configurations.

Moreover, the `Component` class utilizes the `input_errors` and `output_errors` methods to validate the input and output values against their respective schemas. Before a workflow is executed, these validations ensure that all required properties are provided and correctly typed, and that any defaults or mappings are appropriately set. This rigorous validation mechanism helps catch configuration issues early, reducing runtime errors and enhancing the reliability of workflow execution.
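To illustrate the idea, here is a stripped-down sketch of what mapping one component's output property onto another's input property involves (the models and the helper below are illustrative, not the actual NoLabs `Parameter`/`Component` API):

```python
from typing import List

from pydantic import BaseModel


class PropertyValidationError(Exception):
    """Raised when an output property cannot be mapped onto an input property."""


class FoldingOutput(BaseModel):   # hypothetical output schema of one component
    pdb_contents: List[str]


class BlastInput(BaseModel):      # hypothetical input schema of another component
    fasta_sequences: List[str]
    e_value: float = 1e-5


def try_map_property_sketch(source: type[BaseModel], source_field: str,
                            target: type[BaseModel], target_field: str) -> None:
    """Rough approximation of the check: compare the JSON-schema types of both properties."""
    src = source.model_json_schema()["properties"][source_field]
    dst = target.model_json_schema()["properties"][target_field]
    if src.get("type") != dst.get("type"):
        raise PropertyValidationError(
            f"Cannot map {source_field} ({src.get('type')}) to {target_field} ({dst.get('type')})"
        )


# Compatible: both properties are arrays of strings.
try_map_property_sketch(FoldingOutput, "pdb_contents", BlastInput, "fasta_sequences")

# Incompatible: array vs. number raises PropertyValidationError.
try:
    try_map_property_sketch(FoldingOutput, "pdb_contents", BlastInput, "e_value")
except PropertyValidationError as e:
    print(e)
```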
So, what do we have in the end? Was it worth implementing our own engine? I’d say yes. First and foremost, we have full control over our code. We understand its drawbacks and benefits thoroughly, which means no unexpected surprises during development or future extensions. Additionally, instead of managing multiple additional services (like a worker and PostgreSQL for Airflow), we can minimize the deployment components, making our setup more streamlined.
Another significant advantage is the intellectual property and uniqueness of our solution. As I mentioned earlier in this article, there are no other completely open-source workflow engines that offer this level of extensibility and dynamic visual user interaction.
Overall, it has been a challenging journey. I explored numerous solutions and encountered many obstacles and edge cases during development. However, I ensured robustness by adding integration tests, which are available in our repository. The resulting workflow engine is highly extensible and straightforward to enhance. For instance, implementing a node-canceling feature took me only about 30 minutes, including the integration tests.
So thank you for reading my dev story, and see you!