Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It was originally developed by the engineering team at Airbnb and later donated to the Apache Software Foundation, where it is licensed under the Apache License 2.0.
Airflow is commonly used in data engineering and data science pipelines to automate the execution of tasks, such as data transformation, loading and analysis. It is also used in other industries, such as finance, healthcare and e-commerce, to automate business processes.
Airflow is very flexible with regard to what it can connect to. This includes data lakes, data warehouses, databases, APIs and, of course, object stores. It excels in those use cases that benefit from data-pipelines-as-code, such as:
Automation driven data flows
Machine learning training models and retraining
Backups and snapshots
Airflow is written in Python and uses a directed acyclic graph (DAG) to represent a workflow. Each node in the DAG represents a task, and the edges between nodes represent dependencies between tasks. A DAG does not care what the tasks themselves do, only the order in which they run, the number of retries, and so on. A complex DAG can become brittle and difficult to troubleshoot, particularly when there are dozens of tasks that must be managed by the architect.
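The scheduler's view of a DAG really is nothing more than this node-and-edge structure. As a plain-Python sketch of the idea (no Airflow required), the standard-library graphlib module can compute a valid execution order from the dependency edges alone:

```python
from graphlib import TopologicalSorter

# Edges: each task maps to the set of tasks it depends on.
# The scheduler only needs this structure, not what the tasks do.
# Task names here are illustrative.
deps = {
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

order = list(TopologicalSorter(deps).static_order())
print(order)  # extract runs first, notify last
```

Airflow's scheduler does essentially this, plus retries, scheduling, and state tracking per task.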
Airflow provides a web interface to manage and monitor the workflows, as well as an API to create, update, and delete workflows. It also has a rich set of features, including support for scheduling, alerting, testing, and version control.
MinIO is the perfect companion for Airflow because of its industry-leading performance and scalability, which put every data-intensive workload within reach. By storing petabytes of data in MinIO buckets, you can build Airflow pipelines that process vast amounts of data quickly, which is essential if DAGs are to run as fast as possible. Once processing is done, you can even store the end result back in a MinIO bucket for other tools to consume. MinIO is capable of tremendous performance - a recent benchmark achieved 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.
In addition to this, Apache Airflow can also store its logs in a MinIO bucket. This can be helpful in cloud or container orchestration environments, where the local filesystem is ephemeral and the logs can be lost if the machine is terminated or the container is stopped.
To minimize the risk of data loss in these environments, it is recommended to use a more durable, cloud-native storage solution like MinIO to store petabytes of data and logs. This ensures the data is persisted even if the machine or container is terminated. Further, the data and logs can be accessed from any location, networking and security permitting.
These are just some of the reasons to use MinIO with Apache Airflow.
In this tutorial we’ll show you multiple use cases of Airflow with MinIO.
Install Airflow using pip. You'll need to install pip if it's not already present, and if the python command is unavailable you'll also need to symlink it:
apt-get install python3-pip
ln -s /usr/bin/python3 /usr/bin/python
AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
Install the Amazon provider for connecting to MinIO
pip install apache-airflow-providers-amazon
Start airflow in standalone mode
airflow standalone
Create a bucket in MinIO using mc mb (make bucket)
mc alias set minio http://<IP>:9000 minioadmin minioadmin
mc mb minio/airflow-logs
Open /root/airflow/airflow.cfg and add the following settings under [logging]
[logging]
remote_logging = True
remote_base_log_folder = s3://airflow-logs
remote_log_conn_id = my_s3_conn
encrypt_s3_logs = False
For remote_base_log_folder, use the name of the bucket you created in MinIO in the previous step. The remote_log_conn_id should match the name of the connection ID we'll create in the next step.
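If you prefer not to edit airflow.cfg, the same settings can be supplied as environment variables: Airflow maps variables of the form AIRFLOW__{SECTION}__{KEY} onto config entries, so the [logging] block above becomes:

```shell
# Equivalent to the [logging] settings in airflow.cfg;
# set these in the environment that runs the scheduler and workers
export AIRFLOW__LOGGING__REMOTE_LOGGING=True
export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://airflow-logs
export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=my_s3_conn
export AIRFLOW__LOGGING__ENCRYPT_S3_LOGS=False
```

This is particularly convenient in container environments, where mounting an edited airflow.cfg is more work than passing environment variables.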
Within the Airflow UI, go to Admin -> Connections. Create a new connection with the name my_s3_conn. Enter minioadmin for both the Access Key and the Secret Key.
In the Extra field, point the connection at our local MinIO deployment with the following syntax
{ "endpoint_url": "http://<ip>:9000" }
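The same connection can also be created from the command line with airflow connections add, which is handy for scripted or containerized deployments (replace <ip> with your MinIO host, as above):

```shell
airflow connections add my_s3_conn \
    --conn-type aws \
    --conn-login minioadmin \
    --conn-password minioadmin \
    --conn-extra '{"endpoint_url": "http://<ip>:9000"}'
```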
Now, to test and confirm this is working, go to DAGs -> example_sensor_decorator and enable this DAG. Then trigger the DAG using the "Play" button on the right-hand side.
After a few seconds, once the DAG is finished running, run the following command to see the logs. For each DAG run, a separate log folder is created.
mc ls minio/airflow-logs
Using a MinIO bucket to store logs for Airflow DAG runs is just one of the use cases we’re exploring. In the next phase we’ll create a custom DAG to demonstrate more use cases.
In this example we’re going to create a custom DAG. What will this DAG do?
We’ll connect to the Ghost Blog API
Fetch blogs based on certain parameters
Back them up to a bucket in MinIO
Let’s set up the framework for the DAG.
Import the required Python libraries for the DAG framework; pendulum is needed for the start_date below
import pendulum

from airflow.decorators import dag, task
Create a schedule for how often to run the DAG
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
Next let's create a Task to pull the blogs from the Ghost API and put them in a MinIO bucket.
Let’s import a couple of python packages to connect to the Ghost API and MinIO bucket
import json
import requests
from minio import Minio
import urllib3
Using the requests module, let's connect to the Ghost API and fetch some blogs
api_token = "<token>"
page = 1
total_pages = 1

while page <= total_pages:
    api_url = "https://minio.ghost.io/ghost/api/content/posts/?limit=1&page=%s&key=%s" % (page, api_token)
    response_json = requests.get(api_url).json()
    print(response_json["meta"])
    print(response_json["posts"][0]["url"])
    total_pages = response_json["meta"]["pagination"]["pages"]
    page = page + 1
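The pagination pattern above can be factored into a small, testable helper. In this sketch, fetch_page is a stand-in for the requests.get(...).json() call, so the loop logic can be exercised without the network; the helper name and the stubbed responses are illustrative, not part of the original DAG:

```python
def iter_posts(fetch_page):
    """Yield posts page by page until the API reports no more pages.

    fetch_page(page) must return the decoded JSON for that page,
    mirroring the Ghost Content API response shape used above.
    """
    page, total_pages = 1, 1
    while page <= total_pages:
        data = fetch_page(page)
        total_pages = data["meta"]["pagination"]["pages"]
        yield from data["posts"]
        page += 1

# Stubbed two-page response matching the Ghost API shape
fake_api = {
    1: {"meta": {"pagination": {"pages": 2}}, "posts": [{"url": "a"}]},
    2: {"meta": {"pagination": {"pages": 2}}, "posts": [{"url": "b"}]},
}
urls = [p["url"] for p in iter_posts(lambda page: fake_api[page])]
print(urls)  # -> ['a', 'b']
```

In the real DAG, fetch_page would be `lambda page: requests.get(api_url % (page, api_token)).json()`.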
Putting the blogs in a MinIO bucket
config = {
"dest_bucket": "processed", # This will be auto created
"minio_endpoint": "<ip>:9000", # host:port only; the Minio client takes no URL scheme
"minio_username": "minioadmin",
"minio_password": "minioadmin",
}
# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()
# Initialize MinIO client
minio_client = Minio(
    config["minio_endpoint"],
    secure=True,
    access_key=config["minio_username"],
    secret_key=config["minio_password"],
    http_client=http_client,
)
# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
    minio_client.make_bucket(config["dest_bucket"])
    print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))
# bucket_name and object_path identify the source object to process;
# supply them from your own source data
minio_client.fget_object(bucket_name, object_path, object_path)
print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
minio_client.fput_object(config["dest_bucket"], object_path, object_path)
print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))
Please note the above code is not meant to work “out of the box” but rather give you the idea and show you the path to create your own DAG using your preferred input source; the destination will always be MinIO.
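One refinement worth sketching: instead of round-tripping each post through the local filesystem with fget_object/fput_object, the fetched JSON can be uploaded straight from memory with put_object. The serialize_post helper and the posts/<slug>.json naming below are illustrative assumptions, not part of the original DAG:

```python
import json

def serialize_post(post: dict) -> tuple[str, bytes]:
    """Derive an object name and a JSON payload for a fetched blog post."""
    return "posts/%s.json" % post["slug"], json.dumps(post).encode("utf-8")

# Hypothetical usage with the minio_client from the DAG above:
#
#   import io
#   name, payload = serialize_post(response_json["posts"][0])
#   minio_client.put_object(config["dest_bucket"], name, io.BytesIO(payload),
#                           length=len(payload), content_type="application/json")
```

Keeping the payload in memory avoids leftover temp files on ephemeral workers, which fits the cloud-native theme of this tutorial.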
The possibilities are endless when you have cloud-native high-performance storage such as MinIO integrated with cloud-native tools such as Airflow. In this example, we’ve shown you some of the basics, such as saving DAG logs in a MinIO bucket and also writing a custom DAG that can talk to any API and perform operations on it, in this case to back up an entire blog to a MinIO bucket.
But this is just the beginning: when you go cloud-native, you tap into myriad integrated frameworks. With Airflow, you can create any number of multi-cloud pipelines. For instance, you can ETL thousands of terabytes of unstructured data into structured data placed in MinIO that other processes can then read and analyze. You could even use such a pipeline as an image resizer (similar to Orchestrate Complex Workflows Using Apache Kafka and MinIO) by taking images of various sizes, resizing them to the size required by your business, and then putting them into another MinIO bucket where they are available for web serving.
Give Airflow and MinIO a try for yourself. If you have any questions, be sure to reach out to us on Slack!