Whether you are running background tasks, preprocessing jobs, or ML pipelines, writing tasks is the easy part. The hard part is the orchestration: managing dependencies among tasks, scheduling workflows, and monitoring their execution is tedious.
Enter Airflow, your new workflow management platform.
A couple of years ago, in Scaling Effectively: when Kubernetes met Celery, I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I considered the available solutions, including Airflow, but with no satisfying solution in sight, I decided to implement my own framework. Since then, Airflow has come a long way. Here is why I switched to Airflow:
Scalable
With the right setup (the one we are about to see), Airflow is both scalable and cost-efficient.
Batteries Included
Though the UI is not perfect, it is one of Airflow's core competencies, and in this case a picture is worth a thousand words.
Airflow has plenty of integrations both in the form of Operators and in the form of Executors.
It also ships an experimental yet indispensable REST API for workflows, which means you can trigger workflows dynamically.
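Because the experimental API is plain HTTP, triggering a run from another service needs only the standard library. Here is a minimal sketch. It assumes Airflow 1.10's experimental endpoint `POST /api/experimental/dags/<dag_id>/dag_runs` is reachable at a hypothetical `http://localhost:8080` (for example through a port-forward to the webserver) and that the configured auth backend allows the request. `build_trigger_request` and `trigger_dag` are illustrative helpers, not part of Airflow. The `conf` payload is sent as a JSON-encoded string, as in the documented curl example.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf=None):
    """Build a POST request for the experimental dag_runs endpoint (Airflow 1.10).

    Assumes the endpoint is reachable without extra authentication headers.
    """
    url = f"{base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"
    # "conf" is passed as a JSON-encoded string, matching the documented curl example.
    payload = {"conf": json.dumps(conf or {})}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def trigger_dag(base_url, dag_id, conf=None, timeout=10):
    """Send the trigger request and return the decoded JSON response."""
    request = build_trigger_request(base_url, dag_id, conf)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For the example workflow later in this post, a call would look like `trigger_dag("http://localhost:8080", "kube-operator", {"date": "2020-01-01"})`, where the host and the `date` key are placeholders.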
Battle Tested
With so many companies using Airflow, I can rest assured that it will keep improving.
🔥 Disposable Infrastructure
Using Helm and a few premade make commands, we can easily destroy and redeploy the entire infrastructure.
🚀 Cost-Efficient Execution
We use Kubernetes as the task engine. The Airflow scheduler runs each task on a new pod and deletes the pod upon completion, which lets us scale with the workload while using the minimal amount of resources.
🔩 Decoupled Orchestration
Another great advantage of using Kubernetes as the task runner is decoupling orchestration from execution. You can read more about it in We're All Using Airflow Wrong and How to Fix It.
🏃 Dynamically Updated Workflows
We use git-sync containers, which let us update the workflows using git alone, so there is no need to redeploy Airflow on each workflow change.
CeleryExecutor + KubernetesPodOperator (recommended)
➕ Decoupling of orchestration and execution.
➖ Extra pods for Celery workers, Redis, and Flower monitoring.
KubernetesExecutor + WhateverOperator
➕ No extra pods.
➖ Weak decoupling: we'll have to define execution code and dependencies inside the DAGs.
KubernetesExecutor + KubernetesPodOperator
➕ No extra pods.
➕ Decoupling of orchestration and execution.
➖ Unsupported: currently causes a recursion of pod startups.
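Whichever combination you pick, the executor is set by the `executor` option in the `[core]` section of `airflow.cfg`. Airflow also reads any option from an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which is a common way to configure containerized deployments. The small sketch below shows that naming convention for the recommended CeleryExecutor setup. `airflow_env_var` is a hypothetical helper, the Redis URL is a hypothetical in-cluster address, and the template's Helm chart may wire these values differently.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Name of the environment variable that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Settings for the recommended CeleryExecutor + KubernetesPodOperator combination.
# The broker URL is a hypothetical address for an in-cluster Redis service.
celery_settings = {
    airflow_env_var("core", "executor"): "CeleryExecutor",
    airflow_env_var("celery", "broker_url"): "redis://redis:6379/0",
}

for name, value in celery_settings.items():
    print(f"{name}={value}")
```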
Prerequisites
> brew install kubectl
> brew install helm
Make sure you have:
It is also recommended to set up Kubernetes Dashboard.
Setup
cookiecutter https://github.com/talperetz/scalable-airflow-template
To fill in the cookiecutter options, check out the scalable-airflow-template GitHub repo.
make deploy
and voilà 🎉
I use Docker images because they decouple Airflow from the actual tasks it runs: I can change the underlying task without changing anything in Airflow's configuration, code, or deployment.
When constructing the image, I start with python-cli-template, available on GitHub, which provides a fast and intuitive CLI experience.
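Because the task logic lives in the image, the DAG only needs to know the image name and the arguments to pass. Below is a minimal sketch of such an entrypoint using the standard library's `argparse`. It does not reproduce the structure of python-cli-template itself, and the `task` program name, the `preprocess` command, and the `--date` flag are all hypothetical.

```python
import argparse


def preprocess(date):
    # Hypothetical task body; a real image would do its actual work here.
    return f"preprocessing data for {date}"


def build_parser():
    parser = argparse.ArgumentParser(prog="task")
    subcommands = parser.add_subparsers(dest="command", required=True)
    pre = subcommands.add_parser("preprocess", help="run the preprocessing step")
    pre.add_argument("--date", required=True, help="partition to process, e.g. 2020-01-01")
    return parser


def main(argv=None):
    """Parse CLI arguments (sys.argv by default) and dispatch to the chosen command."""
    args = build_parser().parse_args(argv)
    if args.command == "preprocess":
        result = preprocess(args.date)
        print(result)
        return result


if __name__ == "__main__":
    main()
```

If this script is the image's `ENTRYPOINT`, a `KubernetesPodOperator` task can typically leave `cmds` unset and pass only `arguments=["preprocess", "--date", "2020-01-01"]`, because Kubernetes hands a container's args to the image entrypoint when no command is given.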
An Example Workflow
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["airflow@example.com"],  # placeholder address
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

example_workflow = DAG(
    "kube-operator",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,  # don't backfill one run per day since the 2015 start_date
)

with example_workflow:
    # Each task runs in its own pod; is_delete_operator_pod removes the pod when it finishes.
    # "bash -c" executes only its first argument, so the command is passed as one string.
    t1 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod1",
        task_id="pod1",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t2 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod2",
        task_id="pod2",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t3 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod3",
        task_id="pod3",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t4 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod4",
        task_id="pod4",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )

    # pod1 runs first, pod2 and pod3 follow in parallel, and pod4 waits for both.
    t1 >> [t2, t3] >> t4
```
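The last line of the DAG, `t1 >> [t2, t3] >> t4`, declares a diamond: `pod2` and `pod3` both wait for `pod1`, and `pod4` waits for both of them. To make the resulting order concrete, here is a small illustration using Python's standard `graphlib` module (Python 3.9+). It only models the dependency graph and is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of upstream tasks it depends on,
# mirroring t1 >> [t2, t3] >> t4 from the example DAG.
dependencies = {
    "pod2": {"pod1"},
    "pod3": {"pod1"},
    "pod4": {"pod2", "pod3"},
}


def execution_waves(graph):
    """Group tasks into waves; all tasks in one wave could run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(dependencies))
```

With enough worker slots, the middle wave is where the parallelism shows up: `pod2` and `pod3` can run in separate pods at the same time.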