It doesn’t matter if you are running background tasks, preprocessing jobs or ML pipelines. Writing tasks is the easy part. The hard part is the orchestration— Managing dependencies among tasks, scheduling workflows and monitor their execution is tedious. Enter . Your new workflow management platform. Airflow Why Airflow? A couple of years ago, In , I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I considered available solutions — including airflow. With no satisfying solution in sight, I decided to implement my own framework. Since then, airflow had come a long way. Here is why I switched to Airflow: Scaling Effectively: when Kubernetes met Celery Scalable When using the right setup, the one we are about to see, Airflow is both scalable and cost-efficient. Batteries Included Though the UI is not perfect, it is one of Airflow’s core competencies. And in this case, a picture is worth a thousand words- Airflow has plenty of integrations both in the form of and in the form of . Operators Executors And an experimental yet indispensable , which implies you can trigger workflows dynamically. REST API for workflows Battle Tested With so many companies using Airflow, I can rest assured knowing it is going to continuously improve. The Perfect Setup for Airflow Has 🔥 Disposable Infrastructure Using helm and some premade commands, we can destroy and re-deploy the entire infrastructure easily. 🚀 Cost-Efficient Execution We use kubernetes as the tasks’ engine. Airflow scheduler will run each task on a new pod and delete it upon completion. Allowing us to scale according to workload using the minimal amount of resources. 🔩 Decoupled Orchestration Another great advantage of using Kubernetes as the task runner is — decoupling orchestration from execution. You can read more about it in . We’re All Using Airflow Wrong and How to Fix It 🏃 Dynamically Updated Workflows We use Git-Sync containers. Those will allow us to update the workflows using git alone. No need to redeploy Airflow on each workflow change. Airflow Execution Options CeleryExecutor + KubernetesPodOperator (recommended) ➕ Decoupling of orchestration and execution. ➖ Extra pods for celery workers redis and flower monitoring. KubernetesExecutor + WhateverOperator ➕ No extra pods. ➖ Weak-Decoupling. we’ll have to define execution code and dependencies inside the DAGs. KubernetesExecutor + KubernetesPodOperator ➕ No extra pods. ➕ Decoupling of orchestration and execution. ➖ — currently causes recursion of pod startup. Unsupported Let’s Set It Up Prerequisites > brew install kubectl > brew install helm Make sure you have: . kubectl context configured to your EKS cluster a . Kubernetes cluster set with autoscaler . ECR Repository for the docker image It is also recommended to . set up Kubernetes Dashboard Setup cookiecutter https: thub.com scalable-airflow-template //gi /talperetz/ To fill in the cookiecutter options check out . scalable airflow template github repo make deploy and voila 🎉 Tasks as Docker Images I use docker images since then I can decouple airflow from the actual tasks it runs. I can change the underlying task without changing anything in airflow configuration, code or deployment. When constructing the image I start with — which provides a fast and intuitive CLI experience. python-cli-template Python CLI Template on Github An Example Workflow datetime datetime, timedelta airflow DAG airflow.contrib.operators.kubernetes_pod_operator KubernetesPodOperator default_args = { : , : , : datetime( , , ), : [ ], : , : , : , : timedelta(minutes= ), } example_workflow = DAG( , default_args=default_args, schedule_interval=timedelta(days= )) example_workflow: t1 = KubernetesPodOperator(namespace= , image= , cmds=[ , ], arguments=[ , ], labels={ : }, name= , task_id= , is_delete_operator_pod= , hostnetwork= , ) t2 = KubernetesPodOperator(namespace= , image= , cmds=[ , ], arguments=[ , ], labels={ : }, name= , task_id= , is_delete_operator_pod= , hostnetwork= , ) t3 = KubernetesPodOperator(namespace= , image= , cmds=[ , ], arguments=[ , ], labels={ : }, name= , task_id= , is_delete_operator_pod= , hostnetwork= , ) t4 = KubernetesPodOperator(namespace= , image= , cmds=[ , ], arguments=[ , ], labels={ : }, name= , task_id= , is_delete_operator_pod= , hostnetwork= , ) t1 >> [t2, t3] >> t4 from import from import from import "owner" "airflow" "depends_on_past" False "start_date" 2015 6 1 "email" "airflow@airflow.com" "email_on_failure" False "email_on_retry" False "retries" 1 "retry_delay" 5 'kube-operator' 1 with 'airflow' "ubuntu:16.04" "bash" "-cx" "echo" "hello world" 'runner' 'airflow' "pod1" 'pod1' True False 'airflow' "ubuntu:16.04" "bash" "-cx" "echo" "hello world" 'runner' 'airflow' "pod2" 'pod2' True False 'airflow' "ubuntu:16.04" "bash" "-cx" "echo" "hello world" 'runner' 'airflow' "pod3" 'pod3' True False 'airflow' "ubuntu:16.04" "bash" "-cx" "echo" "hello world" 'runner' 'airflow' "pod4" 'pod4' True False Advanced Airflow ML Workflows in Twitter Scheduling Notebooks at Netflix Airflow Tips & Tricks If you enjoyed this post, feel free to share it 📤 and if you’re interested in posts to come, make sure to follow me on Medium: https://medium.com/@talperetz24 Twitter: https://twitter.com/talperetz24 LinkedIn: https://www.linkedin.com/in/tal-per/