This article describes one way to import data into Amazon DynamoDB: using Amazon EMR and Hive. With EMR and Hive you can quickly and efficiently process large amounts of data, such as importing data from Amazon S3 into a DynamoDB table.
The following diagram shows the architecture of the process.
Before getting started, install the Serverless Framework: open a terminal and run npm install -g serverless.
The project directory contains a serverless.yml file. Let’s start by defining a set of resources in this template, as shown below.
There are two S3 buckets: LogBucket stores the EMR logs, and S3BucketCsvimport stores the csv files. The ${self:custom.*} references below point to values defined in the custom section of serverless.yml.
Resources:
  LogBucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
  S3BucketCsvimport:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: ${self:custom.csvImportBucketName}
Next, a DynamoDB table into which the csv data from S3 will be loaded.
Resources:
  ContactsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: ${self:custom.contactsTable}
      SSESpecification:
        SSEEnabled: true
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: ${self:custom.tableThroughputs.${self:provider.stage}}
        WriteCapacityUnits: ${self:custom.tableThroughputs.${self:provider.stage}}
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
Next, add the Lambda function configuration to serverless.yml. The function is triggered by an S3 object-created event and then starts an EMR job flow to import the data.
startEMRJob:
  handler: src/handler.start_emr_job
  environment:
    CONTACTS_TABLE: ${self:custom.contactsTable}
    SUBNET_ID: ${self:custom.vpc.subnetId}
    EMR_LOGS_BUCKET:
      Ref: LogBucket
    CSV_IMPORT_BUCKET: ${self:custom.csvImportBucketName}
  events:
    - s3:
        bucket: ${self:custom.csvImportBucketName}
        event: s3:ObjectCreated:*
        rules:
          - prefix: uploads/
          - suffix: .csv
        existing: true
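The handler shown later does not need anything from the event payload, because the Hive scripts read everything under the uploads/ prefix; but if you ever want the key of the file that triggered the function, it is available in the S3 event record. A minimal sketch (the helper name is just for illustration):

from urllib.parse import unquote_plus

def uploaded_object_key(event):
    # The S3 notification delivers one or more records; each record carries
    # the bucket name and the (URL-encoded) key of the object that was created.
    record = event['Records'][0]
    return unquote_plus(record['s3']['object']['key'])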
We also need an IAM role for the Lambda function so that it has permission to start the EMR job flow, upload the Hive scripts to S3, and write to the DynamoDB table.
provider:
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "iam:PassRole"
      Resource:
        - arn:aws:iam::#{AWS::AccountId}:role/EMR_DefaultRole
        - arn:aws:iam::#{AWS::AccountId}:role/EMR_EC2_DefaultRole
    - Effect: "Allow"
      Action:
        - "elasticmapreduce:RunJobFlow"
      Resource: "*"
    - Effect: "Allow"
      Action:
        - "s3:PutObject"
      Resource:
        - "Fn::Join":
            - ""
            - - "arn:aws:s3:::"
              - ${self:custom.csvImportBucketName}
              - "/*"
    - Effect: "Allow"
      Action:
        - "dynamodb:*"
      Resource:
        - "Fn::GetAtt": [ContactsTable, Arn]
        - "Fn::Join":
            - "/"
            - - { "Fn::GetAtt": [ContactsTable, Arn] }
              - "index/*"
Let’s add the Lambda function that creates the AWS EMR cluster and defines the step details, such as the location of the Hive scripts and their arguments. We can use the boto3 EMR client to create the cluster and submit the steps dynamically from the Lambda function.
import boto3
import logging
import os
from datetime import datetime
from pathlib import Path
emr = boto3.client('emr')
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def start_emr_job(event, context):
    try:
        # Seed the table and upload the Hive scripts before launching the cluster.
        put_dump_record_to_db()
        put_step_scripts_to_s3()
        cluster_id = emr.run_job_flow(
            Name='test_emr_job',
            LogUri="s3://{}".format(os.environ['EMR_LOGS_BUCKET']),
            ReleaseLabel='emr-5.18.0',
            Applications=[
                {'Name': 'Hadoop'},
                {'Name': 'Livy'},
                {'Name': 'Pig'},
                {'Name': 'Hue'},
                {'Name': 'Hive'},
            ],
            Instances={
                'InstanceGroups': [
                    {
                        'Name': "Master nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm1.medium',
                        'InstanceCount': 1,
                    },
                    {
                        'Name': "Slave nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm1.medium',
                        'InstanceCount': 2,
                    }
                ],
                # Terminate the cluster automatically once all steps have finished.
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False,
                'Ec2SubnetId': os.environ['SUBNET_ID'],
            },
            Configurations=[
                {
                    'Classification': 'hive-site',
                    'Properties': {
                        'hive.execution.engine': 'mr'
                    }
                },
            ],
            Steps=[
                {
                    'Name': 'creating dynamodb table',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['hive-script',
                                 '--run-hive-script',
                                 '--args',
                                 '-f',
                                 's3://{}/scripts/step1.q'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'DYNAMODBTABLE={}'.format(
                                     os.environ["CONTACTS_TABLE"])]
                    }
                },
                {
                    'Name': 'creating csv table',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['hive-script',
                                 '--run-hive-script',
                                 '--args',
                                 '-f',
                                 's3://{}/scripts/step2.q'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'INPUT=s3://{}'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'TODAY={}'.format(
                                     datetime.today().strftime('%Y-%m-%d'))]
                    }
                },
                {
                    'Name': 'adding partition',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['hive-script',
                                 '--run-hive-script',
                                 '--args',
                                 '-f',
                                 's3://{}/scripts/step3.q'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'INPUT=s3://{}'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'TODAY={}'.format(
                                     datetime.today().strftime('%Y-%m-%d'))]
                    }
                },
                {
                    'Name': 'import data to dynamodb',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['hive-script',
                                 '--run-hive-script',
                                 '--args',
                                 '-f',
                                 's3://{}/scripts/step4.q'.format(
                                     os.environ['CSV_IMPORT_BUCKET']),
                                 '-d',
                                 'TODAY={}'.format(
                                     datetime.today().strftime('%Y-%m-%d'))]
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
        )
        logger.info('cluster {} created with the steps...'.format(
            cluster_id['JobFlowId']))
    except Exception as e:
        logger.error(e)
        raise
def put_dump_record_to_db():
    # Put a single placeholder record into the table if it is still empty
    # (item_count is DynamoDB's approximate count, which is good enough here).
    table = dynamodb.Table(os.environ["CONTACTS_TABLE"])
    if table.item_count == 0:
        table.put_item(
            Item={'id': 'NA',
                  'full_name': 'demo user',
                  'gender': 'M',
                  'address': 'NA',
                  'language': ["English"]})


def put_step_scripts_to_s3():
    # Upload the Hive step scripts from the project's scripts/ folder to the
    # csv import bucket so the EMR steps can read them from S3.
    root_path = Path(__file__).parent.parent
    scripts = ["scripts/step1.q",
               "scripts/step2.q",
               "scripts/step3.q",
               "scripts/step4.q"]
    for script in scripts:
        s3.Bucket(os.environ['CSV_IMPORT_BUCKET']).upload_file(
            '{}/{}'.format(root_path, script), script)
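To sanity-check the handler outside of Lambda, you can call it directly with an empty event, assuming AWS credentials are configured and you run it from the project root. Note that this really does launch an EMR cluster. A rough sketch with placeholder values:

import os

# Placeholder values - use your own table, subnet and bucket names.
os.environ['CONTACTS_TABLE'] = 'contacts-dev'
os.environ['SUBNET_ID'] = 'subnet-0123456789abcdef0'
os.environ['EMR_LOGS_BUCKET'] = 'my-emr-logs-bucket'
os.environ['CSV_IMPORT_BUCKET'] = 'myemr.csv.import.dev'

from src.handler import start_emr_job

# The handler ignores the event payload, so an empty dict is enough.
start_emr_job({}, None)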
And finally, let’s add the Hive scripts.
The first script (step1.q) creates an external Hive table that maps to the DynamoDB table.
-- Create external table for dynamodb table.
CREATE EXTERNAL TABLE IF NOT EXISTS
dynamodb_contacts (
id string,full_name string, email string,
gender string,address string,language array<string>)
STORED BY
'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES
("dynamodb.table.name" = "${DYNAMODBTABLE}",
"dynamodb.column.mapping" = "id:id,full_name:full_name,email:email,gender:gender,address:address,language:language");
The second script (step2.q) creates an external Hive table over the csv files in S3, partitioned by created_date.
-- Create table using csv in s3
CREATE EXTERNAL TABLE IF NOT EXISTS
csv_contacts (id string,first_name string,
last_name string, email string, gender string,
address string, language array<string> )
PARTITIONED BY (created_date string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY ','
LOCATION '${INPUT}/uploads/'
TBLPROPERTIES (
'serialization.null.format' = '',
'skip.header.line.count' = '1');
The third script (step3.q) adds the current date as a partition so Hive picks up the csv files uploaded under that prefix in S3.
-- add current date as partition
ALTER TABLE csv_contacts
add IF NOT EXISTS PARTITION (created_date='${TODAY}')
LOCATION '${INPUT}/uploads/created_date=${TODAY}/';
Finally, step4.q writes the data from Amazon S3 into DynamoDB.
-- Import csv data to DynamoDB table
INSERT OVERWRITE TABLE dynamodb_contacts
SELECT DISTINCT id, CONCAT_WS(' ', first_name, last_name), email,
       gender, address, language
FROM csv_contacts
WHERE created_date >= '${TODAY}';
Note that Amazon EMR operations on a DynamoDB table count as read/write operations and are subject to the table’s provisioned throughput settings; for more details, see the Amazon EMR documentation.
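The EMR DynamoDB connector throttles its writes based on that provisioned throughput (the hive-site property dynamodb.throughput.write.percent controls how much of it the job may use). If the import is too slow, one option is to temporarily raise the table’s write capacity before the import and lower it again afterwards; a minimal boto3 sketch, not part of the deployed stack, with a placeholder table name:

import boto3

dynamodb = boto3.client('dynamodb')

def set_capacity(table_name, read_units, write_units):
    # Adjust provisioned throughput; the change is applied asynchronously,
    # so wait until the table is ACTIVE again before starting the import.
    dynamodb.update_table(
        TableName=table_name,
        ProvisionedThroughput={
            'ReadCapacityUnits': read_units,
            'WriteCapacityUnits': write_units,
        })
    dynamodb.get_waiter('table_exists').wait(TableName=table_name)

# e.g. bump writes before the import, then dial them back down afterwards
# set_capacity('contacts-dev', 5, 50)   # 'contacts-dev' is a placeholder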
Deploy the stack:
$ sls deploy --stage dev
After the stack is deployed, copy a csv file to the S3 bucket under a created_date={CURRENT_DATE} prefix, e.g.:
$ aws s3 cp csv/contacts.csv s3://myemr.csv.import.dev/uploads/created_date=2020-02-03/contacts.csv
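The same upload can also be done with boto3; the key just has to sit under an uploads/created_date=<today's date>/ prefix so it matches the TODAY variable the Lambda passes to Hive. A small sketch using the same example bucket:

import boto3
from datetime import datetime

s3 = boto3.client('s3')
today = datetime.today().strftime('%Y-%m-%d')

# Upload the csv under uploads/created_date=<today>/ so the partition added
# in step3.q picks it up; this upload also fires the S3 trigger for the Lambda.
s3.upload_file(
    'csv/contacts.csv',
    'myemr.csv.import.dev',
    'uploads/created_date={}/contacts.csv'.format(today))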
Then we can go to the AWS EMR console and check the progress of the EMR steps.
It takes several minutes to complete all the steps, and the cluster terminates automatically once they have finished.
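If you prefer the terminal to the console, the step states can also be polled with boto3. A quick sketch that looks up the cluster by the name used in the handler (test_emr_job):

import boto3

emr = boto3.client('emr')

# Find the cluster created by the Lambda and print the state of each step.
clusters = emr.list_clusters(ClusterStates=['STARTING', 'RUNNING', 'WAITING'])
for cluster in clusters['Clusters']:
    if cluster['Name'] == 'test_emr_job':
        for step in emr.list_steps(ClusterId=cluster['Id'])['Steps']:
            print(step['Name'], step['Status']['State'])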
Next, we can go to the AWS DynamoDB console to verify that the data has been loaded into DynamoDB:
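Alternatively, a quick scan count from code works fine for a small test data set like this one (the table name is a placeholder):

import boto3

table = boto3.resource('dynamodb').Table('contacts-dev')  # placeholder table name

# Select='COUNT' returns only the number of matching items; for tables with
# more than 1 MB of scanned data you would have to follow LastEvaluatedKey.
print(table.scan(Select='COUNT')['Count'])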
In my test, the import took about 6 minutes to load 1,000 records (76 KB in total) into the DynamoDB table with 10 write capacity units and no auto scaling enabled.
That’s about it. Thanks for reading!
I hope you have found this article useful. You can find the complete project in my GitHub repo.