If you work with Airflow, chances are you've already explored its documentation. However, there isn't much material on how to build and manage data pipelines in Airflow.
In this article, I'm not going to explain how to code DAGs in Airflow—that's already well-covered elsewhere. Instead, I will show you how the basic concepts in Airflow relate to building and debugging data pipelines. I will explore how to implement pipelines with DAGs, how to schedule DAG runs, and how to manually intervene in case of issues.
I focus on the Grid View as a tool for observability and manual debugging. We will examine several parameters that control Airflow’s behavior. Each parameter will be introduced in the proper context and related to practical examples.
Note: This article was written with Airflow v2.3.4 in mind. Things may have changed in the meantime.
We discuss Airflow in the context of the basic data pipeline, which involves loading data from a source, transforming it, and writing it into a target. This is also known as ETL (extract, transform, load).
In Airflow, this pipeline would be implemented with a DAG. DAGs can get pretty complex, but at the end of the day, most of them implement this basic workflow.
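As a rough sketch (all names and callables below are placeholders, not a real pipeline), the basic workflow maps to a DAG like this:

```python
# A minimal ETL DAG: extract -> transform -> load.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    ...  # read data from the source system


def transform():
    ...  # apply the transformation logic


def load():
    ...  # write the result into the target system


with DAG(
    dag_id="basic_etl",               # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,                    # more on this parameter later
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    # Task dependencies: extract runs first, then transform, then load.
    extract_task >> transform_task >> load_task
```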
The Grid View is accessible after clicking on a DAG. It is the most important view in Airflow.
It is structured as a grid: each column represents a DAG run, each row represents a task, and each square shows the status of a single task instance.
Most DAGs are designed to run autonomously, but engineers still need to intervene manually when things go wrong. These interventions happen in the Grid View.
To properly implement a pipeline in Airflow, you must figure out two things: how the DAG runs are scheduled, and how the individual tasks are configured and executed.
Let’s explore these two aspects in some detail, with practical examples.
Most DAGs run on a schedule because data pipelines need to regularly load and write data.
The Airflow schedule is defined with a start date and a schedule interval. The schedule interval defines how often the pipeline runs; it is expressed in cron syntax and is visible in the UI.
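As a sketch (the dag_id is made up), a DAG that runs every day at 06:00 could be defined like this:

```python
# Sketch: a start date plus a cron schedule interval.
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="daily_sales_load",        # hypothetical name
    start_date=datetime(2023, 1, 1),  # first date the schedule applies to
    schedule_interval="0 6 * * *",    # cron syntax: every day at 06:00
) as dag:
    ...
```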
Airflow offers the ability to trigger DAG runs manually by pressing the Play button.
It can be tempting to use this option when you want to run your DAG off-schedule, but you shouldn't do that.
Why not? A manually triggered run is created outside the regular schedule, with its own logical date. If your tasks use the schedule time to decide which data to process (as most incremental loads do), this extra run can load the wrong interval or duplicate work that a scheduled run already covers.
Generally, the correct way to run your DAG off-schedule is to clear or queue individual tasks. More on this in the tasks section.
You just pushed an update to a DAG that adds one or more new tasks. A new row for each task should appear in the Grid View, but there will be no task squares that you can clear, because the task has never been scheduled yet. If you need to run the task immediately, you might be tempted to start a manual run. There is a better solution: select the latest DAG run and then “Queue up new tasks”.
Another important scheduling parameter is catchup. If this DAG parameter is true, when the DAG is created, Airflow will execute all the DAG runs between the start date and the current date. Say you have a daily pipeline that processes the data for the day. You are deploying it on 2023-06-01, and the first time you deploy it, you want it to run on all the data from 2023. If you set catchup to true, Airflow will create a separate DAG run for each day between 2023-01-01 and 2023-06-01. If you set catchup to false, Airflow will start from the deployment day (2023-06-01) and not bother with the past.
In most cases, you simply want to start with the most recent DAG run, so catchup is set to false.
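As a minimal sketch (the dag_id is made up), catchup is simply passed to the DAG definition:

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="daily_pipeline",          # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,  # set to True to backfill every interval since start_date
) as dag:
    ...
```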
There are several reasons for a DAG to miss a scheduled run.
Unsurprisingly, a paused DAG will not run on schedule. If a DAG is paused while a run is pending, it will not start new tasks in that run, and the run will be suspended.
Another reason is the max_active_runs parameter. It decides how many runs of the same DAG can happen concurrently. If this is set to 2, a scheduled DAG run will not start on schedule if 2 other runs of that DAG are still in progress. In this case, subsequent runs queue up and start when the next slot is free. This causes DAGs to get “stuck in traffic” and run late while they wait for the previous runs to complete.
In practice, if your workflow is sequential, i.e. if each run depends on the previous one, you want to set max_active_runs to 1. If your workflow is atomic, i.e. each run can execute independently, this parameter can be higher. But there are other reasons to keep this number small: you don’t want to strain your source and target systems, and there are global limits on how much can run concurrently in the entire Airflow instance.
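As a sketch (names are illustrative), a sequential pipeline could be constrained like this:

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="sequential_pipeline",     # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    max_active_runs=1,  # never run two DAG runs at the same time
    catchup=False,
) as dag:
    ...
```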
As mentioned, the DAG run status can be running, succeeded, or failed. In the Grid View, you can manually mark a DAG run as failed or succeeded. This is mostly done for tracking purposes, since the Airflow homepage shows the number of running, succeeded, and failed runs.
The task is the fundamental unit of execution in the DAG. Tasks are very versatile and have a lot of parameters to configure. You can read about all available task parameters in the Airflow documentation.
Here are a few things to know about the task:
Under the hood, a task is powered by an Operator, which is a Python object specialized for a specific job. Examples of operators are: PostgresOperator, RedshiftSQLOperator, PythonOperator. You can see which operators are being used from the graph view.
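As a sketch of how this looks in code, here are two tasks backed by different operators. The dag_id, connection id, SQL, and callable are placeholders, and PostgresOperator requires the separately installed Postgres provider package:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def notify():
    print("load finished")  # placeholder for a real notification


with DAG(
    dag_id="operator_examples",       # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Runs a SQL statement against a Postgres connection.
    load_raw = PostgresOperator(
        task_id="load_raw",
        postgres_conn_id="warehouse",  # hypothetical connection id
        sql="INSERT INTO raw.events SELECT * FROM staging.events;",
    )

    # Runs an arbitrary Python callable.
    notify_task = PythonOperator(task_id="notify", python_callable=notify)

    load_raw >> notify_task
```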
Every task instance gets some input parameters which specify its behavior. You can see these by clicking on the task in Grid View and selecting “rendered template” on the right.
Some of these parameters are dynamically provided by Airflow at runtime, and this allows the task to know when it is being run (among other things). At my company, we mostly use the schedule_time parameter. In practice, this is how we implement time-sensitive data loads like “Load all of the records for today”. Most of our data load tasks use this.
An important subtlety: schedule time is not the same as “effective time”. Schedule time represents “when a DAG was supposed to run, regardless of whether it ran on time”. As we saw above, sometimes DAGs get “stuck in traffic” by running late and getting in a queue. Imagine you have an hourly task that looks at the schedule_time and loads the data for the previous hour. The 16:00 run gets stuck for 4 hours before an engineer fixes it. Now the 17:00 run will actually run at 20:00. You want this run to load data that arrived between 16:00 and 17:00, not between 19:00 and 20:00! Luckily, the schedule_time indicates the time at which the DAG run was originally scheduled, not the time at which it is actually running.
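In standard Airflow terms, the schedule time corresponds to the run's logical date, which templated fields expose through variables such as {{ ds }}; I assume that is what schedule_time refers to here. A minimal sketch of a time-sensitive load, with made-up table and connection names:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="daily_events_load",        # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # {{ ds }} is rendered to the run's scheduled date (YYYY-MM-DD),
    # not the wall-clock time at which the task actually executes.
    load_today = PostgresOperator(
        task_id="load_today",
        postgres_conn_id="warehouse",  # hypothetical connection id
        sql="""
            INSERT INTO analytics.daily_events
            SELECT * FROM raw.events
            WHERE event_date = '{{ ds }}';
        """,
    )
```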
Each task has a set number of retries (this can be configured). If an attempt fails, the task will retry until it either succeeds or runs out of retries; then it is marked as failed. The Grid View shows the logs for each try.
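As a sketch with illustrative values, retries are configured per task (or for the whole DAG via default_args):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_transform():
    ...  # placeholder transformation logic


with DAG(
    dag_id="retry_example",            # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = PythonOperator(
        task_id="transform",
        python_callable=run_transform,
        retries=3,                         # retry up to 3 times after a failure
        retry_delay=timedelta(minutes=5),  # wait 5 minutes between attempts
    )
```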
A task can be in many different states: deferred, failed, queued, running, scheduled, skipped, success, up_for_reschedule, up_for_retry, upstream_failed.
In the Grid View, you can manually mark a task as failed or succeeded. You can also clear a task, which means that Airflow will schedule and run it again. These manual actions are crucial for debugging pipelines, as we explain below.
There are several conditions for a task to run:
The DAG instance of the task (i.e. the column in the Grid View) must be running.
The task dependencies must be resolved. Typically, this means that all upstream tasks must have succeeded. For example, you want all relevant data loads to have succeeded before you start transforming data. However, tasks can have more specific trigger rules that determine if and when they start running based on what happens in the DAG execution (see the sketch after this list).
If the depends_on_past parameter is true, the previous instance of that task (in the previous DAG run) must have succeeded for this task to run.
There must be free task slots. There are local (DAG-level) and global (Airflow-instance-level) parameters that limit how many tasks can run at the same time. If all available slots are taken, the task will wait.
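Here is the trigger-rule sketch mentioned above: a hypothetical cleanup task that runs once all upstream tasks have finished, whether or not they succeeded. All names are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def remove_temp_files():
    ...  # placeholder cleanup logic


with DAG(
    dag_id="trigger_rule_example",     # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=remove_temp_files,
        # The default rule is "all_success"; "all_done" runs the task once
        # upstream tasks have finished, regardless of whether they succeeded.
        trigger_rule=TriggerRule.ALL_DONE,
    )
```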
The depends_on_past parameter deserves special mention. It decides whether a task waits for previous instances of itself to have succeeded before starting.
This parameter is used for sequential workflows. Say you have a transformation task that computes cumulative metrics on a table. You need each task run to succeed in the correct order, or your data will be corrupted; so you will set depends_on_past to true.
In contrast, if task runs are atomic, meaning they can run independently in any order, you won’t need to activate this parameter.
The same applies to snapshot workflows, where a single task run loads all the data. In that case, you only want to run the latest instance of the task without worrying about past instances. Then depends_on_past can be set to false.
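As a sketch (names are illustrative), a sequential transform that must never run out of order could be declared like this:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def update_cumulative_metrics():
    ...  # placeholder: builds on the results of the previous run


with DAG(
    dag_id="cumulative_metrics",       # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = PythonOperator(
        task_id="transform",
        python_callable=update_cumulative_metrics,
        # This task instance waits for its previous instance to succeed.
        depends_on_past=True,
    )
```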
It all starts with a failing task. When a task fails (after exhausting all of its retries), a few things happen:
The corresponding DAG run will terminate in the failed state.
Dependent downstream tasks in the same DAG run will not run. Instead, they will be in the state ‘upstream_failed’. You can visualize this as ‘blocking execution vertically’ in the Grid View.
Future executions of the same task (in subsequent DAG runs) will not start if depends_on_past is active. You can visualize this as ‘blocking execution horizontally’ in the Grid View. Consequently, those subsequent DAG runs will hang in the ‘running’ state until those tasks are completed. If depends_on_past is not active, future tasks will start anyway.
As you can see, there are several ways in which a failed task can obstruct the DAG, depending on your configuration. After you have read the logs and resolved the problem, there are two ways to deal with a failed task:
The easiest solution is to mark the task as succeeded. Then it won't obstruct other tasks. Do this only if you don't care about running that particular task instance.
The other solution is to clear the task. Then Airflow will once again schedule it and execute it. Do this to make sure that this particular task instance executes.
This is a good place to note that you can also take a task that either ran successfully, or has yet to run, and mark it as failed. You're not going to do this often but sometimes it comes in handy. For instance, if you think there is a bug in that task, you may want to prevent downstream or future tasks from running until you solve it.
When manually changing the status of a task, Airflow provides some powerful modifiers that allow you to apply the change recursively:
Downstream / upstream. This propagates the change to all the tasks that depend on this task (downstream) or that this task depends on (upstream). Again, you can visualize this as applying the change vertically through the Grid.
Future / past. This propagates the change to all past and/or future instances of this task. You can visualize this as applying the change horizontally through the Grid.
With these modifiers, you can change the status of many tasks simultaneously based on logical relations.
Now let's see some real-world examples of how to use these features. We will once again refer to our minimal data pipeline that reads from a source, transforms data, and writes to a target.
Transform fails; transforms are atomic. The transform task fails. It’s a bug in the transformation code. Transforms are atomic (they run independently). After fixing the bug, we clear the task, it reruns, it’s successful.
Write target task failed, but the data was written correctly. If you are sure you don’t need to rerun it, you can simply mark it successful.
Transform fails, problem is at read_source. The transform failed but after investigating, you decide the transform code works fine. However, the code that reads from the source had a silent failure. After fixing the bug, you clear the transform with the option “clear upstream tasks”. This will rerun the source and then the transform. Or you can clear the source with the option “clear downstream tasks”; it’s the same thing.
Note that since write_target depends on transform, it will not run if transform fails, but it will be in the state “upstream failed”. Airflow will wait for the upstream dependencies to be successful before running it.
Transform has a silent failure, transform tasks are sequential. The transform tasks have been successful. You discover that one week ago, there was a silent failure in a transform. Transform tasks are sequential and build upon each other, e.g. they compute a cumulative sum. After debugging the code, you clear the transform from one week ago. You must also select “clear future tasks” so Airflow will re-run all transform tasks that came after it.
These examples are not exhaustive but they should give you a sense of how to think about these problems.
Cascading failures in Airflow can be tricky to deal with. If not addressed in time, a single failure can propagate through the execution. Consider what happens when an Airflow task (call it T) fails despite all the retries and turns red:
All tasks downstream of T in the same DAG run fail (unless their trigger rule says otherwise).
The corresponding DAG run fails.
When the next DAG run starts, if depends_on_past is true, the future instance of T will not run. So that DAG will also fail. The downstream dependencies of T will not run either.
In sum, the execution of tasks downstream of T, and possibly of all future instances of T and its downstream tasks, will be halted.
You will see similar cascading issues if T doesn’t fail, but takes a really long time to execute.
If nobody is managing this (e.g. during the weekend) you might come back to a severely impaired DAG, with a lot of tasks that have failed or haven’t run on schedule. Let’s call them “missed tasks”.
To fix the DAG as quickly as possible, you want to do the following:
Mark as many “missed tasks” as possible as “successful”. This means turning them green in the Grid without running them. You want to do this whenever you have a chance because it’s the quickest fix.
Clear and rerun all the failed tasks that actually need to run.
Run all the late tasks that actually need to run.
There are no hard and fast rules for how to do this. You have to think of it in terms of your data needs. Here are a few examples to give you an idea in the context of data backfills.
Snapshots. Every time your pipeline runs, it takes a full snapshot of the source data, reloading everything from scratch. In this case, you only care about the latest run. Therefore you can run the latest load and mark all others as successful. Think of this as “only run the rightmost task”.
Incremental sequential loads. These are loads that are incremental (i.e. they load only portions of data at a time) and depend on each other (e.g. building a cumulative sum). In this case, you want to clear and rerun the first run that failed, and make sure to also clear all subsequent tasks. Think of it as “start from the left and run everything until the end”.
Incremental atomic loads. Same as above, but the loads don’t build on each other. You still have to run all loads, but you can do it concurrently and in any order.
Data load with lower bound filter. In my team, we have a pipeline that, based on the task schedule time (TST), loads all the data from the source that is dated after the TST. So if the task runs on 2023-01-01, it will read all the data at the source dated that day or later. In this case, for a backfill, you can simply run the first task that failed and mark all the others as successful. Think of it as “only run the leftmost task”.
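A minimal sketch of this pattern, with made-up table and connection names, assuming {{ ds }} stands in for the task schedule time:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="lower_bound_load",         # hypothetical name
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Loads everything dated on or after the run's schedule time (TST),
    # so the earliest cleared run also covers all later intervals.
    load_from_tst = PostgresOperator(
        task_id="load_from_tst",
        postgres_conn_id="warehouse",  # hypothetical connection id
        sql="""
            INSERT INTO target.events
            SELECT * FROM source.events
            WHERE event_date >= '{{ ds }}';
        """,
    )
```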
In this article, we explored how to manage and debug data pipelines in Airflow, focusing on the basic data pipeline of loading, transforming, and writing data.
We discussed how to implement pipelines with DAGs, how to schedule DAG runs, and how to manually intervene on DAGs in case of issues.
We also covered the basics of the Grid View, the most important view in Airflow, and how to use it for observability and manual debugging.
Finally, we explored real-world examples of how to use Airflow's features to deal with common pipeline issues and cascading failures.
This should be enough to get you started on building and managing data pipelines in Airflow. Good luck!