paint-brush
Scale Your Data Pipelines with Airflow and Kubernetesby@tal-peretz
1,442 reads
1,442 reads

Scale Your Data Pipelines with Airflow and Kubernetes

by Tal PeretzMarch 15th, 2020
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

Tal Peretz is CTO & Chief Data Scientist @Supertools. He developed a workflow engine using Flask, Celery, and Kubernetes. Airflow is both scalable and cost-efficient. We use Git-Sync containers to update the workflows using git alone. We can destroy and re-deploy the entire infrastructure easily easily. Decoupling of orchestration and execution is a great advantage for Airflow. We are using a template template to set up scalable airflow workflows.

Companies Mentioned

Mention Thumbnail
Mention Thumbnail
featured image - Scale Your Data Pipelines with Airflow and Kubernetes
Tal Peretz HackerNoon profile picture

It doesn’t matter if you are running background tasks, preprocessing jobs or ML pipelines. Writing tasks is the easy part. The hard part is the orchestration— Managing dependencies among tasks, scheduling workflows and monitor their execution is tedious.

Enter Airflow. Your new workflow management platform.

Why Airflow?

A couple of years ago, In Scaling Effectively: when Kubernetes met Celery, I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I considered available solutions — including airflow. With no satisfying solution in sight, I decided to implement my own framework. Since then, airflow had come a long way. Here is why I switched to Airflow:

Scalable

When using the right setup, the one we are about to see, Airflow is both scalable and cost-efficient.

Batteries Included

Though the UI is not perfect, it is one of Airflow’s core competencies. And in this case, a picture is worth a thousand words-

Airflow has plenty of integrations both in the form of Operators and in the form of Executors.

And an experimental yet indispensable REST API for workflows, which implies you can trigger workflows dynamically.

Battle Tested

With so many companies using Airflow, I can rest assured knowing it is going to continuously improve.

The Perfect Setup for Airflow Has

🔥 Disposable Infrastructure

Using helm and some premade commands, we can destroy and re-deploy the entire infrastructure easily.

🚀 Cost-Efficient Execution

We use kubernetes as the tasks’ engine. Airflow scheduler will run each task on a new pod and delete it upon completion. Allowing us to scale according to workload using the minimal amount of resources.

🔩 Decoupled Orchestration

Another great advantage of using Kubernetes as the task runner is — decoupling orchestration from execution. You can read more about it in We’re All Using Airflow Wrong and How to Fix It.

🏃 Dynamically Updated Workflows

We use Git-Sync containers. Those will allow us to update the workflows using git alone. No need to redeploy Airflow on each workflow change.

Airflow Execution Options

CeleryExecutor + KubernetesPodOperator (recommended)

➕ Decoupling of orchestration and execution.
➖ Extra pods for celery workers redis and flower monitoring.

KubernetesExecutor + WhateverOperator

➕ No extra pods.
➖ Weak-Decoupling. we’ll have to define execution code and dependencies inside the DAGs.

KubernetesExecutor + KubernetesPodOperator

➕ No extra pods.
➕ Decoupling of orchestration and execution.
Unsupported — currently causes recursion of pod startup.

Let’s Set It Up

Prerequisites

> brew install kubectl
> brew install helm

Make sure you have: 

It is also recommended to set up Kubernetes Dashboard.

Setup

cookiecutter https://github.com/talperetz/scalable-airflow-template

To fill in the cookiecutter options check out scalable airflow template github repo.

make deploy

and voila 🎉

Tasks as Docker Images

I use docker images since then I can decouple airflow from the actual tasks it runs. I can change the underlying task without changing anything in airflow configuration, code or deployment.

When constructing the image I start with python-cli-template — which provides a fast and intuitive CLI experience.

Python CLI Template on Github

An Example Workflow

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

example_workflow = DAG('kube-operator',
                         default_args=default_args,
                         schedule_interval=timedelta(days=1))
with example_workflow:
    t1 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "hello world"],
                               labels={'runner': 'airflow'},
                               name="pod1",
                               task_id='pod1',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t2 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "hello world"],
                               labels={'runner': 'airflow'},
                               name="pod2",
                               task_id='pod2',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t3 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "hello world"],
                               labels={'runner': 'airflow'},
                               name="pod3",
                               task_id='pod3',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t4 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo", "hello world"],
                               labels={'runner': 'airflow'},
                               name="pod4",
                               task_id='pod4',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )


    t1 >> [t2, t3] >> t4

Advanced Airflow

If you enjoyed this post, feel free to share it 📤
and if you’re interested in posts to come, make sure to follow me on