In this post, we develop the essential code for extracting, transforming, and loading options data. We will need to retrieve data from the TD Ameritrade API, transform it, and load the data into the PostgreSQL database we created in Part I: Designing the Database. We will also run some checks to ensure that the pipeline works.
This is the second post in my series, Towards Open Options Chains: A Data Pipeline Solution. Along the way, we will use the requests, python-dotenv, pandas, and psycopg2 packages.
The use case for the data is to run backtests of options trading strategies. Therefore, the data pipeline must collect sufficiently fine-grained data for us to do backtests. This means we'll need options prices, volumes, and the options greeks (e.g. delta, theta, and gamma), collected at sufficiently high frequency.
The first step in our pipeline involves extracting the data. In our case, we will retrieve data from the TD Ameritrade (TDA) Option Chains API.
First, we will obtain credentials for using the API: register for an account on the TD Ameritrade developer portal, create an app, and take note of its consumer key, which serves as the API key.
Next, we construct requests to the TDA Option Chains API in Python. We define a few parameters. First, create an environment file (.env) in the same folder, containing your API key in a variable: API_KEY=<your API key here>. We will load it into Python using python-dotenv. Second, set the date range using the datetime module. I've chosen to use the current date as the start date and the day 45 days out as the end date.
That gives us the following:
# Import modules
import json
import requests
import pandas as pd
from datetime import datetime, timedelta
from dotenv import dotenv_values

# Load environment variables - API key is accessible at env['API_KEY']
env = dotenv_values('.env')

# Configure dates
start_date = datetime.today()
end_date = start_date + timedelta(days=45)

# Other params
TICKER = 'FB'

# Configure request
headers = {'Authorization': ''}
params = (
    ('apikey', env['API_KEY']),
    ('symbol', TICKER),
    ('contractType', 'PUT'),
    ('strikeCount', '50'),
    ('range', 'ALL'),
    # fromDate/toDate are passed as yyyy-MM-dd strings
    ('fromDate', start_date.strftime('%Y-%m-%d')),
    ('toDate', end_date.strftime('%Y-%m-%d')),
)
We now use the requests
library to send the request with the defined headers and parameters, receiving a JSON response with the options chain if all went well:
# Get data
response = requests.get(
'https://api.tdameritrade.com/v1/marketdata/chains',
headers=headers,
params=params
)
# Extract data
data = json.loads(response.content)
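Before going further, it is worth confirming that the request actually succeeded. A minimal, optional check (not part of the pipeline itself):

# Optional sanity check: raise an HTTPError for any 4xx/5xx response
response.raise_for_status()

# Peek at the top-level keys of the payload (should include 'putExpDateMap')
print(list(data.keys()))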
The data is structured as follows:
root
└── Puts/Calls
    └── Contract (by expiry date)
        └── Strike
            └── Fields (e.g. prices, greeks)
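To see this structure concretely before writing the parser, we can drill into the nested dictionaries of the put map. A quick exploratory sketch (the comments reflect what I'd expect the keys to look like):

# Peek at the nested structure of the puts map
expiry_dates = list(data['putExpDateMap'].keys())
print(expiry_dates[:3])    # keys look like '<expiry date>:<days to expiry>'

strikes = list(data['putExpDateMap'][expiry_dates[0]].keys())
print(strikes[:5])         # strike prices, stored as strings

# Each strike maps to a list of contract dicts holding the fields
contract = data['putExpDateMap'][expiry_dates[0]][strikes[0]][0]
print(contract['bid'], contract['ask'], contract['delta'])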
We parse the JSON object using the following code to obtain a Pandas dataframe:
# Extract puts data
puts = []
dates = list(data['putExpDateMap'].keys())
for date in dates:
    strikes = data['putExpDateMap'][date]
    for strike in strikes:
        puts += data['putExpDateMap'][date][strike]
# Define columns
columns = ['putCall', 'symbol', 'description', 'exchangeName', 'bid', 'ask',
'last', 'mark', 'bidSize', 'askSize', 'bidAskSize', 'lastSize',
'highPrice', 'lowPrice', 'openPrice', 'closePrice', 'totalVolume',
'tradeDate', 'tradeTimeInLong', 'quoteTimeInLong', 'netChange',
'volatility', 'delta', 'gamma', 'theta', 'vega', 'rho', 'openInterest',
'timeValue', 'theoreticalOptionValue', 'theoreticalVolatility',
'optionDeliverablesList', 'strikePrice', 'expirationDate',
'daysToExpiration', 'expirationType', 'lastTradingDay', 'multiplier',
'settlementType', 'deliverableNote', 'isIndexOption', 'percentChange',
'markChange', 'markPercentChange', 'mini', 'inTheMoney', 'nonStandard']
# Convert to dataframe
puts = pd.DataFrame(puts, columns=columns)
The transform step will convert the data into the final structure/format to be loaded into Postgres. For our use case, it is fairly simple: (1) select the right columns and (2) ensure we have the right data types.
First, inspecting the data, we identify several fields to keep (below). We will select only these fields for processing.
Second, we ensure that the data is in the right format. Most of the columns have been parsed by Pandas correctly. Although we could assume that Pandas will continue to parse the data correctly, it may not be wise to do so. Suppose that for the first six months, the API always returns proper float values for the OHLC prices, and then one day it starts to return the string nan, just as the options greeks data does today. Pandas would then parse the OHLC prices as object types, and the loading step of the pipeline could fail. Hence, we should explicitly format the data. Any values coerced to missing in the process are filled with a placeholder value of -99.
The final step is to convert the column names into snake case. This is just a personal preference. The code to do everything we discussed is shown below:
# Select columns
puts = puts[['putCall', 'symbol', 'description', 'bid', 'ask', 'last', 'bidSize',
             'askSize', 'lastSize', 'highPrice', 'lowPrice', 'openPrice',
             'closePrice', 'totalVolume', 'quoteTimeInLong', 'volatility', 'delta',
             'gamma', 'theta', 'vega', 'rho', 'openInterest', 'timeValue',
             'theoreticalOptionValue', 'strikePrice', 'expirationDate',
             'daysToExpiration']]

# Convert floats: strip literal 'NaN'/'nan' strings and coerce anything left
# unparseable to missing values (filled with the placeholder further below)
def conv_num(x):
    return pd.to_numeric(x.astype(str).str.replace('NaN|nan', '', regex=True),
                         errors='coerce')

for col in ['bid', 'ask', 'last', 'highPrice', 'lowPrice', 'openPrice',
            'closePrice', 'volatility', 'delta', 'gamma', 'theta', 'vega',
            'rho', 'timeValue', 'theoreticalOptionValue', 'strikePrice']:
    puts[col] = conv_num(puts[col])
# Specifically for puts delta: make it positive
puts['delta'] = -puts['delta']
# Convert strings
def conv_str(x):
    return x.astype(str)

for col in ['putCall', 'symbol', 'description']:
    puts[col] = conv_str(puts[col])

# Convert integers
def conv_int(x):
    return x.astype(int)

for col in ['bidSize', 'askSize', 'lastSize', 'totalVolume', 'quoteTimeInLong',
            'openInterest', 'expirationDate', 'daysToExpiration']:
    puts[col] = conv_int(puts[col])
# Fill missing values
puts = puts.fillna(-99)
# Rename columns
puts = puts.rename(columns={
'putCall': 'put_call',
'bidSize': 'bid_size',
'askSize': 'ask_size',
'lastSize': 'last_size',
'highPrice': 'high_price',
'lowPrice': 'low_price',
'openPrice': 'open_price',
'closePrice': 'close_price',
'totalVolume': 'total_volume',
'quoteTimeInLong': 'quote_time',
'openInterest': 'open_interest',
'timeValue': 'time_value',
'theoreticalOptionValue': 'theoretical_value',
'strikePrice': 'strike_price',
'expirationDate': 'expiration_date',
'daysToExpiration': 'dte',
})
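Before loading, a quick look at the dtypes and the first few rows helps confirm the transform behaved as expected:

# Quick check on the transformed dataframe before loading
print(puts.dtypes)
print(puts.head())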
After extracting the data and transforming it to conform to the defined schema, we will need to load the data into our Postgres database. We use INSERT ... ON CONFLICT DO NOTHING, which keeps only the first entry in the event of repeated rows (note that ON CONFLICT DO NOTHING only skips rows that violate a unique constraint or index on the table):
INSERT INTO FB (<all columns here>) VALUES (<all values here>) ON CONFLICT DO NOTHING;
We use the psycopg2
library to load the data from Python into Postgres. We follow the typical process of (1) defining the query (as above), (2) connecting to the database, (3) creating a cursor, (4) executing the query with the parameters, (5) committing the transaction, and then (6) closing the connection. Note that we have placed the database password into the .env
file to avoid hardcoding it.
import psycopg2 as pg2
col_str = ', '.join(puts.columns.tolist())
query_insert = f"INSERT INTO fb ({col_str}) VALUES %s ON CONFLICT DO NOTHING"
# Connect to database
conn = pg2.connect(
host='localhost',
database='optionsdata',
user='postgres',
password=env['DB_PASSWORD']
)
# Loop through rows
with conn.cursor() as cursor:
    for t in puts.itertuples(index=False, name=None):
        # Convert NumPy scalars to native Python types and let psycopg2
        # handle quoting instead of interpolating values into the SQL string
        row = tuple(v.item() if hasattr(v, 'item') else v for v in t)
        cursor.execute(query_insert, (row,))
conn.commit()

# Close connection
conn.close()
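Looping over execute() is fine for a single snapshot of one ticker. If we ever need to speed this up, psycopg2's extras module offers a batched alternative; here is a sketch using execute_values with the same query_insert and dataframe (reconnecting, since we closed the connection above):

from psycopg2.extras import execute_values

# Convert NumPy scalars to native Python types for psycopg2
rows = [tuple(v.item() if hasattr(v, 'item') else v for v in t)
        for t in puts.itertuples(index=False, name=None)]

# Send all rows in a single batched INSERT
conn = pg2.connect(host='localhost', database='optionsdata',
                   user='postgres', password=env['DB_PASSWORD'])
with conn.cursor() as cursor:
    execute_values(cursor, query_insert, rows)
conn.commit()
conn.close()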
To inspect the data, we retrieve all records from the table using the following code:
# Reconnect and run select * query (we closed the previous connection)
conn = pg2.connect(host='localhost', database='optionsdata',
                   user='postgres', password=env['DB_PASSWORD'])
with conn.cursor() as cursor:
    cursor.execute('SELECT * FROM fb;')
    df = pd.DataFrame(cursor.fetchall(),
                      columns=[desc[0] for desc in cursor.description])
conn.close()
The first time we run both snippets above, we'll obtain a dataframe (df) containing data from the latest API query. If we run them again without new data from the API, we should see no change to df.
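As one more lightweight check on the deduplication, we can compare the row count before and after re-running the insert snippet; with ON CONFLICT DO NOTHING in place, the count should stay the same:

# Row count should be unchanged after re-running the insert on the same data
conn = pg2.connect(host='localhost', database='optionsdata',
                   user='postgres', password=env['DB_PASSWORD'])
with conn.cursor() as cursor:
    cursor.execute('SELECT COUNT(*) FROM fb;')
    print(cursor.fetchone()[0])
conn.close()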
In this post, we developed the code to extract options data from the TDA Option Chains API, transform it to match our schema, and load it into our Postgres database.
Putting everything together, we have a one-off script that extracts data from an API, transforms it, and loads it into a Postgres database. This is similar in functionality to the solutions by Kleppen [2], BlackArbs [3], and Sauers [4], which persist data in a database or file rather than in a Pandas dataframe, putting them one level above the other solutions. Still, each run captures only a snapshot: we need to run this code repeatedly at different times of the day, and we need a tool to manage those runs.
In the next post, we will set up Apache Airflow, a workflow orchestration tool commonly used for managing data pipelines. It will enable us to schedule ETL jobs for automated data collection.