Understand Apache Airflow in 2024: Hints by Data Scientist

Written by alexandraoberemok | Published 2024/01/31
Tech Story Tags: airflow | apache-airflow | get-started-with-airflow | data-engineering | data-science | ml-engineering | mlops | machine-learning

TL;DR: A guide on how to learn Apache Airflow from scratch in 2024. This article covers the basic concepts of Airflow and is useful for Data Scientists and Data Engineers.

What is Apache Airflow & how did I dive into it?

Apache Airflow is an open-source tool that allows you to design, schedule, and monitor complex workflows. To put it simply, this tool allows you to launch a variety of tasks on a schedule. Nowadays, Airflow is widely used in the data industry (and you can often see Airflow as a required skill in job descriptions 🙂).

The Airflow UI looks like this

I discovered Airflow at my first job in IT, where I was a Machine Learning engineer intern at a bank. To complete my tasks successfully and contribute to the development of the data platform, I needed to learn how to use Apache Airflow. It was more complicated than learning Python: there was less information available and fewer explanations of key concepts.

By the way, ChatGPT agrees with me 😂. Here is the quote:

"Apache Airflow can be difficult to master due to its complexity, steep learning curve, lack of documentation, requirement for programming knowledge, and challenges with debugging issues in a distributed system."

In this article, I will share my experience with Airflow, give you some tips and tricks, explain key concepts, and give a list of useful resources.

DAG and its important features

DAG (Directed Acyclic Graph) is the basic entity of Airflow. The official Apache Airflow documentation defines that structure as "a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG structure (tasks and their dependencies) as code".

To simplify, it is a schema of all tasks and the dependencies between them, which can be run by Airflow.
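To make the "directed acyclic graph" idea concrete, here is a minimal sketch in plain Python that does not use Airflow at all. The task names are hypothetical, and the standard-library graphlib module stands in for Airflow's own scheduling logic. It only shows that dependencies define a valid execution order and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}


def execution_order(deps):
    """Return one valid run order in which every task comes after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def is_valid_dag(deps):
    """Return False if the dependencies contain a cycle, True otherwise."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
print(is_valid_dag({"a": {"b"}, "b": {"a"}}))  # a cycle, so not a DAG
```

Here "load" and "report" both depend only on "transform", so either may come first. In Airflow such independent tasks can also run in parallel.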

Here is an example of DAG code in Python:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'surname_name',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    'email': '[email protected]',
    'email_on_failure': False,
}

dag_id = 'dag_example'

with DAG(
    dag_id=dag_id,
    default_args=default_args,
    tags=['project_name', 'surname'],
    schedule_interval='0 12 * * *',  # every day at 12:00
    catchup=True,
    max_active_runs=1,
) as dag:
    # tasks list next
    pass  # placeholder so the block is valid Python

The definitions and detailed explanations of all parameters are given in the documentation. Here I want to highlight some important tips that I discovered while working with Airflow.

Do not change the value of dag_id

If you change this parameter, a new DAG entity will appear in the Airflow UI, although the code file remains the same. I struggled with that a lot. So, I recommend being careful and attentive.

How to schedule the interval of DAG runs

The schedule_interval parameter specifies when the DAG runs, in cron notation. For convenience, you can build and check the expression on this site:

https://crontab.guru/

Airflow also has ready-made presets, like @hourly and @daily.

The value None is also allowed. It is used when the DAG should not be triggered at a specific time, for example when it is only started manually.

More scheduling information can be discovered here
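As a quick illustration of the cron notation, the sketch below splits a schedule into its five named fields. It is plain Python and not part of Airflow: explain_schedule, PRESETS, and FIELD_NAMES are names I made up for this example. The preset-to-cron mapping follows the equivalents listed in the Airflow docs.

```python
# Cron presets and their cron equivalents (as listed in the Airflow docs).
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def explain_schedule(schedule):
    """Split a cron expression (or preset) into its five named fields."""
    if schedule is None:
        return None  # no schedule: the DAG is not triggered by time
    expression = PRESETS.get(schedule, schedule)
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {expression!r}")
    return dict(zip(FIELD_NAMES, fields))


print(explain_schedule("0 12 * * *"))  # every day at 12:00
print(explain_schedule("@daily"))
```

Note that the five fields must be separated by spaces. A string with the asterisks glued together is rejected by this helper, and it is not a valid cron expression for Airflow either.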

Catchup = True + start_date = datetime(year, month, day)

Catchup = True tells Airflow to execute all past DAG runs that would have been scheduled if the DAG had been created and started on the date specified in the start_date parameter.

In this case, the DAG will be triggered for each interval, starting from start_date and ending with the current moment. Let's imagine that you have a DAG with start_date = datetime(2015, 1, 1) and schedule_interval = '@daily'. Can you imagine how many launches will happen? The DAG will be run for (2015, 1, 1), then for (2015, 1, 2), and so on.

By default, catchup = True, so it's vital to pay attention to this parameter.
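To get a feeling for the numbers, here is a rough back-of-the-envelope sketch in plain Python. It assumes a daily schedule and uses this article's publication date as "today"; count_daily_catchup_runs is a hypothetical helper, not an Airflow function. It relies on a simplified model in which a run is created only after its daily interval has fully ended.

```python
from datetime import date


def count_daily_catchup_runs(start_date, today):
    """Count the daily intervals that have fully ended between start_date and today.

    Simplified model: the interval starting on day D is counted
    only if D + 1 day <= today.
    """
    if today <= start_date:
        return 0
    return (today - start_date).days


runs = count_daily_catchup_runs(date(2015, 1, 1), date(2024, 1, 31))
print(runs)
```

Under these assumptions that is more than three thousand backfill runs for a single DAG, which is why it is worth setting catchup explicitly.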

How to beautify your DAG

First of all, a DAG has a doc_md parameter, which allows you to write DAG docs that will appear above the DAG in the UI. It supports markdown and links to external content. A simple and efficient example can be found here

Tags parameter

Tags allow you to find DAGs in the Airflow UI easily. Put your name/surname or project name for easy navigation.

tags=['project_name', 'surname']  # code snippet

In UI, it looks like this.

Information about tags in the Airflow docs

I am not a big fan of writing docs, but spending several minutes could save tons of nerve cells in the future 😅


Tasks and their statuses


A task is one unit of work, a node of the DAG. A task defines what code will be executed and what data will be passed to the following tasks. Typical task code looks like this.

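from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def some_python_function():
    # placeholder: the Python code the task should execute
    print('hello from task2')

# Task 1: Bash operator - executes a bash command
task1 = BashOperator(
    task_id='task1',
    bash_command='echo -e "def \nfoo "',
    dag=dag,
)

# Task 2: Python operator - executes Python code
task2 = PythonOperator(
    task_id='task2',
    # some_python_function - Python callable to execute
    python_callable=some_python_function,
    dag=dag,
)

# Task order looks like this. Read more on task order:
# https://docs.astronomer.io/learn/managing-dependencies
task1 >> task2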

Some of the task types will be reviewed later.

Task statuses

In Airflow, tasks have different statuses. Here is the list of the most common ones:

no_status: the task has not yet been queued for execution (its dependencies are not yet met)

scheduled: the scheduler has determined that the task's dependencies are met and it should run

queued: the task has been assigned to an executor and is waiting for a worker

running: the task is in progress

success: the task finished without errors

failed: the task finished with an error

skipped: the task was skipped

upstream_failed: parent tasks have finished with an error and the task cannot be started

All statuses are listed in the docs

Trigger rule

The trigger rule determines which state the parent tasks must be in for the next task to start. The simplest option is that all previous tasks have completed successfully: all_success is the default value of the parameter.

In Python, that value looks like this.

task1 = BashOperator(
    task_id='task1',
    bash_command='echo -e "def \nfoo "',
    trigger_rule='all_success',
    dag=dag,
)

But there are several more options. Here are the most popular ones:

all_failed - the task will be started if all parent tasks have ended with an error

all_done - the task will be started once all parent tasks have finished (success, error, and skipped tasks are all allowed)

one_success - the task will start as soon as at least one of the parent tasks has completed successfully

More about trigger rules (with clear explanations and pictures) can be found here
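The rules above can be sketched as a small decision function. This is a simplified model in plain Python that I wrote for illustration, not Airflow's actual implementation. should_run is a hypothetical name, and a parent that has not finished yet is represented by None.

```python
def should_run(trigger_rule, parent_states):
    """Simplified model of four trigger rules (not Airflow's real logic).

    parent_states: final states of the direct parent tasks,
    with None for a parent that has not finished yet.
    """
    finished = [state for state in parent_states if state is not None]
    all_finished = len(finished) == len(parent_states)

    if trigger_rule == "all_success":
        return all_finished and all(s == "success" for s in finished)
    if trigger_rule == "all_failed":
        return all_finished and all(s in ("failed", "upstream_failed") for s in finished)
    if trigger_rule == "all_done":
        return all_finished
    if trigger_rule == "one_success":
        # Does not wait for the remaining parents to finish.
        return any(s == "success" for s in finished)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


parents = ["success", "failed"]
for rule in ("all_success", "all_failed", "all_done", "one_success"):
    print(rule, should_run(rule, parents))
```

With one successful and one failed parent, only all_done and one_success let the task start. This is the typical reason to pick all_done for cleanup tasks that must run regardless of the outcome.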


Airflow Operators

We've discussed tasks in Airflow. An operator is, in general, a preset for executing a particular kind of task: for example, a bash command, Python code, a SQL query, and many more.

Here is my classification of operators:

Decorative (EmptyOperator, DummyOperator)

Universal (PythonOperator, BashOperator)

For databases (for example, PostgresOperator, MsSqlOperator)

Specific (for example, SparkSubmitOperator, KubernetesPodOperator)

Decorative

The name "decorative" refers to the Empty and Dummy operators. I used empty operators for testing, for example to check that Airflow works correctly. Dummy operators, according to the docs, are used to mark the beginning and end of a group of tasks. Note that in recent Airflow 2 releases DummyOperator is deprecated in favor of EmptyOperator, which behaves the same way.

The code for both of them is pretty simple.

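from airflow.operators.empty import EmptyOperator
from airflow.operators.dummy import DummyOperator
task_1 = EmptyOperator(task_id="task_1", dag=dag)
task_2 = DummyOperator(task_id='task_2', dag=dag)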

Universal operators

I named them universal because they are widely used and suitable for a variety of tasks. These are BashOperator and PythonOperator. PythonOperator launches Python code, whereas BashOperator launches a Bash script or command. Can you imagine how many things can be done with Python or Bash?

Code examples can be found above in the task entity explanation.

Operators for work with databases

These types of operators are used to interact with different databases. Here is a code example for PostgresOperator (used for executing queries in a PostgreSQL database) from the official docs:

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_pet_table = PostgresOperator(
    task_id="create_pet_table",
    # code for SQL query
    sql="""
        CREATE TABLE IF NOT EXISTS pet (
        pet_id SERIAL PRIMARY KEY,
        name VARCHAR NOT NULL,
        pet_type VARCHAR NOT NULL,
        birth_date DATE NOT NULL,
        OWNER VARCHAR NOT NULL);
    """,
    # database connection id (can be added in the Airflow UI)
    postgres_conn_id="postgres_default",
)

Based on my experience, I don't recommend using this operator for executing "heavy" operations. It could slow the DAG down drastically.

Specific operators

They are used to execute specific tasks. In my work, I had experience with KubernetesPodOperator (docs) (thanks to the DevOps team for answering tons of questions) and SparkSubmitOperator. SparkSubmitOperator is used for launching Spark applications, which is convenient for working with large amounts of data. The best thing about it is that you can pass Spark settings as arguments.

A code example with SparkSubmitOperator is listed below:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='spark_job',
    application='/path/to/your/spark/job.py',
    conn_id='spark_default',  # name of the Spark connection
    executor_memory='4g',
    num_executors=2,
    name='airflow-spark-example',
    verbose=False,
    dag=dag,
)

Sensors

A sensor is a type of operator that checks, at a fixed time interval, whether a criterion is met. If it is, the sensor completes successfully; if not, it checks again until the timeout expires.

Here are several types of sensors:

DateTimeSensor waits for the specified date and time to pass. Useful when you need to execute tasks from the same DAG at different times.

ExternalTaskSensor waits for an Airflow task to be completed. Useful when you need to implement dependencies between DAGs in the same Airflow environment.

PythonSensor waits for the called Python object to return True.

SqlSensor waits for data to appear in a SQL table.

HttpSensor waits for API availability.

At my job, I used a SqlSensor to check whether the data had been delivered. When the newest data is uploaded, the ML model starts scoring.

A simple sensor code example:

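from airflow.sensors.filesystem import FileSensor

waiting_task = FileSensor(
    task_id='waiting_task',
    filepath='/path/to/expected/file',  # placeholder path; FileSensor requires filepath
    poke_interval=120,      # check every 2 minutes
    timeout=60 * 30 * 30,   # give up after 54,000 seconds (15 hours)
    mode='reschedule',
)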

Explanation of the key parameters

poke_interval - how often, in seconds, the sensor checks the condition (by default, 60 seconds)

timeout - the maximum time, in seconds, that the sensor keeps checking before it fails (it is highly desirable to specify it, because the default is 7 days)

mode - has two possible values: poke and reschedule

poke (default) - the task occupies a worker slot for the whole time it is waiting (the task stays in the running status)

reschedule - the task occupies a worker slot only during the check itself and sleeps the rest of the time. Between sensor checks, the task status becomes up_for_reschedule. Recommended when the interval between checks is long
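To show how poke_interval and timeout interact, here is a minimal model of a sensor's check loop in plain Python. It is only a sketch of the idea, not Airflow's code: wait_for and FakeClock are hypothetical names, and the fake clock exists only so that the example finishes instantly instead of really sleeping.

```python
import time


def wait_for(condition, poke_interval=60, timeout=3600,
             clock=time.monotonic, sleep=time.sleep):
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of checks made. Raises TimeoutError if the condition
    is still False after more than `timeout` seconds have passed.
    """
    started = clock()
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met after {checks} checks")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock: sleeping just moves the time forward."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
answers = iter([False, False, True])  # the condition is met on the third check
checks = wait_for(lambda: next(answers), poke_interval=120, timeout=600,
                  clock=clock.time, sleep=clock.sleep)
print(checks, clock.now)
```

This loop corresponds to poke mode, where the process stays busy between checks. In reschedule mode Airflow instead releases the worker slot between checks, which this sketch does not model.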

If you want to learn more...

If you want to understand Airflow better, I highly recommend reading Data Pipelines with Apache Airflow by Bas Harenslak and Julian de Ruiter. The book allows you to dive deeply into Airflow concepts. There are still many interesting things to learn; this article just covered the basic entities.

I hope learning Apache Airflow has become much easier with this article. An understanding of the concepts covered here is enough to start writing DAGs or understanding how DAGs work.


Written by alexandraoberemok | Data Scientist with Data Engineering background
Published by HackerNoon on 2024/01/31