Scale Your Data Pipelines with Airflow and Kubernetes

By Tal Peretz (@tal-peretz), CTO & Chief Data Scientist @Supertools

It doesn’t matter whether you are running background tasks, preprocessing jobs, or ML pipelines. Writing tasks is the easy part. The hard part is the orchestration: managing dependencies among tasks, scheduling workflows, and monitoring their execution is tedious.

Enter Airflow. Your new workflow management platform.

Why Airflow?

A couple of years ago, in Scaling Effectively: when Kubernetes met Celery, I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I considered the available solutions, including Airflow. With no satisfying solution in sight, I decided to implement my own framework. Since then, Airflow has come a long way. Here is why I switched to Airflow:

Scalable

When using the right setup, the one we are about to see, Airflow is both scalable and cost-efficient.

Batteries Included

Though the UI is not perfect, it is one of Airflow’s core competencies. And in this case, a picture is worth a thousand words.

Airflow has plenty of integrations, both in the form of Operators and in the form of Executors.

It also has an experimental yet indispensable REST API for workflows, which means you can trigger workflows dynamically.
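As a rough sketch of what triggering a workflow dynamically can look like, the snippet below builds a POST request against the experimental endpoint `/api/experimental/dags/<dag_id>/dag_runs` that Airflow 1.10 exposes. The base URL, the `kube-operator` DAG id, and the `conf` payload are placeholders, authentication is left out, and depending on the Airflow version `conf` may need to be a JSON-encoded string rather than an object, so treat this as an illustration rather than production code.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf=None):
    """Build (but do not send) a request that triggers a DAG run.

    Assumes the experimental REST API of Airflow 1.10 is enabled and reachable.
    """
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and payload, for illustration only.
request = build_trigger_request("http://localhost:8080", "kube-operator", {"source": "api"})
print(request.get_method(), request.full_url)

# To actually trigger the run against a live webserver:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the example runnable without a live Airflow webserver.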

Battle Tested

With so many companies using Airflow, I can rest assured knowing it is going to continuously improve.

The Perfect Setup for Airflow Has

🔥 Disposable Infrastructure

Using Helm and some premade commands, we can destroy and redeploy the entire infrastructure easily.

🚀 Cost-Efficient Execution

We use Kubernetes as the task engine. The Airflow scheduler runs each task in a new pod and deletes it upon completion, allowing us to scale with the workload while using the minimal amount of resources.

🔩 Decoupled Orchestration

Another great advantage of using Kubernetes as the task runner is decoupling orchestration from execution. You can read more about it in We’re All Using Airflow Wrong and How to Fix It.

🏃 Dynamically Updated Workflows

We use Git-Sync containers. Those will allow us to update the workflows using git alone. No need to redeploy Airflow on each workflow change.

Airflow Execution Options

CeleryExecutor + KubernetesPodOperator (recommended)

➕ Decoupling of orchestration and execution.

➖ Extra pods for Celery workers, Redis, and Flower monitoring.

KubernetesExecutor + WhateverOperator

➕ No extra pods.

➖ Weak decoupling: we’ll have to define execution code and dependencies inside the DAGs.

KubernetesExecutor + KubernetesPodOperator

➕ No extra pods.

➕ Decoupling of orchestration and execution.

➖ Unsupported — currently causes recursion of pod startup.
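Whichever option you choose, the executor is selected through Airflow's `[core] executor` setting, which can also be supplied as an environment variable following the `AIRFLOW__{SECTION}__{KEY}` convention. The sketch below only builds that variable for the recommended option. How the variable reaches the pods (Helm values, a ConfigMap, and so on) depends on the chart and is not shown, and the helper name `airflow_env_var` is made up for illustration.

```python
def airflow_env_var(section, key):
    """Return the environment variable name Airflow reads for a config option."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


# Recommended option from the list above: Celery schedules, Kubernetes executes.
executor_env = {airflow_env_var("core", "executor"): "CeleryExecutor"}
print(executor_env)  # {'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'}
```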

Let’s Set It Up

Prerequisites

> brew install kubectl
> brew install helm

Make sure you have:

It is also recommended to set up Kubernetes Dashboard

Setup

cookiecutter https://github.com/talperetz/scalable-airflow-template

To fill in the cookiecutter options, check out the scalable-airflow-template GitHub repo.

make deploy

and voila 🎉

Tasks as Docker Images

I use Docker images because they let me decouple Airflow from the actual tasks it runs. I can change the underlying task without changing anything in Airflow's configuration, code, or deployment.

When constructing the image, I start with python-cli-template, which provides a fast and intuitive CLI experience.

Python CLI Template on Github
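To make the idea concrete, here is a minimal sketch of a task entrypoint that could be baked into an image. It uses the standard library's `argparse` rather than python-cli-template, and the `preprocess` task with its `--input-path` and `--limit` flags is entirely hypothetical.

```python
import argparse


def build_parser():
    """Describe the flags the (hypothetical) task accepts."""
    parser = argparse.ArgumentParser(
        prog="preprocess", description="Hypothetical preprocessing task."
    )
    parser.add_argument("--input-path", required=True, help="Location of the data to process.")
    parser.add_argument("--limit", type=int, default=100, help="Maximum number of rows to handle.")
    return parser


def main(argv=None):
    """Parse the flags and run the task; argv=None falls back to sys.argv."""
    args = build_parser().parse_args(argv)
    # Real work would happen here; the sketch only reports what it would do.
    message = "processing up to {} rows from {}".format(args.limit, args.input_path)
    print(message)
    return message


# Example invocation with explicit arguments, as a pod's `arguments` list might supply them.
main(["--input-path", "data/raw.csv", "--limit", "10"])
```

With an entrypoint like this, changing what the task does means rebuilding the image, while the DAG only needs to know the image name and the flags to pass.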

An Example Workflow

from datetime import datetime, timedelta

from airflow import DAG
# Import path for Airflow 1.10; newer releases ship this operator in a provider package.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

example_workflow = DAG(
    "kube-operator",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

with example_workflow:
    # Each task starts a short-lived pod that is deleted once it finishes.
    # `bash -c` expects the whole command as one string, hence a single argument.
    t1 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod1",
        task_id="pod1",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t2 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod2",
        task_id="pod2",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t3 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod3",
        task_id="pod3",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )
    t4 = KubernetesPodOperator(
        namespace="airflow",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo hello world"],
        labels={"runner": "airflow"},
        name="pod4",
        task_id="pod4",
        is_delete_operator_pod=True,
        hostnetwork=False,
    )

    # pod1 runs first, pod2 and pod3 run in parallel, pod4 runs last.
    t1 >> [t2, t3] >> t4
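To see what the dependency line `t1 >> [t2, t3] >> t4` implies for execution order, the sketch below models the same graph with the standard library's `graphlib` (Python 3.9+) and groups the tasks into waves that could run in parallel. This is not Airflow's scheduler, only an illustration of the fan-out and fan-in shape; the `execution_waves` helper is a made-up name.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task maps to the set of tasks it waits for: t1 >> [t2, t3] >> t4.
upstream = {
    "pod2": {"pod1"},
    "pod3": {"pod1"},
    "pod4": {"pod2", "pod3"},
}


def execution_waves(graph):
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # Everything returned here has all of its upstream tasks marked done.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(upstream))  # [['pod1'], ['pod2', 'pod3'], ['pod4']]
```

The middle wave is where running each task in its own pod pays off: `pod2` and `pod3` can be scheduled at the same time and their pods removed as soon as each one finishes.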

Advanced Airflow

