Apache Airflow is an open-source tool that allows you to design, schedule, and monitor complex workflows. To put it simply, this tool lets you launch a variety of tasks on a schedule. Nowadays, Airflow is widely used in the data industry (and you can often see Airflow listed as a required skill in job descriptions 🙂)
The Airflow UI looks like this
I discovered Airflow at my first job in IT, where I was a Machine Learning engineer intern at a bank. To complete my tasks successfully and contribute to the development of the data platform, I needed to learn how to use Apache Airflow. It was harder than discovering Python: there was less information and less explanation of the key concepts.
By the way, ChatGPT agrees with me 😂. Here is the quote:
“Apache Airflow can be difficult to master due to its complexity, steep learning curve, lack of documentation, requirement for programming knowledge, and challenges with debugging issues in a distributed system.”
In this article, I will share my experience with Airflow, give you some tips and tricks, explain key concepts, and give a list of useful resources.
DAG and its important features
DAG (Directed Acyclic Graph) is the basic entity of Airflow. The official Apache Airflow documentation defines it as “a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG structure (tasks and their dependencies) as code”.
To simplify, it is a schema of all the tasks and the dependencies between them, which can be run by Airflow.
Here is an example of DAG code in Python:
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'surname_name',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    'email': '[email protected]',
    'email_on_failure': False,
}

dag_id = 'dag_example'

with DAG(
    dag_id=dag_id,
    default_args=default_args,
    tags=['project_name', 'surname'],
    schedule_interval='0 12 * * *',  # every day at 12:00
    catchup=True,
    max_active_runs=1,
) as dag:
    ...  # tasks are defined next
The definitions and detailed explanations of all the parameters are given in the official Airflow documentation.
Do not change the value of dag_id
If you change this parameter, a new DAG entity will appear in the Airflow UI, even though the code file remains the same. I struggled with that a lot, so I recommend being careful and attentive.
How to schedule the interval of DAG runs
The schedule_interval parameter specifies when the DAG runs, in cron notation. For convenience, it is easier to build the expression on this site:
https://crontab.guru/
Airflow also has ready-made presets, like @hourly and @daily.
The value None also exists. It is used when the DAG shouldn't be triggered on a time schedule (for example, when it is only triggered manually).
More scheduling information can be found in the official Airflow documentation.
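To see the three styles side by side, here is a minimal sketch (the dag_ids are made up): a cron expression, a preset, and None for DAGs that are only triggered manually.
from datetime import datetime

from airflow import DAG

# Cron notation: every day at 12:00
with DAG(dag_id='cron_example', start_date=datetime(2024, 1, 1),
         schedule_interval='0 12 * * *', catchup=False) as dag_cron:
    ...

# Preset: the same daily cadence, but more readable
with DAG(dag_id='preset_example', start_date=datetime(2024, 1, 1),
         schedule_interval='@daily', catchup=False) as dag_preset:
    ...

# None: no automatic runs; trigger manually or from another DAG
with DAG(dag_id='manual_example', start_date=datetime(2024, 1, 1),
         schedule_interval=None, catchup=False) as dag_manual:
    ...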
Catchup = True + start_date = datetime(year, month, day)
Catchup = True means that Airflow will execute all the past DAG runs that would have been scheduled if the DAG had been created and started on the date specified in the start_date parameter.
In this case, the DAG will be triggered for each interval, starting from start_date and ending at the current moment. Imagine you have a DAG with start_date = datetime(2015, 1, 1) and schedule_interval = '@daily'. Can you imagine how many runs that produces? The DAG will be run for 2015-01-01, then for 2015-01-02, and so on.
By default, catchup = True, so it’s vital to pay attention to this parameter.
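As an illustration (the dag_id is hypothetical), turning catchup off keeps an old start_date from triggering years of backfill runs:
from datetime import datetime

from airflow import DAG

# With catchup=True (the default), every daily interval between
# 2015-01-01 and today would be backfilled. catchup=False runs
# only the most recent interval.
with DAG(
    dag_id='no_backfill_example',
    start_date=datetime(2015, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    ...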
In addition, a DAG has a doc_md parameter, which allows you to write DAG documentation that appears above the DAG in the UI. It supports Markdown and links to external content. A simple example is sketched below.
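Here is a minimal sketch of setting doc_md (the dag_id and wiki link are made up):
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='documented_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    # Rendered as Markdown at the top of the DAG page in the UI
    doc_md="""
    ### Daily scoring DAG
    Loads fresh data and runs the ML model.
    [Project wiki](https://example.com/wiki)
    """,
) as dag:
    ...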
Tags parameter
Tags allow you to find dags in the Airflow UI easily. Put your name/surname or project name for easy navigation.
tags=['project_name', 'surname']  # code snippet
In the UI, it looks like this.
More information about tags can be found in the official Airflow documentation.
I am not a big fan of writing docs, but spending several minutes could save tons of nerve cells in the future 😅
Tasks and their statuses
A task is a single unit of work, a node of the DAG. A task defines what code will be executed and what data will be passed to the following tasks. Typical task code looks like this.
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Task 1: BashOperator - executes a bash command
task1 = BashOperator(
    task_id='task1',
    bash_command='echo -e "def \nfoo "',
    dag=dag,
)

# Task 2: PythonOperator - executes Python code
task2 = PythonOperator(
    task_id='task2',
    # some_python_function - the Python callable to execute
    python_callable=some_python_function,
    dag=dag,
)
# task order looks like this. Read more on task orders
# https://docs.astronomer.io/learn/managing-dependencies
task1 >> task2
Some of the task types will be reviewed later
Task statuses
In Airflow, tasks have different statuses. Here is a list of the most common ones:
no_status: the task has not yet been queued up
scheduled: waiting for the previous tasks in the DAG to finish
queued: waiting for a free worker slot
running: the task is in progress
success: the task is finished successfully
failed: the task is finished with an error
skipped: the task was skipped
upstream_failed: the parent tasks finished with an error, so the task cannot be started
All statuses are listed in the official Airflow documentation.
Trigger rule
The trigger rule determines in what state the parent tasks must be for the next task to start. The simplest option is that all previous tasks have completed successfully; 'all_success' is the default value of the parameter.
In Python, that value looks like this.
task1 = BashOperator(
    task_id='task1',
    bash_command='echo -e "def \nfoo "',
    trigger_rule='all_success',
    dag=dag,
)
But there are several more options. Here are the most popular ones:
all_failed - the task will start only if all the parent tasks ended with an error
all_done - the task will start once all the parent tasks have finished (success, failure, and skip all count)
one_success - the task will start as soon as at least one of the parent tasks completes successfully
More about trigger rules (with clear explanations and pictures) can be found in the official Airflow documentation.
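For example, a cleanup step that should run no matter how its parents finished could be sketched like this (the task and path are hypothetical):
from airflow.operators.bash import BashOperator

# Runs after its parent tasks finish, whether they succeeded,
# failed, or were skipped
cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -rf /tmp/staging_data',
    trigger_rule='all_done',
    dag=dag,
)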
We’ve discussed tasks in Airflow. Operators, in general, are like presets for executing different kinds of tasks: a bash command, Python code, a SQL query, and many more.
Here is my classification of operators:
Decorative (Empty Operator, Dummy Operator)
Universal (Python Operator, Bash Operator)
For databases (for example, Postgres Operator, MsSql Operator)
Specific (for example, SparkSubmit Operator, KubernetesPod Operator)
The name “decorative” refers to the Empty and Dummy operators. I used empty operators for testing - for example, to check whether Airflow works correctly. Dummy operators, according to the docs, are used to mark the beginning and end of a group of tasks.
The code for both of them is pretty simple.
task_1 = EmptyOperator(task_id="task_1", dag=dag)
task_2 = DummyOperator(task_id='task_2', dag=dag)
I named them universal because they are widely used and suitable for a variety of tasks: the BashOperator and the PythonOperator. The PythonOperator runs Python code, whereas the BashOperator runs a Bash script or command. Can you imagine how many things can be done with Python or Bash?
Code examples can be found above in the task section; a slightly fuller PythonOperator sketch is given below.
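This sketch (the function and its arguments are made up) shows that the PythonOperator simply calls a regular Python function, with op_kwargs passing keyword arguments to it:
from airflow.operators.python import PythonOperator

# A regular Python function that the operator will call
def print_greeting(name: str) -> None:
    print(f"Hello, {name}!")

greet_task = PythonOperator(
    task_id='greet_task',
    python_callable=print_greeting,
    op_kwargs={'name': 'Airflow'},  # keyword arguments for the callable
    dag=dag,
)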
These types of operators are used to interact with different databases. Here is a code example for the PostgresOperator (used to execute queries against a PostgreSQL database) from the official documentation:
create_pet_table = PostgresOperator(
    task_id="create_pet_table",
    # code for SQL query
    sql="""
        CREATE TABLE IF NOT EXISTS pet (
            pet_id SERIAL PRIMARY KEY,
            name VARCHAR NOT NULL,
            pet_type VARCHAR NOT NULL,
            birth_date DATE NOT NULL,
            OWNER VARCHAR NOT NULL);
        """,
    # database connection id (possible to add in the Airflow UI)
    postgres_conn_id="postgres_default",
)
Based on my experience, I don’t recommend using this operator for executing “heavy operations.” It could slow DAG drastically.
They are used to execute specific tasks. In my work, I had experience with the KubernetesPodOperator, which runs a task inside a Kubernetes pod.
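Here is a rough sketch of a KubernetesPodOperator task (the image, pod name, and namespace are made up; the exact import path depends on the version of the cncf-kubernetes provider):
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k8s_task = KubernetesPodOperator(
    task_id='k8s_job',
    name='airflow-k8s-example',  # name of the pod that will be created
    namespace='default',
    image='python:3.11-slim',
    cmds=['python', '-c'],
    arguments=['print("hello from a pod")'],
    get_logs=True,               # stream pod logs into the task log
    dag=dag,
)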
A code example with the SparkSubmitOperator is listed below:
spark_task = SparkSubmitOperator(
    task_id='spark_job',
    application='/path/to/your/spark/job.py',
    conn_id='spark_default',  # name of the Spark connection
    executor_memory='4g',
    num_executors=2,
    name='airflow-spark-example',
    verbose=False,
    dag=dag,
)
A sensor is a type of operator that checks, at a certain time interval, whether a criterion is met. If it is, the sensor completes successfully; if not, it keeps retrying until it times out.
Here are several types of sensors:
DateTimeSensor waits for the specified date and time to pass. Useful when you need to execute tasks from the same DAG at different times
ExternalTaskSensor waits for an Airflow task to be completed. Useful when you need to implement dependencies between DAGs in the same Airflow environment
PythonSensor waits for the called Python object to return True
SqlSensor waits for data to appear in a SQL table.
HttpSensor waits for API availability.
At my job, I used a SqlSensor to check whether the data had been delivered. Once the newest data was uploaded, the ML model started scoring.
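A rough sketch of that pattern (the connection id, table, and column are made up; the import path depends on the provider version):
from airflow.providers.common.sql.sensors.sql import SqlSensor

# Succeeds once the first cell of the query result is truthy,
# i.e. once today's partition contains at least one row
wait_for_data = SqlSensor(
    task_id='wait_for_data',
    conn_id='dwh_default',
    sql="SELECT COUNT(*) FROM daily_features WHERE load_date = '{{ ds }}'",
    poke_interval=300,
    timeout=60 * 60 * 6,
    mode='reschedule',
    dag=dag,
)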
A simple FileSensor code example:
from airflow.sensors.filesystem import FileSensor

waiting_task = FileSensor(
    task_id='waiting_task',
    filepath='/path/to/expected/file',  # file to wait for (placeholder path)
    poke_interval=120,
    timeout=60 * 30 * 30,
    mode='reschedule',
    dag=dag,
)
Explanation of the key parameters
poke_interval - how often, in seconds, the sensor checks the condition (30 seconds by default)
timeout - how long the sensor keeps working before it fails (it is highly desirable to specify this)
The mode parameter has two values: poke and reschedule
poke (default) - the task occupies a worker slot for the whole time it runs (the task stays in an active state)
reschedule - the task occupies a worker slot only during the check itself and sleeps the rest of the time. Between checks, the task status changes to up_for_reschedule. Recommended for long intervals between checks
If you want to understand Airflow better, I highly recommend reading
I hope this article has made learning Apache Airflow a little easier. Understanding the concepts given here is enough to start writing your own DAGs or to understand how existing ones work.