This tutorial shows how to generate billing for AWS Glue ETL Job usage (with simplified and assumed problem details), with the goal of learning how to write unit tests and functional tests for PySpark-based code.
Repository: https://gitlab.com/suekto.andreas/gluebilling
This tutorial works through a simplified problem: generating billing reports for usage of the AWS Glue ETL Job service. (Disclaimer: all details here are hypothetical and mixed with assumptions by the author.)

As input data we have log records of each job run: the job ID, the start time and end time as Unix timestamps, and the number of DPUs it used.

The price of usage is 0.44 USD per DPU-Hour, billed per second, with a 10-minute minimum for each ETL job, while a crawler costs 0.20 USD per DPU-Hour, billed per second, with a 200-second minimum for each run (once again, these numbers are made up for the purpose of learning).
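For example, a 3-DPU ETL job that runs for 3 hours (10,800 seconds) costs 3 DPU × 3 h × 0.44 USD = 3.96 USD, while a 3-DPU crawler that runs for only 50 seconds is billed for the 200-second minimum: 3 DPU × (200 / 3600) h × 0.20 USD ≈ 0.0333 USD. These are exactly the figures we will assert in the functional test later on.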
Now we are going to calculate the daily billing summary for our AWS Glue ETL usage.
For this project I am using a bash-script builder to establish the basis of the project; it is still in a beta state, and this tutorial also serves to see how comfortable I am with that auto script builder. A similar result can be achieved by manually following these articles:
To do the above, we would go along with pseudocode roughly like the following (a sketch reconstructed from the steps implemented in the rest of this tutorial):
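```python
# Pseudocode sketch (illustrative only):
#
# for each usage record (job_id, type, dpu, from_ts, to_ts):
#     duration = to_ts - from_ts                            # in seconds
#     pricing  = look up (dpu_hour, minimum_duration) for the record's type
#     charged  = max(duration, minimum_duration)
#     fee      = charged * dpu * dpu_hour / 3600            # USD
# collect all records, together with duration and fee, into a billing DataFrame
```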
First, let's build the calculate_duration function. The following is the module directory structure that we are going to use:
```
gluebilling/
    __init__.py
    billing.py
    billing_utest.py
    billing_ftest.py
    pyspark_htest.py
```

All *.py files start out empty; the __init__.py indicates that gluebilling is a Python package.
Let's start by writing the function skeleton in billing.py:
"""Process records of ETL Job usage into the billing of fee"""from datetime import datetime as dt
def calculate_duration(from_timestamp, to_timestamp):"""Returns duration in second between two timereturn 0
Based on the skeleton, we already have a clearer picture of how it should be called and what kind of value it shall return. Now let us create the unit test for this function, to define its behaviour in more detail. This is written in billing_utest.py:
"""Unit Testing for Glue Billing Project"""import unittest
import gluebilling.billing as billing
class CalculateDurationTest(unittest.TestCase):"""Test Cases for Glue Billing Unit Testing"""
def test\_positive\_duration(self):
"""Test successfully calculate duration
between 2 unix timestamp string"""
duration = billing.calculate\_duration(
"1535824800", "1535835600")
self.assertEqual(duration, 10800)
def test\_negative\_duration(self):
"""Test successfully generate negative number"""
duration = billing.calculate\_duration(
"1535835600", "1535824800")
self.assertEqual(duration, -10800)
To write the unit tests, we import the unittest module and the module we want to test (gluebilling.billing).

Next we define the class to host the test cases (CalculateDurationTest), extending the base class (unittest.TestCase).

Then, within CalculateDurationTest, we define the test cases as methods prefixed with `test_`. In each one we set up the input arguments, execute the function we want to test, and finally check the result using the assertEqual method inherited from unittest.TestCase.
If we run the unit tests now, they should fail:
$ coverage run --source=gluebilling -m unittest discover -p "*utest*.py" -f -s gluebilling/
coverage with unittest result
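If you are not using an editor integration, the same information can be printed in the terminal with the standard coverage commands (optional, not required by the rest of the tutorial):

```
$ coverage report -m
$ coverage html   # writes an HTML report into htmlcov/
```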
Now that we have the red state, let us start implementing the function:
"""Process records of ETL Job usage into the billing of fee"""from datetime import datetime as dt
def calculate_duration(from_timestamp, to_timestamp):"""Returns duration in second between two timeprovided in unix timestamp"""from_dt = dt.fromtimestamp(float(from_timestamp))to_dt = dt.fromtimestamp(float(to_timestamp))
time\_delta = to\_dt - from\_dt
return time\_delta.total\_seconds()
Now all test cases should be in the green state.
calculate_duration implementation completed!
Notice that the screenshot above comes from a development environment set up with automated test runs and coverage calculation, based on https://hackernoon.com/setting-up-python-dev-in-vs-code-e84f01c1f64b
Following the same technique as above, we add the calculate_fee function as below.

In billing.py:
"""Process records of ETL Job usage into the billing of fee"""from datetime import datetime as dtfrom decimal import Decimal
...
def calculate_fee(duration_in_second, dpu_num,minimum_duration, fee_dpu_hour):"""Returns a decimal of the fee incurred in USD,quantized into 8 digit behind comma"""
charged\_duration = duration\_in\_second
if charged\_duration < minimum\_duration:
charged\_duration = minimum\_duration
fee = charged\_duration \* dpu\_num \* Decimal(fee\_dpu\_hour) / 3600
return fee.quantize(Decimal('0.00000000'))
In billing_utest.py we add another class to host the test cases for calculate_fee:
"""Unit Testing for Glue Billing Project"""import unittestfrom decimal import Decimal
import gluebilling.billing as billing
...
class CalculateFeeTest(unittest.TestCase):"""Unit Test for calculate_fee function"""
def test_more_than_10_min(self):"""When usage is more than 10 min"""fee = billing.calculate_fee(800, 3, 600, "0.44")self.assertEqual(fee, Decimal("0.29333333"))
Notice that the dpu_hour value is passed as the string "0.44" rather than a float: converting a string into the Decimal type preserves the exact decimal value, whereas converting a float carries over its binary floating-point error.
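A quick illustration of the difference (a minimal standalone sketch, not part of the project code):

```python
from decimal import Decimal

# Constructing from a string preserves the exact decimal value.
print(Decimal("0.44"))  # 0.44

# Constructing from a float inherits binary floating-point error,
# printing a long expansion that is close to, but not exactly, 0.44.
print(Decimal(0.44))
```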
The reason I separate the test cases for the two functions into different classes is that the pylint C0103 snake_case rule caps the length of a function name at 30 characters; to maintain readability we therefore use a separate test class for each function under test.
The coverage result after implementing calculate_fee. Notice that the coverage could still be improved; that is left as an exercise for the reader (see the red colour bar).
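For instance, one extra case that exercises the minimum-duration branch of calculate_fee could be added to CalculateFeeTest (a suggested test, not part of the original code):

```python
    def test_less_than_10_min(self):
        """When usage is below the 600-second minimum,
        the fee is charged on the minimum duration instead"""
        fee = billing.calculate_fee(300, 2, 600, "0.44")
        # 600 s * 2 DPU * 0.44 USD per DPU-Hour / 3600 = 0.14666667 USD
        self.assertEqual(fee, Decimal("0.14666667"))
```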
This article uses the same basic concept as the tutorial by David Illes; the differences are in the details, where we focus on making our setup completely standalone (this is reflected in how we initialise the Spark session and how we prepare the test data).
Here is the version that we are going to use; we store it as pyspark_htest.py:
"""PySparkTest is base class to do functional testing on PySpark"""import unittestimport loggingimport osfrom pyspark.sql import SparkSessionfrom pandas.testing import assert_frame_equal
class PySparkTest(unittest.TestCase):"""BaseClass which setup local PySpark"""
@classmethod
def suppress\_py4j\_logging(cls):
"""Supress the logging level into WARN and above"""
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
@classmethod
def create\_testing\_pyspark\_session(cls):
"""Returns SparkSession connecting to local context
the extrajava session is to generate
the metastore\_db and derby.log into .tmp/ directory"""
tmp\_dir = os.path.abspath(".tmp/")
return (SparkSession.builder
.master('local\[1\]')
.appName('local-testing-pyspark-context')
.config("spark.driver.extraJavaOptions",
"-Dderby.system.home="+tmp\_dir)
.config("spark.sql.warehouse.dir", tmp\_dir)
.getOrCreate())
@classmethod
def setUpClass(cls):
"""Setup the Spark"""
cls.suppress\_py4j\_logging()
cls.spark = cls.create\_testing\_pyspark\_session()
@classmethod
def tearDownClass(cls):
"""Clean up the Class"""
cls.spark.stop()
@classmethod
def assert\_dataframe\_equal(cls, actual, expected, keycolumns):
"""Helper function to compare small dataframe"""
exp\_pd = expected.toPandas().sort\_values(
by=keycolumns
).reset\_index(drop=True)
act\_pd = actual.toPandas().sort\_values(
by=keycolumns
).reset\_index(drop=True)
return assert\_frame\_equal(act\_pd, exp\_pd)
Notice that this base class provides several useful class methods: it suppresses the noisy py4j logging, creates a standalone local SparkSession (writing metastore_db and derby.log into the .tmp/ directory), sets the session up and tears it down once per test class, and exposes assert_dataframe_equal as a helper to compare small DataFrames via pandas.
Let's create the skeleton of the function in billing.py, alongside the expected schema definitions of our input records and of the targeted output, the billing records.
```python
# These schema helpers use the PySpark type classes; the import below
# belongs at the top of billing.py.
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DecimalType)


def get_usage_record_schema():
    """Returns StructType containing the Input Usage
    Data Expected Schema"""
    return StructType([
        StructField("job_id", StringType(), False),
        StructField("type", StringType(), False),
        StructField("dpu", IntegerType(), False),
        StructField("from_unix_timestamp", StringType(), False),
        StructField("to_unix_timestamp", StringType(), False)
    ])


def get_pricing_schema():
    """Returns StructType containing the Input
    Pricing Data Expected Schema"""
    return StructType([
        StructField("type", StringType(), False),
        StructField("dpu_hour", StringType(), False),
        StructField("minimum_duration", IntegerType(), False)
    ])


def get_billing_schema():
    """Returns StructType containing the Billing Schema"""
    return StructType([
        StructField("job_id", StringType(), False),
        StructField("type", StringType(), False),
        StructField("dpu", IntegerType(), False),
        StructField("from_unix_timestamp", StringType(), False),
        StructField("to_unix_timestamp", StringType(), False),
        StructField("dpu_hour", StringType(), False),
        StructField("minimum_duration", IntegerType(), False),
        StructField("duration", IntegerType(), False),
        StructField("fee", DecimalType(20, 8), False),
    ])


def generate_billing(usage_df, pricing_df):
    """Returns DataFrame of Fee from a DataFrame of Usage Records"""
    return None
```
Next we go into billing_ftest.py to prepare our functional testing.
"""Functional Testing for Glue Billing Project"""from decimal import Decimalfrom pyspark.sql import Row, SQLContext
import gluebilling.billing as billingimport gluebilling.pyspark_htest as pysparktest
class GenerateBillingTest(pysparktest.PySparkTest):"""Test Cases for Generate Billing"""
def generate\_usage\_data\_001(self):
"""Generate usage data for testing it is a record of
AWS Glue ETL usage"""
rdd = self.spark.sparkContext.parallelize(\[
Row("JOB001", "etl", 3, "1535824800", "1535835600"),
Row("JOB002", "crawler", 3, "1535824800", "1535824850"),
Row("JOB003", "crawler", 3, "1535824800", "1535835600")
\])
schema = billing.get\_usage\_record\_schema()
sqlctx = SQLContext(self.spark.sparkContext)
return sqlctx.createDataFrame(rdd, schema)
def generate\_pricing\_data\_001(self):
"""Generate pricing data for testing it is a record of
AWS Glue ETL usage"""
rdd = self.spark.sparkContext.parallelize(\[
Row("etl", "0.44", 600),
Row("crawler", "0.20", 200)
\])
schema = billing.get\_pricing\_schema()
sqlctx = SQLContext(self.spark.sparkContext)
return sqlctx.createDataFrame(rdd, schema)
def generate\_expected\_billing\_001(self):
"""Generate expected billing"""
rdd = self.spark.sparkContext.parallelize(\[
Row("JOB001", "etl", 3, "1535824800", "1535835600",
"0.44", 600, 10800, Decimal("3.96")),
Row("JOB002", "crawler", 3, "1535824800", "1535824850",
"0.20", 200, 50, Decimal("0.03333333")),
Row("JOB003", "crawler", 3, "1535824800", "1535835600",
"0.20", 200, 10800, Decimal("1.80"))
\])
schema = billing.get\_billing\_schema()
sqlctx = SQLContext(self.spark.sparkContext)
return sqlctx.createDataFrame(rdd, schema)
def test\_with\_set\_001(self):
"""Using all 001 test data set"""
usage\_df = self.generate\_usage\_data\_001()
pricing\_df = self.generate\_pricing\_data\_001()
expected = self.generate\_expected\_billing\_001()
actual = billing.generate\_billing(usage\_df, pricing\_df)
self.assert\_dataframe\_equal(actual, expected, \["job\_id"\])
We prepare our input data in two functions:

1. generate_usage_data_001
2. generate_pricing_data_001

The technique used here is to create an RDD of Row objects with the parallelize method of SparkContext, and then combine it with the schema defined in the main script.
We also prepare the expected output data inside billing_ftest.py, in the function generate_expected_billing_001, using a similar technique to the one used for the input data.
The last component of billing_ftest.py is test_with_set_001, where the test is actually executed: we combine the generation functions for the input and expected DataFrames, execute the main script function generate_billing, and finally assert the result by leveraging the helper assert method we defined in pyspark_htest.py.
Finally let’s complete the function implementation in billing.py.
```python
# udf comes from pyspark.sql.functions; IntegerType and DecimalType are
# already imported from pyspark.sql.types alongside the schema helpers above.
from pyspark.sql.functions import udf


def generate_billing(usage_df, pricing_df):
    """Returns DataFrame of Fee from a DataFrame of Usage Records"""
    duration_udf = udf(calculate_duration, IntegerType())
    join_data_df = usage_df.join(
        pricing_df,
        usage_df.type == pricing_df.type
    ).select(
        usage_df.job_id, usage_df.type,
        usage_df.dpu, usage_df.from_unix_timestamp,
        usage_df.to_unix_timestamp,
        pricing_df.dpu_hour, pricing_df.minimum_duration,
        duration_udf(
            usage_df.from_unix_timestamp,
            usage_df.to_unix_timestamp).alias("duration")
    )
    fee_udf = udf(calculate_fee, DecimalType(20, 8))
    billing_df = join_data_df.select(
        "job_id", "type", "dpu", "from_unix_timestamp",
        "to_unix_timestamp", "dpu_hour", "minimum_duration",
        "duration",
        fee_udf(
            join_data_df.duration, join_data_df.dpu,
            join_data_df.minimum_duration, join_data_df.dpu_hour
        ).alias("fee")
    )
    return billing_df
```
We wrap our calculate_fee and calculate_duration functions into a udf, as this is the type that can be passed into PySpark. The first argument is the function to be wrapped, while the second argument is its expected return type.

We then use these in the select part of the PySpark DataFrame to generate the two new columns, duration and fee.
That's it, we have now implemented the functional test. I keep it separate from the calculate_fee and calculate_duration unit tests because of execution speed: it takes several seconds just to start up PySpark, hence it is worth grouping these tests differently.

This opens up the option to filter which kinds of tests we want to run on save (in case we are automating the development experience).
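For example, with the *utest* / *ftest* naming convention used above, an on-save hook could run only the fast unit tests, while the slower functional tests are run on demand (the second command is simply an assumed variant of the first, matching the billing_ftest.py naming):

```
$ coverage run --source=gluebilling -m unittest discover -p "*utest*.py" -f -s gluebilling/
$ coverage run --source=gluebilling -m unittest discover -p "*ftest*.py" -f -s gluebilling/
```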