In this post, we will build on our work in Part II: Foundational ETL Code and Part III: Getting Started with Airflow by converting our ETL pipeline into a Directed Acyclic Graph (DAG), which comprises the tasks and dependencies for the pipeline on Airflow.

The Series

This is the fourth post in my series, Towards Open Options Chains: A Data Pipeline for Collecting Options Data at Scale:

1. Database Design
2. Foundational ETL Code
3. Getting Started with Airflow
4. Building the DAG
5. Containerising the Pipeline

About Directed Acyclic Graphs (DAGs)

Airflow defines DAGs as a “core concept of Airflow, collecting Tasks together, organised with dependencies and relationships to say how they should run” [1]. DAGs are essentially Python scripts that contain the code for each step in the data pipeline. They contain these standard code blocks:

- Library imports
- DAG arguments
- DAG definition
- Task definitions
- Task pipeline

Our Data Pipeline

Before we dive into the code blocks, it’s important that we plan ahead. What’s our DAG going to look like?

First, we already have our template: the ETL job we’ve written so far! That is, (1) query the TD Ameritrade (TDA) API, (2) process the data into the required format, and (3) load it into Postgres. We separate the ETL steps into different tasks in the DAG, because dumping all our code into one function may be problematic. Suppose that step 3 fails. By then, we’ve already retrieved data from the API and processed it, but because all three steps are treated as one task, Airflow would re-run the entire thing. That’s a waste of time and resources. With separate tasks, Airflow retries only the step that failed.

Second, we build in the flexibility to collect data on new tickers. Consider this scenario: our DAG is currently configured to collect data on FB, and we now want to switch over to gold miners (GDX). Our scripts have the ticker hardcoded in the extract and load steps, and we don’t have a GDX table yet. To resolve this, we use a TICKER variable in all our steps, and add a new task at the very beginning of the pipeline to create a table for the ticker if it does not already exist.
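The plan above is already a small DAG: four tasks in a chain, with table creation first. As an illustration only (plain Python using the standard-library graphlib module from Python 3.9+, not Airflow’s API), writing each task’s upstream dependencies as a graph lets us compute an order in which the tasks can run, and shows why the graph must be acyclic. The task names mirror the task IDs used later in this post.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
pipeline = {
    'create_table': set(),
    'extract_options_data_from_tda': {'create_table'},
    'transform_options_data': {'extract_options_data_from_tda'},
    'load_data_into_postgres': {'transform_options_data'},
}

# A topological order is a valid execution order for the tasks.
order = list(TopologicalSorter(pipeline).static_order())
print(order)

# Making create_table depend on the load step introduces a cycle,
# so no valid execution order exists any more.
cyclic = {**pipeline, 'create_table': {'load_data_into_postgres'}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print('cycle detected:', err.args[1])
```

In Airflow itself, the same chain is declared with the >> operator, and a DAG file containing a cycle fails to load.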
When we change the TICKER we want to collect data on, our DAG will run without throwing any errors caused by the relation (table) not existing in Postgres.

Therefore, the pipeline we intend to build is: create table → extract → transform → load.

With the concept for the DAG defined, let’s run through the code blocks.

Library Imports

In this first block, we import the libraries we need. The first two imports pertain to time: datetime lets us specify dates and durations, and pendulum lets us define timezones, which are essential for scheduling the workflows at the correct time.

```python
# Imports
import pendulum
from datetime import datetime, timedelta
from airflow.models import DAG, Variable
# airflow.operators.python replaces the deprecated airflow.operators.python_operator module
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
```

The Airflow imports are:

- DAG: For defining a no-code AI chatbot that automatically downloads options data, asks how much money you want to make, trades on your brokerage to make that amount for you, and keeps a 5% commission so it can expand and take over the world. Yeah, no. It defines a DAG.
- Variable: If you recall from Part III: Getting Started with Airflow, we created “environment variables” in Airflow. This class lets us access them.
- PythonOperator: Operators are the basic building blocks of Airflow DAGs. Each one contains the logic for a single task. The PythonOperator runs Python code. There are many others, like the BashOperator for running bash scripts, the S3FileTransformOperator for working with AWS S3, and even a PostgresOperator for interacting with a PostgreSQL database.
- PostgresHook: As explained in Part III, Hooks simplify the code needed to interact with other services. Under the hood, Operators use Hooks for interactivity.

You may have noticed that most of the imports are for Airflow. But where are the others? Where’s numpy? Where’s pandas?
Airflow’s best practices guide [2] warns that top-level code in a DAG file, including heavy imports, runs every time the scheduler parses the file, which adds a lot of overhead. Hence, it’s better to import such libraries inside the Python callables: the functions defined in the script and called by the tasks that need them.

DAG Arguments

Here, we define the arguments we need to instantiate the DAG. I’ve also thrown in the variables required by the Python callables in the pipeline.

First, we create the default_args dictionary, which we will pass to the DAG definition. There are many more settings, but these are the ones I wanted to set explicitly.

Pay close attention to the start_date. We use datetime to set the date, and explicitly specify a timezone with assistance from pendulum. This is crucial for us because the market operates on US Eastern time, i.e. where Wall Street is. This means Eastern Standard Time (EST, UTC-05:00) in autumn/winter and Eastern Daylight Time (EDT, UTC-04:00) in spring/summer. By specifying America/New_York in our start_date, we make this DAG timezone-aware. There are also benefits downstream when we specify the DAG’s schedule interval.

```python
# Set arguments
us_east_tz = pendulum.timezone('America/New_York')
default_args = {
    'owner': 'chrischow',
    'start_date': datetime(2022, 1, 7, 7, 30, tzinfo=us_east_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

# Set variables
TICKER = 'FB'

# Get variables
API_KEY = Variable.get('API_KEY')
DB_PASSWORD = Variable.get('DB_PASSWORD')
```

Next, we set the ticker that we want to collect data on. We will set only one ticker for now, and change this later when we convert the DAG into a dynamic one.

Finally, we retrieve the variables that we set via the Airflow UI.

DAG Definition

Next, we define the DAG. As you can see, we pass in the default_args from before, and add a description and tags.

We also specify the schedule_interval using a cron expression (use Crontab Guru to experiment).
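To see which wall-clock times the expression '*/30 8-21 * * 1-5' selects, and why the timezone matters, here is a minimal standard-library sketch. It hand-expands only the three fields this expression uses; it is not a general cron parser and not how Airflow evaluates schedules. It relies on zoneinfo (Python 3.9+, which needs the system timezone database), and the helper name fire_times and the example dates are illustrative.

```python
from datetime import date, datetime, time
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo('America/New_York')
SINGAPORE = ZoneInfo('Asia/Singapore')


def fire_times(day: date, tz: ZoneInfo = NEW_YORK) -> list[datetime]:
    """Times that '*/30 8-21 * * 1-5' selects on `day`, in timezone `tz`."""
    # Day-of-week field 1-5 is Monday to Friday; isoweekday() uses 1=Mon ... 7=Sun.
    if not 1 <= day.isoweekday() <= 5:
        return []
    return [
        datetime.combine(day, time(hour, minute), tzinfo=tz)
        for hour in range(8, 22)         # hour field 8-21 is inclusive
        for minute in range(0, 60, 30)   # minute field */30 gives :00 and :30
    ]


winter = fire_times(date(2022, 1, 7))  # a Friday in January (EST, UTC-05:00)
summer = fire_times(date(2022, 7, 8))  # a Friday in July (EDT, UTC-04:00)

print(len(winter), winter[0].time(), winter[-1].time())  # 28 08:00:00 21:30:00
print(winter[0].astimezone(SINGAPORE).time())  # 21:00:00
print(summer[0].astimezone(SINGAPORE).time())  # 20:00:00
```

The New York schedule stays fixed at 08:00 to 21:30 all year, while its Singapore equivalent shifts by an hour when US daylight saving time starts or ends. Also note that, by default, Airflow starts each run once its schedule interval has ended, so each run actually begins at the next time in this list (the 21:30 run waits for the next weekday’s 08:00 slot).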
What the expression means is:

- Run at every 30th minute past the hour (i.e. at :00 and :30), …
- during the hours from 8am through 9pm (so the last run of the day is at 9:30pm), …
- on every weekday from Monday to Friday.

Note that we specify 8am to 9pm in New York time. We can get away with this because we set the correct timezone earlier on. This is a big gotcha. If we hadn’t set the timezone and had used Singapore time instead, we would need two DAGs: one to run from 9pm to midnight (the morning in New York) and one to run from midnight to 10:30am (the rest of the New York day). And that is only for autumn/winter: once spring arrives, every time shifts an hour earlier, and we would have to change them manually again.

```python
dag = DAG(
    dag_id=f'get_options_data_{TICKER}',
    default_args=default_args,
    description=f'ETL for {TICKER} options data',
    schedule_interval='*/30 8-21 * * 1-5',
    catchup=False,
    tags=['finance', 'options', TICKER]
)
```

The last setting is catchup. This matters when the start date (from default_args) is in the past but we trigger the DAG now. With catchup enabled, Airflow triggers the DAG for every interval that has not been run since the last execution date. For example, if the start date was 1 Jan 2021, the last execution date was also 1 Jan 2021, and we trigger the DAG one year later on 1 Jan 2022, the Airflow scheduler would create and execute DAG runs to make up for the whole damn year of 2021. For safety, we turn catchup off. We don’t need it anyway.

Task Definitions

In Part II, we wrote most of the code we need to define all the tasks in our data pipeline. But we still have to figure out how to transfer data between tasks. Within a single Python session, one function’s output can be passed directly to the next. In an Airflow DAG, each task runs separately, so tasks do not share variables in memory, and any temporary files are created and handled inside each Python callable. Therefore, we need to explicitly save data in a staging area, or find another way to pass data around.

Fortunately, Airflow has XComs, or cross-communications, that enable Tasks to communicate with one another [3].
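To make the push/pull mechanics concrete, here is a toy, in-memory stand-in for a task instance. FakeTaskInstance and its shared dictionary are hypothetical, not Airflow classes: real XComs live in Airflow’s metadata database and are also scoped to a DAG run, and JSON is used here only as a stand-in for Airflow’s own serialisation. The method names and arguments mirror the xcom_push and xcom_pull calls used in the tasks that follow.

```python
import json


class FakeTaskInstance:
    """Hypothetical, in-memory stand-in for an Airflow task instance."""

    def __init__(self, dag_id, task_id, store):
        self.dag_id = dag_id
        self.task_id = task_id
        self._store = store  # shared dict playing the role of the metadata DB

    def xcom_push(self, key, value):
        # Serialise on push; json.dumps raises TypeError for values it
        # cannot encode (e.g. a set).
        self._store[(self.dag_id, self.task_id, key)] = json.dumps(value)

    def xcom_pull(self, key, task_ids):
        # One task ID (a str) gives one value; a list of task IDs gives a
        # list of values, one per task, in the same order.
        if isinstance(task_ids, str):
            return json.loads(self._store[(self.dag_id, task_ids, key)])
        return [json.loads(self._store[(self.dag_id, t, key)]) for t in task_ids]


store = {}
dag_id = 'get_options_data_FB'
extract_ti = FakeTaskInstance(dag_id, 'extract_options_data_from_tda', store)
transform_ti = FakeTaskInstance(dag_id, 'transform_options_data', store)

# The extract task pushes; the transform task pulls by key and task ID.
extract_ti.xcom_push(key='raw_data', value={'symbol': 'FB', 'putExpDateMap': {}})
data = transform_ti.xcom_pull(key='raw_data', task_ids=['extract_options_data_from_tda'])[0]
print(data['symbol'])  # FB
```

Passing task_ids as a list returns a sequence of values, one per task, which is why the pipeline code indexes the pulled result with [0].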
XComs consist of a key that identifies them, the IDs of the task and DAG they came from, and a serialisable value. In a way, XComs abstract away the staging process for you. With Postgres as Airflow’s metadata database, the maximum size of an XCom is 1GB (another reason why I chose it). In contrast, the limit for MySQL is a measly 64KB. SQLite would have given us 2GB, but as a database engine, it lacks the robust features that Postgres has.

In our task definitions, we need to add code that uses XComs to transfer data (1) from the extract to the transform step, and (2) from the transform to the load step.

You might be wondering why we didn’t simply use a temporary directory as a staging area. The reason is that if this solution is upgraded into a distributed one, we can’t be confident that all tasks in the same DAG run will be executed on the same machine. If one task runs on machine A, and a task that depends on it runs on machine B, the second task will not have access to the temporary folder created on machine A. Besides, if you’re running this solution locally, it won’t hurt to use XComs.

Task 0: Create Table

The first task is to create a table for a ticker, if the table does not already exist. Only two steps are required: (1) connect to Postgres via the PostgresHook, and (2) create the table using the same SQL statement as in Part II.
```python
# Function to create table
def create_table(ticker):
    # Define Postgres hook
    pg_hook = PostgresHook(postgres_conn_id='postgres_optionsdata')

    # Create table if it doesn't exist
    pg_hook.run(f"""
        CREATE TABLE IF NOT EXISTS {ticker} (
            put_call VARCHAR(5) NOT NULL,
            symbol VARCHAR(32) NOT NULL,
            description VARCHAR(64) NOT NULL,
            bid DOUBLE PRECISION,
            ask DOUBLE PRECISION,
            last DOUBLE PRECISION,
            bid_size INTEGER,
            ask_size INTEGER,
            last_size INTEGER,
            high_price DOUBLE PRECISION,
            low_price DOUBLE PRECISION,
            open_price DOUBLE PRECISION,
            close_price DOUBLE PRECISION,
            total_volume INTEGER,
            quote_time BIGINT,
            volatility DOUBLE PRECISION,
            delta DOUBLE PRECISION,
            gamma DOUBLE PRECISION,
            theta DOUBLE PRECISION,
            vega DOUBLE PRECISION,
            rho DOUBLE PRECISION,
            open_interest INTEGER,
            time_value DOUBLE PRECISION,
            theoretical_value DOUBLE PRECISION,
            strike_price DOUBLE PRECISION,
            expiration_date BIGINT,
            dte INTEGER,
            PRIMARY KEY (symbol, quote_time)
        )
    """)
```

Once again, the benefit of dynamically creating tables is that when we input a new ticker that we have not collected data on before, the pipeline simply creates a new table for that ticker before running the ETL steps. You could, of course, drop this step from the pipeline later on if you want strict control over the tickers being monitored.

Task 1: Extract

This is where XComs come into play. To use XComs, we add the argument ti (or task_instance) to the callable. Airflow passes the task instance object into the callable through this argument, giving us access to XComs. As you can see in the final line of the code, we use the ti object to push the raw data as an XCom, in Python dictionary format (because it’s serialisable).
```python
# Function to get data from TDA API
def extract_options_data_from_tda(ticker, ti):
    # Import modules
    import requests

    # Configure dates: now (in US Eastern time) and 45 days out
    start_date = datetime.now(tz=us_east_tz)
    end_date = start_date + timedelta(days=45)

    # Configure request
    headers = {'Authorization': ''}
    params = (
        ('apikey', API_KEY),
        ('symbol', ticker),
        ('contractType', 'PUT'),
        ('strikeCount', '50'),
        ('range', 'ALL'),
        ('fromDate', start_date.strftime('%Y-%m-%d')),
        ('toDate', end_date.strftime('%Y-%m-%d')),
    )

    # Get data; an HTTP error fails the task so that Airflow retries it
    response = requests.get(
        'https://api.tdameritrade.com/v1/marketdata/chains',
        headers=headers,
        params=params
    )
    response.raise_for_status()
    data = response.json()

    # Push XCOM
    ti.xcom_push(key='raw_data', value=data)
```

Task 2: Transform

The code below should look familiar. This function takes only one argument: ti. Near the top of the function, we use it to pull the data that the previous task stored as an XCom.

```python
# Function to transform data
def transform_options_data(ti):
    # Import modules
    import pandas as pd

    # Pull XCOM
    data = ti.xcom_pull(key='raw_data', task_ids=['extract_options_data_from_tda'])[0]

    # Define columns
    columns = ['putCall', 'symbol', 'description', 'exchangeName', 'bid', 'ask',
               'last', 'mark', 'bidSize', 'askSize', 'bidAskSize', 'lastSize',
               'highPrice', 'lowPrice', 'openPrice', 'closePrice', 'totalVolume',
               'tradeDate', 'tradeTimeInLong', 'quoteTimeInLong', 'netChange',
               'volatility', 'delta', 'gamma', 'theta', 'vega', 'rho',
               'openInterest', 'timeValue', 'theoreticalOptionValue',
               'theoreticalVolatility', 'optionDeliverablesList', 'strikePrice',
               'expirationDate', 'daysToExpiration', 'expirationType',
               'lastTradingDay', 'multiplier', 'settlementType', 'deliverableNote',
               'isIndexOption', 'percentChange', 'markChange', 'markPercentChange',
               'mini', 'inTheMoney', 'nonStandard']

    # Extract puts data
    puts = []
    dates = list(data['putExpDateMap'].keys())
    for date in dates:
        strikes = data['putExpDateMap'][date]
        for strike in strikes:
            puts += data['putExpDateMap'][date][strike]

    # Convert to dataframe
    puts = pd.DataFrame(puts, columns=columns)

    # Select columns (copy so later column assignments modify a real DataFrame)
    puts = puts[['putCall', 'symbol', 'description', 'bid', 'ask', 'last',
                 'bidSize', 'askSize', 'lastSize', 'highPrice', 'lowPrice',
                 'openPrice', 'closePrice', 'totalVolume', 'quoteTimeInLong',
                 'volatility', 'delta', 'gamma', 'theta', 'vega', 'rho',
                 'openInterest', 'timeValue', 'theoreticalOptionValue',
                 'strikePrice', 'expirationDate', 'daysToExpiration']].copy()

    # Convert floats
    def conv_num(x):
        return pd.to_numeric(x.astype(str).str.replace('NaN|nan', '', regex=True))

    for col in ['bid', 'ask', 'last', 'highPrice', 'lowPrice', 'openPrice',
                'closePrice', 'volatility', 'delta', 'gamma', 'theta', 'vega',
                'rho', 'timeValue', 'theoreticalOptionValue', 'strikePrice']:
        puts[col] = conv_num(puts[col])

    # Specifically for puts delta: make it positive
    puts['delta'] = -puts['delta']

    # Convert strings
    def conv_str(x):
        return x.astype(str)

    for col in ['putCall', 'symbol', 'description']:
        puts[col] = conv_str(puts[col])

    # Convert integers
    def conv_int(x):
        return x.astype(int)

    for col in ['bidSize', 'askSize', 'lastSize', 'totalVolume', 'quoteTimeInLong',
                'openInterest', 'expirationDate', 'daysToExpiration']:
        puts[col] = conv_int(puts[col])

    # Fill missing values
    puts = puts.fillna(-99)

    # Rename columns
    puts = puts.rename(columns={
        'putCall': 'put_call',
        'bidSize': 'bid_size',
        'askSize': 'ask_size',
        'lastSize': 'last_size',
        'highPrice': 'high_price',
        'lowPrice': 'low_price',
        'openPrice': 'open_price',
        'closePrice': 'close_price',
        'totalVolume': 'total_volume',
        'quoteTimeInLong': 'quote_time',
        'openInterest': 'open_interest',
        'timeValue': 'time_value',
        'theoreticalOptionValue': 'theoretical_value',
        'strikePrice': 'strike_price',
        'expirationDate': 'expiration_date',
        'daysToExpiration': 'dte',
    })

    # Push XCOM
    ti.xcom_push(key='transformed_data', value=puts.to_dict('records'))
```

After processing the data, we do the same thing as we did in Task 1: we push the data as an XCom.
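The nested loop that builds puts in transform_options_data is the least obvious part of the transform. The response maps each expiry date to a dictionary of strikes, and each strike to a list of contracts. Below is the same flattening as a standalone function, run on a tiny made-up payload: flatten_exp_date_map is a hypothetical helper, and the sample keeps only three of the fields named in columns, with invented keys and values.

```python
def flatten_exp_date_map(exp_date_map):
    """Flatten {expiry: {strike: [contract, ...]}} into a flat list of contracts."""
    return [
        contract
        for strikes in exp_date_map.values()  # one dict of strikes per expiry date
        for contracts in strikes.values()     # one list of contracts per strike
        for contract in contracts
    ]


# Made-up, heavily trimmed payload in the shape the transform step expects.
sample = {
    'putExpDateMap': {
        '2022-01-21:14': {
            '300.0': [{'symbol': 'FB_012122P300', 'strikePrice': 300.0, 'delta': -0.45}],
            '305.0': [{'symbol': 'FB_012122P305', 'strikePrice': 305.0, 'delta': -0.55}],
        },
        '2022-01-28:21': {
            '300.0': [{'symbol': 'FB_012822P300', 'strikePrice': 300.0, 'delta': -0.42}],
        },
    }
}

puts = flatten_exp_date_map(sample['putExpDateMap'])
print(len(puts))                    # 3
print([p['symbol'] for p in puts])  # ['FB_012122P300', 'FB_012122P305', 'FB_012822P300']
```

Pulling the loop out into a function like this also makes it easy to test without Airflow or a live API.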
Note that XCom values must be serialisable. Hence, we convert the dataframe into a list of Python dictionaries (one per row) before pushing.

Task 3: Load

Finally, we use the function below to load the data into Postgres. The steps are a combination of what we’ve seen before. We (1) create a PostgresHook to connect to Postgres, (2) pull the XCom from the previous task, (3) convert it back into a dataframe, (4) prepare the SQL INSERT query, and (5) run the query. Note that the INSERT query has been updated to use the specified ticker. To run it, we pass all rows to psycopg2’s execute_values in a single call, which fills in the VALUES %s placeholder with properly quoted values, instead of formatting each row into the SQL string and opening a new connection for every row.

```python
# Function to load data
def load_data_into_postgres(ticker, ti):
    # Import modules
    import pandas as pd
    from psycopg2.extras import execute_values

    # Define Postgres hook
    pg_hook = PostgresHook(postgres_conn_id='postgres_optionsdata')

    # Pull XCOM
    puts = ti.xcom_pull(key='transformed_data', task_ids=['transform_options_data'])[0]
    puts = pd.DataFrame(puts)

    # Prepare insert query; execute_values expands the single %s into all rows
    col_str = ', '.join(puts.columns.tolist())
    query_insert = f"INSERT INTO {ticker} ({col_str}) VALUES %s ON CONFLICT DO NOTHING"

    # Convert to rows
    rows = list(puts.itertuples(index=False, name=None))

    # Insert all rows over one connection, letting psycopg2 quote the values
    conn = pg_hook.get_conn()
    with conn.cursor() as cur:
        execute_values(cur, query_insert, rows)
    conn.commit()
    conn.close()
```

Operators

Until now, we’ve only defined the Python callables that contain the logic for each step in the pipeline. We still need to call them. We do so using Operators within the DAG context:

```python
with dag:
    # Define operators
    task0_create_table = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
        op_kwargs={'ticker': TICKER}
    )

    task1_extract = PythonOperator(
        task_id='extract_options_data_from_tda',
        python_callable=extract_options_data_from_tda,
        op_kwargs={'ticker': TICKER}
    )

    task2_transform = PythonOperator(
        task_id='transform_options_data',
        python_callable=transform_options_data
    )

    task3_load = PythonOperator(
        task_id='load_data_into_postgres',
        python_callable=load_data_into_postgres,
        op_kwargs={'ticker': TICKER}
    )
```

Task Pipeline

The task pipeline indicates the dependencies between the tasks.
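Airflow expresses dependencies with the >> operator. Chaining works because Python evaluates a >> b >> c as (a >> b) >> c, and Airflow’s operators return the right-hand task from >>. The toy Task class below is a hypothetical stand-in that reproduces only this behaviour; real operators also accept lists of tasks and provide << for the reverse direction.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # `self >> other` records an edge, then returns `other`, so that
        # `a >> b >> c` continues as `b >> c` once `a >> b` is evaluated.
        self.downstream.append(other)
        return other


task_ids = ['create_table', 'extract_options_data_from_tda',
            'transform_options_data', 'load_data_into_postgres']
t0, t1, t2, t3 = (Task(task_id) for task_id in task_ids)

t0 >> t1 >> t2 >> t3

edges = [(t.task_id, d.task_id) for t in (t0, t1, t2, t3) for d in t.downstream]
for upstream, downstream in edges:
    print(f'{upstream} >> {downstream}')
```

The recorded edges are exactly the dependencies of our linear pipeline, from table creation through to the load step.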
These dependencies should also be set inside the DAG context, i.e. in the same with dag: block as the Operators. Our pipeline is extremely simple because it’s linear:

```python
with dag:
    # ... Operators defined as above ...

    # Set up dependencies
    task0_create_table >> task1_extract >> task2_transform >> task3_load
```

Dynamic DAGs

Recall that at the start of this post, we defined TICKER='FB'. We then used it in our DAG definition and Operators. The problem with the code as written is that we would need one practically identical script per ticker we want to monitor. The only difference would be the value of the TICKER variable.

To avoid repeating ourselves, we will refactor the code to create dynamic DAGs: pipelines generated from the same code that adapt to the inputs we give them. This code should replace the relevant sections presented earlier.

First, we specify a list of tickers that we want to monitor (TICKERS). Then, we wrap the DAG definition and operators in a function that takes the ticker and default_args as arguments. When this function is called, it will dynamically:

- Create a new DAG with an ID that represents the ticker
- Provide a description that says which ticker is being monitored
- Add the ticker as a tag for ease of filtering in the Airflow UI
- Task 0: If run for the first time, create a new Postgres table named after that ticker
- Task 1: Query the TDA API for that ticker
- Task 2: Process the raw data into the required format
- Task 3: Load the processed data into its dedicated table in Postgres

```python
# List of tickers
TICKERS = ['FB', 'GOOG']

# Function to create DAG
def create_dag(ticker, default_args):
    dag = DAG(
        dag_id=f'get_options_data_{ticker}',
        default_args=default_args,
        description=f'ETL for {ticker} options data',
        schedule_interval='*/30 8-21 * * 1-5',
        catchup=False,
        tags=['finance', 'options', ticker]
    )

    with dag:
        # Define operators
        task0_create_table = PythonOperator(
            task_id='create_table',
            python_callable=create_table,
            op_kwargs={'ticker': ticker}
        )

        task1_extract = PythonOperator(
            task_id='extract_options_data_from_tda',
            python_callable=extract_options_data_from_tda,
            op_kwargs={'ticker': ticker}
        )

        task2_transform = PythonOperator(
            task_id='transform_options_data',
            python_callable=transform_options_data
        )

        task3_load = PythonOperator(
            task_id='load_data_into_postgres',
            python_callable=load_data_into_postgres,
            op_kwargs={'ticker': ticker}
        )

        # Set up dependencies
        task0_create_table >> task1_extract >> task2_transform >> task3_load

    return dag

# Create DAGs
for ticker in TICKERS:
    globals()[f'get_options_data_{ticker}'] = create_dag(ticker, default_args)
```

Thereafter, we loop through the list of tickers and register each dynamic DAG in the dictionary of global variables through globals(), so that Airflow finds every DAG object when it parses the file. In the Airflow UI, we should then see one DAG for each ticker (get_options_data_FB and get_options_data_GOOG).

Now, we have a single script that defines the standard data pipeline for collecting options data on a specified list of tickers. Changes to the script will change the pipelines for all the respective tickers.

Testing the DAG

Finally, we trigger the DAG to make sure it works. Re-launch Airflow with the following commands (with the airflow environment activated in each terminal):

```bash
# In one bash terminal, run:
airflow scheduler

# In a separate bash terminal, run:
airflow webserver
```

On the far right of the main dashboard (DAGs), click the play button, and then “Trigger DAG”. The DAG will run. Then, click on the DAG ID (get_options_data_FB, for example) to see more details about the DAG.

In the Tree view (renamed the Grid view in Airflow 2.3), you should see the status of each task in each DAG run. Green is good. If the tasks fail, you can investigate what went wrong via the logs: click on a task instance (a coloured square in the Tree view), and then click the Log button. Here’s a sample of the logs of a task instance for the load step in our pipeline:

[Image: sample logs of a task instance for the load step]

Recap / Quickstart

Here’s a quick recap of what you need to do to get the whole system up and running.
In a bash terminal:

```bash
# Start Postgres
sudo service postgresql start

# Activate environment
conda activate airflow

# Start the Airflow Scheduler
airflow scheduler
# To shut it down, hit Ctrl+C
```

In a new bash terminal:

```bash
# Activate environment (each new terminal needs this)
conda activate airflow

# Start the Airflow Webserver
airflow webserver
# To shut it down, hit Ctrl+C
```

The Airflow UI should now be available at http://localhost:8080.

Summary

In this post, we built our data pipeline as a DAG in Airflow, and refactored the code to make it easy to create identical DAGs for other tickers (dynamic DAGs). Kudos to you if you’ve followed the series and implemented the code on your own up to this point. By now, you should have Airflow and Postgres set up on your local machine, and a swanky new data pipeline installed in Airflow as a dynamic DAG. This should be sufficient for you to start collecting options data for your favourite tickers.

But isn’t it cumbersome to open several bash terminals and launch each service manually? In the next post, we move our solution into Docker containers, which will let us launch and shut down our app more easily.

Credits for image: Kevin Ku on Unsplash

References

[1] DAGs, Apache Airflow Documentation
[2] Best Practices, Apache Airflow Documentation
[3] XComs, Apache Airflow Documentation