AWS Lambda plus Layers is one of the best solutions for managing a data pipeline and for implementing a serverless architecture. This post shows how to build a simple data pipeline using AWS Lambda Functions, S3 and DynamoDB.
Every day an external datasource exports data to S3 and imports to AWS DynamoDB table.
On a daily basis, an external data source exports data of the pervious day in csv format to an S3 bucket. S3 event triggers an AWS Lambda Functions that do ETL process and save the data to DynamoDB.
Before getting started, Install the Serverless Framework. Open up a terminal and type npm install -g serverless to install Serverless framework.
Create a new service using the AWS Python template, specifying a unique name and an optional path.
$ serverless create --template aws-python --path data-pipline
Then can run the following command project root directory to install serverless-python-requirements Plugin,
$ serverless plugin install -n serverless-python-requirements
Edit the serverless.yml file to look like the following:
plugins:
- serverless-python-requirements
custom:
pythonRequirements:
dockerizePip: non-linux
layer: true #Put dependencies into a Lambda Layer.
You need to have Docker installed to be able to set dockerizePip: true or dockerizePip: non-linux.
This will create a dev.document.files bucket which fires the importCSVToDB function when an csv file is added inside the bucket.
functions:
importCSVToDB:
handler: handler.importCSVToDB
layers:
- {Ref: PythonRequirementsLambdaLayer}
environment:
documentsTable: ${self:custom.documentsTableName}
bucketName: ${self:custom.s3bucketName}
events:
- s3:
bucket: ${self:custom.s3bucketName}
event: s3:ObjectCreated:Put
rules:
- suffix: .csv
Full sample serverless.yml as below:
<a href="https://medium.com/media/1302415b19cd3cb5e75d15241f3a995c/href">https://medium.com/media/1302415b19cd3cb5e75d15241f3a995c/href</a>
Now, let’s update our handler.py to create the pandas dataframe from the source csv in S3 bucket, convert dataframe to list of dictionaries and load the dict object to DynamoDB table using update_item method:
<a href="https://medium.com/media/039571ee7f34f2fc3027fdaebb7e7680/href">https://medium.com/media/039571ee7f34f2fc3027fdaebb7e7680/href</a>
As you can see from above lambda function, We use Pandas to read csv file, Pandas is the most popular data manipulation package in Python, and DataFrames are the Pandas data type for storing tabular 2D data.
Let’s deploy the service and test it out!
$ sls deploy --stage dev
To test the data import, We can manually upload an csv file to s3 bucket or using AWS cli to copy a local file to s3 bucket:
$ aws s3 cp sample.csv s3://dev.document.files
And there it is. You will get data imported into the DynamoDB DocumentsTable table.
You can find complete project in my GitHub repo:
Alternatively, You can use AWS Data Pipeline to import csv file into dynamoDB table
AWS Data Pipeline is a web service that you can use to automate the movement and transformation of data. With AWS Data Pipeline, you can define data-driven workflows, so that tasks can be dependent on the successful completion of previous tasks. You define the parameters of your data transformations and AWS Data Pipeline enforces the logic that you’ve set up.
What is AWS Data Pipeline? - AWS Data Pipeline