This tutorial shows how to generate billing for AWS Glue ETL Job usage (with simplified and assumed problem details), with the goal of learning to:

1. Unit test in PySpark
2. Write a basic function definition and convert it into a UDF

Repository: [https://gitlab.com/suekto.andreas/gluebilling](https://gitlab.com/suekto.andreas/gluebilling)

### Business Problem

This tutorial builds around a simplified problem of generating billing reports for usage of AWS Glue ETL Jobs. (Disclaimer: all details here are merely hypothetical and mixed with assumptions by the author.)

Let's say the input data is a log of job runs: the job id, the start time in RFC3339, the end time in RFC3339, and the number of DPUs used. The price of usage is 0.44 USD per DPU-Hour, billed per second, with a 10-minute minimum for each ETL job, while a crawler costs 0.20 USD per DPU-Hour, billed per second, with a 200-second minimum for each run (once again, these numbers are made up for the purpose of learning). Now we are going to calculate the daily billing summary for our AWS Glue ETL usage.

### Prerequisites

1. Install JDK 1.8, and set up JAVA\_HOME to point to the corresponding location (I personally use a Mac, following this [link](https://docs.oracle.com/javase/8/docs/technotes/guides/install/mac_jdk.html) and [download-link](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) — and JAVA\_HOME would be /Library/Java/JavaVirtualMachines/jdk1.8.0\_181.jdk/Contents/Home/)
2. Python 2.7.x
3. The Python coverage, unittest, and PySpark packages (preferably installed under virtualenv)

For this project I am actually using a bash-script builder to establish the basis of the project. It is still in a beta state, and this tutorial is being used to see how comfortable I am with that auto script builder. A similar result can be achieved by manually following these articles:

* [https://hackernoon.com/setting-up-python-dev-in-vs-code-e84f01c1f64b](https://hackernoon.com/setting-up-python-dev-in-vs-code-e84f01c1f64b)
* [https://medium.com/@suekto.andreas/automate-documentation-in-python-925c38eae69f](https://medium.com/@suekto.andreas/automate-documentation-in-python-925c38eae69f)

### Activities

To do the above we will go along with the following pseudocode:

1. Load the records into a PySpark DataFrame
2. Calculate the duration in seconds from the from and to unix timestamps
3. Calculate the fee per record

#### 001 — Calculate Duration (+ unit test)

First let's build the calculate duration function. The following is the module directory structure that we are going to use:

module gluebilling — billing.py

All \*.py files start out as empty files; the \_\_init\_\_.py indicates that we define gluebilling as a Python module. Let's start by writing the function skeleton in billing.py.

```
"""Process records of ETL Job usage into the billing of fee"""
from datetime import datetime as dt


def calculate_duration(from_timestamp, to_timestamp):
    """Returns duration in second between two time
    provided in unix timestamp"""
    return 0
```

Based on the skeleton, we already have a clearer picture of how it should be called and what kind of value it should return.
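For instance, here is a quick sketch of the intended call (purely illustrative, not part of the project files), reusing the same timestamps that appear in the unit test below:

```
import gluebilling.billing as billing

# "1535824800" and "1535835600" are unix timestamps three hours apart,
# so once implemented this call is expected to return a duration of
# 10800 seconds. With the skeleton above it still returns 0.
print(billing.calculate_duration("1535824800", "1535835600"))
```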
Now let us create the unit test for this function, to define its behaviour in more detail. This is written in billing\_utest.py:

```
"""Unit Testing for Glue Billing Project"""
import unittest

import gluebilling.billing as billing


class CalculateDurationTest(unittest.TestCase):
    """Test Cases for Glue Billing Unit Testing"""

    def test_positive_duration(self):
        """Test successfully calculate duration
        between 2 unix timestamp string"""
        duration = billing.calculate_duration(
            "1535824800", "1535835600")
        self.assertEqual(duration, 10800)

    def test_negative_duration(self):
        """Test successfully generate negative number"""
        duration = billing.calculate_duration(
            "1535835600", "1535824800")
        self.assertEqual(duration, -10800)
```

In writing a unit test, we import the unittest module and the module we want to test (gluebilling.billing). Next we define the class to host the test cases (CalculateDurationTest), extending the base class (unittest.TestCase).

Afterward, within CalculateDurationTest, we define the test cases as object methods prefixed with `test_`. We put in the input arguments, execute the function we want to test, and finally check the result using the assertEqual function coming from unittest.TestCase.

If we run the unit tests now, they should fail:

```
$ coverage run --source=gluebilling -m unittest discover -p "*utest*.py" -f -s gluebilling/
```

coverage with unittest result

Now that we have the red state, let us start implementing the function:

```
"""Process records of ETL Job usage into the billing of fee"""
from datetime import datetime as dt


def calculate_duration(from_timestamp, to_timestamp):
    """Returns duration in second between two time
    provided in unix timestamp"""
    from_dt = dt.fromtimestamp(float(from_timestamp))
    to_dt = dt.fromtimestamp(float(to_timestamp))
    time_delta = to_dt - from_dt
    return time_delta.total_seconds()
```

Now we shall have all test cases in a green state.

calculate\_duration implementation completed!

Notice that the screenshot above is from a development environment set up with automated runs of testing and coverage calculation, based on [https://hackernoon.com/setting-up-python-dev-in-vs-code-e84f01c1f64b](https://hackernoon.com/setting-up-python-dev-in-vs-code-e84f01c1f64b)

#### 002 — Calculate Fee (+ unit test)

Following the same technique as above, we add the calculate fee function as below:

in billing.py

```
"""Process records of ETL Job usage into the billing of fee"""
from datetime import datetime as dt
from decimal import Decimal

...


def calculate_fee(duration_in_second, dpu_num,
                  minimum_duration, fee_dpu_hour):
    """Returns a decimal of the fee incurred in USD,
    quantized into 8 digits behind the comma"""
    charged_duration = duration_in_second
    if charged_duration < minimum_duration:
        charged_duration = minimum_duration

    fee = charged_duration * dpu_num * Decimal(fee_dpu_hour) / 3600
    return fee.quantize(Decimal('0.00000000'))
```

In billing\_utest.py we add another class to host the test cases for calculate\_fee:

```
"""Unit Testing for Glue Billing Project"""
import unittest
from decimal import Decimal

import gluebilling.billing as billing

...


class CalculateFeeTest(unittest.TestCase):
    """Unit Test for calculate_fee function"""

    def test_more_than_10_min(self):
        """When usage is more than 10 min"""
        fee = billing.calculate_fee(800, 3, 600, "0.44")
        self.assertEqual(fee, Decimal("0.29333333"))
```

> Notice that the dpu\_hour value is set as **string "0.44"** instead of a float, as this gives a more accurate result when we further convert it into the Decimal type.
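To see why the string matters, consider this small illustrative snippet (not part of the project files): 0.44 has no exact binary floating-point representation, so constructing a Decimal from the float carries that inexactness along, while constructing it from the string keeps the value exact.

```
from decimal import Decimal

# From a string: exactly 0.44
print(Decimal("0.44"))

# From a float: the nearest binary double to 0.44, which is slightly
# off, so downstream fee calculations would accumulate that error.
print(Decimal(0.44))

print(Decimal("0.44") == Decimal(0.44))  # False
```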
The reason I separate the test cases for the two functions into different classes is that the [pylint C0103](http://pylint-messages.wikidot.com/messages:c0103) snake\_case convention caps name length at 30 characters, so to maintain readability we divide the test cases into one class per function under test.

The coverage test result after implementing calculate\_fee; notice that the coverage could be done better, which is left as an exercise for the reader (see the red colour bar).

#### 003 — PySpark Billing Calculation (+ functional test)

This article uses the same basic concept as this [tutorial from David Illes](https://blog.cambridgespark.com/unit-testing-with-pyspark-fb31671b1ad8); the differences are in the details, where we focus on making the setup completely standalone (this is reflected in how we initialise the Spark session and how we prepare the test data).

Here is the version that we are going to use; we store it as pyspark\_htest.py.

```
"""PySparkTest is base class to do functional testing on PySpark"""
import unittest
import logging
import os

from pyspark.sql import SparkSession
from pandas.testing import assert_frame_equal


class PySparkTest(unittest.TestCase):
    """BaseClass which setup local PySpark"""

    @classmethod
    def suppress_py4j_logging(cls):
        """Suppress the logging level into WARN and above"""
        logger = logging.getLogger('py4j')
        logger.setLevel(logging.WARN)

    @classmethod
    def create_testing_pyspark_session(cls):
        """Returns SparkSession connecting to local context
        the extra java option is to generate the metastore_db
        and derby.log into .tmp/ directory"""
        tmp_dir = os.path.abspath(".tmp/")
        return (SparkSession.builder
                .master('local[1]')
                .appName('local-testing-pyspark-context')
                .config("spark.driver.extraJavaOptions",
                        "-Dderby.system.home=" + tmp_dir)
                .config("spark.sql.warehouse.dir", tmp_dir)
                .getOrCreate())

    @classmethod
    def setUpClass(cls):
        """Setup the Spark"""
        cls.suppress_py4j_logging()
        cls.spark = cls.create_testing_pyspark_session()

    @classmethod
    def tearDownClass(cls):
        """Clean up the Class"""
        cls.spark.stop()

    @classmethod
    def assert_dataframe_equal(cls, actual, expected, keycolumns):
        """Helper function to compare small dataframe"""
        exp_pd = expected.toPandas().sort_values(
            by=keycolumns
        ).reset_index(drop=True)
        act_pd = actual.toPandas().sort_values(
            by=keycolumns
        ).reset_index(drop=True)
        return assert_frame_equal(act_pd, exp_pd)
```

Notice that this base class provides several useful class methods:

* suppress\_py4j\_logging — suppresses any logging below WARN (PySpark uses py4j for logging), so the test output stays readable.
* create\_testing\_pyspark\_session — builds a Spark session on localhost with 1 core, and points the temporary metastore\_db and derby.log into the .tmp/ directory (just so that our workspace stays tidy and clean).
* setUpClass and tearDownClass — called automatically, once per test class.
* assert\_dataframe\_equal — receives PySpark DataFrames, converts them into Pandas, sorts them by the key columns (because PySpark results do not maintain order), then uses Pandas testing to compare the two dataframes.
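As a quick illustration of how the base class is meant to be used (a hypothetical minimal test, not part of the project; the real functional test follows below), a test class simply inherits from PySparkTest and works with the shared Spark session:

```
import gluebilling.pyspark_htest as pysparktest


class SmokeTest(pysparktest.PySparkTest):
    """Hypothetical example: the SparkSession created in setUpClass
    is available as self.spark inside every test method"""

    def test_count_rows(self):
        df = self.spark.createDataFrame(
            [("JOB001", 3), ("JOB002", 3)], ["job_id", "dpu"])
        self.assertEqual(df.count(), 2)
```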
Let's create the skeleton of the function in billing.py, alongside the expected schema definitions of our input records and of our targetted output, the billing records.

```
# these types are needed in billing.py for the schema definitions below
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DecimalType)

...


def get_usage_record_schema():
    """Returns StructType containing the Input
    Usage Data Expected Schema"""
    return StructType([
        StructField("job_id", StringType(), False),
        StructField("type", StringType(), False),
        StructField("dpu", IntegerType(), False),
        StructField("from_unix_timestamp", StringType(), False),
        StructField("to_unix_timestamp", StringType(), False)
    ])


def get_pricing_schema():
    """Returns StructType containing the Input
    Pricing Data Expected Schema"""
    return StructType([
        StructField("type", StringType(), False),
        StructField("dpu_hour", StringType(), False),
        StructField("minimum_duration", IntegerType(), False)
    ])


def get_billing_schema():
    """Returns StructType containing the Billing Schema"""
    return StructType([
        StructField("job_id", StringType(), False),
        StructField("type", StringType(), False),
        StructField("dpu", IntegerType(), False),
        StructField("from_unix_timestamp", StringType(), False),
        StructField("to_unix_timestamp", StringType(), False),
        StructField("dpu_hour", StringType(), False),
        StructField("minimum_duration", IntegerType(), False),
        StructField("duration", IntegerType(), False),
        StructField("fee", DecimalType(20, 8), False),
    ])


def generate_billing(usage_df, pricing_df):
    """Returns DataFrame of Fee from a DataFrame of Usage Records"""
    return None
```

Next we go into billing\_ftest.py to prepare our functional testing.

```
"""Functional Testing for Glue Billing Project"""
from decimal import Decimal

from pyspark.sql import Row, SQLContext

import gluebilling.billing as billing
import gluebilling.pyspark_htest as pysparktest


class GenerateBillingTest(pysparktest.PySparkTest):
    """Test Cases for Generate Billing"""

    def generate_usage_data_001(self):
        """Generate usage data for testing
        it is a record of AWS Glue ETL usage"""
        rdd = self.spark.sparkContext.parallelize([
            Row("JOB001", "etl", 3, "1535824800", "1535835600"),
            Row("JOB002", "crawler", 3, "1535824800", "1535824850"),
            Row("JOB003", "crawler", 3, "1535824800", "1535835600")
        ])
        schema = billing.get_usage_record_schema()
        sqlctx = SQLContext(self.spark.sparkContext)
        return sqlctx.createDataFrame(rdd, schema)

    def generate_pricing_data_001(self):
        """Generate pricing data for testing
        it is a record of AWS Glue pricing"""
        rdd = self.spark.sparkContext.parallelize([
            Row("etl", "0.44", 600),
            Row("crawler", "0.20", 200)
        ])
        schema = billing.get_pricing_schema()
        sqlctx = SQLContext(self.spark.sparkContext)
        return sqlctx.createDataFrame(rdd, schema)

    def generate_expected_billing_001(self):
        """Generate expected billing"""
        rdd = self.spark.sparkContext.parallelize([
            Row("JOB001", "etl", 3, "1535824800", "1535835600",
                "0.44", 600, 10800, Decimal("3.96")),
            Row("JOB002", "crawler", 3, "1535824800", "1535824850",
                "0.20", 200, 50, Decimal("0.03333333")),
            Row("JOB003", "crawler", 3, "1535824800", "1535835600",
                "0.20", 200, 10800, Decimal("1.80"))
        ])
        schema = billing.get_billing_schema()
        sqlctx = SQLContext(self.spark.sparkContext)
        return sqlctx.createDataFrame(rdd, schema)

    def test_with_set_001(self):
        """Using all 001 test data set"""
        usage_df = self.generate_usage_data_001()
        pricing_df = self.generate_pricing_data_001()
        expected = self.generate_expected_billing_001()
        actual = billing.generate_billing(usage_df, pricing_df)
        self.assert_dataframe_equal(actual, expected, ["job_id"])
```
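As a side note, the expected fee values above can be checked by hand against the pricing rules; this quick snippet is just a sanity check (not part of the project files), assuming calculate\_fee behaves as implemented earlier:

```
from decimal import Decimal

import gluebilling.billing as billing

# JOB001: etl, 3 DPU, 10800 s (above the 600 s minimum)
#   10800 * 3 * 0.44 / 3600 = 3.96 USD
assert billing.calculate_fee(10800, 3, 600, "0.44") == Decimal("3.96")

# JOB002: crawler, 3 DPU, only 50 s, so the 200 s minimum applies
#   200 * 3 * 0.20 / 3600 = 0.03333333... USD
assert billing.calculate_fee(50, 3, 200, "0.20") == Decimal("0.03333333")

# JOB003: crawler, 3 DPU, 10800 s
#   10800 * 3 * 0.20 / 3600 = 1.80 USD
assert billing.calculate_fee(10800, 3, 200, "0.20") == Decimal("1.80")
```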
We prepare our input data in two functions:

1. generate\_usage\_data\_001
2. generate\_pricing\_data\_001

The technique used here is to create an RDD of Row objects with the parallelize method of SparkContext, and then combine it with the schema defined in the main script. We also prepare the expected output data inside billing\_ftest.py, in the function generate\_expected\_billing\_001, with a similar technique to the input data.

The last component of billing\_ftest.py is test\_with\_set\_001, which is where the test is executed: we combine the generation functions for the input and expected dataframes, execute the main script function generate\_billing, and finally do the assertion by leveraging the helper assert method we defined in pyspark\_htest.py.

Finally let's complete the function implementation in billing.py.

```
# udf needs to be imported at the top of billing.py
from pyspark.sql.functions import udf

...


def generate_billing(usage_df, pricing_df):
    """Returns DataFrame of Fee from a DataFrame of Usage Records"""
    duration_udf = udf(calculate_duration, IntegerType())

    join_data_df = usage_df.join(
        pricing_df,
        usage_df.type == pricing_df.type
    ).select(
        usage_df.job_id,
        usage_df.type,
        usage_df.dpu,
        usage_df.from_unix_timestamp,
        usage_df.to_unix_timestamp,
        pricing_df.dpu_hour,
        pricing_df.minimum_duration,
        duration_udf(
            usage_df.from_unix_timestamp,
            usage_df.to_unix_timestamp).alias("duration")
    )

    fee_udf = udf(calculate_fee, DecimalType(20, 8))

    billing_df = join_data_df.select(
        "job_id", "type", "dpu",
        "from_unix_timestamp", "to_unix_timestamp",
        "dpu_hour", "minimum_duration", "duration",
        fee_udf(
            join_data_df.duration,
            join_data_df.dpu,
            join_data_df.minimum_duration,
            join_data_df.dpu_hour
        ).alias("fee")
    )
    return billing_df
```

We wrap our calculate\_fee and calculate\_duration functions into a udf, as this is the type that can be passed into PySpark. The 1st argument is the function to be wrapped, while the 2nd argument is the expected return type. Then we use these in the SELECT part of the PySpark SQL DataFrame to generate the two new columns, duration and fee.

### Conclusion

That's it: now we have implemented the functional test as well. I separate it from the calculate\_fee and calculate\_duration unit tests because of the speed at which each kind of test executes; it takes several seconds to start up PySpark, hence it is worth grouping the functional tests differently. This opens up the option to filter which kind of test we want to run on save (in case we are automating the development experience).
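For example, with the utest/ftest naming convention used in this project, the two groups can be run separately. The unit-test command is the one shown earlier; the functional-test variant simply swaps the file pattern (adjust both to your own setup if it differs):

```
# fast unit tests only
coverage run --source=gluebilling -m unittest discover -p "*utest*.py" -f -s gluebilling/

# functional tests only (these spin up a local SparkSession, so they are slower)
coverage run --source=gluebilling -m unittest discover -p "*ftest*.py" -f -s gluebilling/
```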