Automated Data Replication From AWS S3 To Microsoft Azure Storage Made Easy

By Yi Ai (@yi)

Your business may need to move large amounts of data periodically from one public cloud to another, for example when a mandate requires a multi-cloud solution. This article covers one approach to automating data replication from an AWS S3 bucket to a Microsoft Azure Blob Storage container using Amazon S3 Inventory, Amazon S3 Batch Operations, Fargate, and AzCopy.

Scenario

Your company produces new CSV files on-premises every day, with a total size of around 100 GB after compression. Each file is 1 to 2 GB and needs to be uploaded to Amazon S3 every night in a fixed time window between 3 am and 5 am. Your business has decided to copy those CSV files from S3 to Microsoft Azure Storage after all files have been uploaded to S3. You have to find an easy and fast way to automate the data replication workflow.

To accomplish this task, we can build a data pipeline to copy data periodically from S3 to Azure Storage using AWS Data Wrangler, Amazon S3 Inventory, Amazon S3 Batch Operations, Athena, Fargate, and AzCopy.

The diagram below represents the high-level architecture of the pipeline solution:

What we’ll cover:

  • Create a VPC with private and public subnets, S3 endpoints, and NAT gateway.
  • Create an Azure Storage account and blob container, generate a SAS token, then add a firewall rule to allow traffic from AWS VPC to Azure Storage.
  • Configure daily S3 Inventory Reports on the S3 bucket.
  • Use Athena to filter only the new objects from S3 inventory reports and export those objects’ bucket names & object keys to a CSV manifest file.
  • Use the exported CSV manifest file to create an S3 Batch Operations PUT copy job that copies objects to a destination S3 bucket with a lifecycle policy expiration rule configured.
  • Set up an EventBridge rule that invokes a Lambda function to run a Fargate task, which copies all objects with the same prefix in the destination bucket to the Azure Storage container.

Prerequisites

  • Set up an AWS account
  • Set up an Azure account
  • Install the latest AWS CLI
  • Install the AWS CDK CLI
  • Basic understanding of AWS CDK
  • Basic understanding of Docker

Let’s begin!

Creating Source and Destination S3 buckets

We use CDK to build our infrastructure on AWS. First, let’s create a source bucket to receive files from external providers or from on-premises systems, and set up daily inventory reports that provide a flat-file list of its objects and metadata.

Next, create a destination bucket as temporary storage, with a lifecycle expiration rule configured on the prefix /tmp_transition. All files with that prefix (e.g. /tmp_transition/file1.csv) will be copied to Azure and then removed by the lifecycle policy after 24 hours.

Use the following code to create S3 buckets.

from aws_cdk import (
    aws_s3 as s3,
    core,
)

s3_destination = s3.Bucket(self, "dataBucketInventory",
                           lifecycle_rules=[
                               {
                                 'expiration': core.Duration.days(1.0),
                                   'prefix': 'tmp_transition'
                               },
                           ])

s3_source = s3.Bucket(self, "demoDataBucket",
                      bucket_name=self.s3_source_bucket_name,
                      encryption=s3.BucketEncryption.S3_MANAGED,
                      inventories=[
                          {
                              "frequency": s3.InventoryFrequency.DAILY,
                              "include_object_versions": s3.InventoryObjectVersion.CURRENT,
                              "destination": {
                                  "bucket": s3_destination
                              }
                          }
                      ])
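
As a rough illustration of which objects the expiration rule above covers, the sketch below mimics the prefix filter with a plain string comparison. The helper name keys_covered_by_rule and the sample keys are hypothetical; the real evaluation is done by S3 itself. Note that the keys carry no leading slash, matching the 'tmp_transition' prefix used in the CDK code.

```python
from typing import Iterable, List

# Prefix taken from the lifecycle rule in the CDK code above.
LIFECYCLE_PREFIX = "tmp_transition"


def keys_covered_by_rule(keys: Iterable[str], prefix: str = LIFECYCLE_PREFIX) -> List[str]:
    """Return the keys whose names start with the lifecycle rule prefix.

    A lifecycle prefix filter matches the leading characters of the object
    key, so a plain string-prefix check is enough for this illustration.
    """
    return [key for key in keys if key.startswith(prefix)]


# Hypothetical object keys in the destination bucket.
sample_keys = [
    "tmp_transition/file1.csv",                     # staged for Azure, expires after a day
    "csv_manifest/dt=2021-01-01-00-00/result.csv",  # manifest, not covered by the rule
    "athena_output/query.csv",                      # Athena output, not covered by the rule
]
print(keys_covered_by_rule(sample_keys))
```

Only the staged copies under tmp_transition are selected, so manifests, reports, and inventory files in the same bucket are left alone by this rule.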

Creating AWS VPC

Next, we need to create a VPC with both public and private subnets, a NAT Gateway, and an S3 gateway endpoint, and attach an endpoint policy that lets the Fargate container access the S3 bucket whose data we are copying to Azure.

Now define your VPC and related resources using the following code.

from aws_cdk import (
    aws_ec2 as ec2,
    aws_iam as iam,  # needed for the endpoint policy statements below
    core,
)
vpc = ec2.Vpc(self, "demoVPC",
              max_azs=2,
              cidr="10.0.0.0/16",
              nat_gateways=1,
              subnet_configuration=[{
                  "cidrMask": 24,
                  "name": 'private',
                  "subnetType": ec2.SubnetType.PRIVATE
              },
                  {
                  "cidrMask": 24,
                  "name": 'public',
                  "subnetType": ec2.SubnetType.PUBLIC
              }]
              )
subnets = vpc.select_subnets(
    subnet_type=ec2.SubnetType.PRIVATE).subnets
endpoint = vpc.add_gateway_endpoint('s3Endpoint',
                                    service=ec2.GatewayVpcEndpointAwsService.S3,
                                    subnets=[{
                                        "subnet_id": subnets[0].subnet_id
                                    },
                                        {
                                        "subnet_id": subnets[1].subnet_id
                                    }])

endpoint.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        bucket_arn,
        f"{bucket_arn}/*"
    ],
    principals=[iam.ArnPrincipal("*")],
    # s3:GetObjects and s3:ListObjects are not IAM actions;
    # listing a bucket is authorised by s3:ListBucket.
    actions=[
        "s3:GetObject",
        "s3:ListBucket"
    ],
))

# Provides access to the Amazon S3 bucket containing the layers for each Docker image of ECR.
endpoint.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"arn:aws:s3:::prod-{self.region}-starport-layer-bucket/*"
    ],
    principals=[iam.ArnPrincipal("*")],
    actions=[
        "s3:GetObject"
    ],
))

When the NAT Gateway is created, an Elastic IP address is allocated in AWS. We will need this IP address to set up the Azure Storage firewall rule in step 3.
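
If you would rather look up that Elastic IP with a script than in the console, a minimal sketch could read it from an EC2 DescribeNatGateways response. The helper names and the sample response below are hypothetical; describe_nat_gateways is the boto3 EC2 client call, and lookup_from_aws assumes boto3 is installed and AWS credentials are configured.

```python
from typing import Any, Dict, List


def nat_gateway_public_ips(response: Dict[str, Any]) -> List[str]:
    """Collect the public IPs found in an EC2 DescribeNatGateways response."""
    ips = []
    for gateway in response.get("NatGateways", []):
        for address in gateway.get("NatGatewayAddresses", []):
            public_ip = address.get("PublicIp")
            if public_ip:
                ips.append(public_ip)
    return ips


def lookup_from_aws() -> List[str]:
    """Query EC2 for real. Needs boto3 and credentials, so it is not called here."""
    import boto3

    return nat_gateway_public_ips(boto3.client("ec2").describe_nat_gateways())


# Hypothetical response fragment, trimmed to the fields the helper reads.
sample_response = {
    "NatGateways": [
        {
            "NatGatewayId": "nat-0123456789abcdef0",
            "NatGatewayAddresses": [
                {"PublicIp": "203.0.113.10", "PrivateIp": "10.0.1.25"}
            ],
        }
    ]
}
print(nat_gateway_public_ips(sample_response))
```

The address this returns is the one to enter in the Azure Storage firewall rule.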

Deploying Azure Storage Account

To simplify managing resources, we can use the Azure Resource Manager template (ARM template) to deploy resources at our Azure subscription level.

I will assume you already have an Azure subscription set up. We will use Cloud Shell to deploy a resource group, an Azure Storage account, a container, and a firewall rule that allows traffic from a specific IP address.

Click on the Cloud Shell icon in the Azure Portal's header bar, and it will open the Cloud Shell.

Run the following command to deploy:

az group create --name examplegroup --location australiaeast

az deployment group create --resource-group examplegroup --template-uri https://raw.githubusercontent.com/yai333/DataPipelineS32Blob/master/Azure-Template-DemoRG/template.json  --parameters storageAccounts_mydemostroageaccount_name=mydemostorageaccountaiyi --debug

Once the template has been deployed, we can verify the deployment by exploring the Azure portal's resource group. All resources deployed will be displayed in the Overview section of the Resource group.

Let’s create a Firewall rule for our Storage Account:

  1. Go to the storage account we just deployed.
  2. Open the settings menu called Firewalls and virtual networks.
  3. Check that access is allowed from Selected networks.
  4. To grant access to an internet IP range, enter the AWS VPC’s public IP address (the NAT Gateway Elastic IP from step 2) and click Save.

We will then generate Shared Access Signatures (SAS) to grant limited access to Azure Storage resources.

Run the commands below in Cloud Shell:

RG_NAME='examplegroup'
ACCOUNT_NAME='mydemostorageaccountaiyi' 
ACCOUNT_KEY=`az storage account keys list --account-name=$ACCOUNT_NAME --query [0].value -o tsv`
BLOB_CONTAINER=democontainer

STORAGE_CONN_STRING=`az storage account show-connection-string --name $ACCOUNT_NAME --resource-group $RG_NAME --output tsv`

SAS=`az storage container generate-sas --connection-string $STORAGE_CONN_STRING -n $BLOB_CONTAINER --expiry '2021-06-30' --permissions aclrw --output tsv`

echo $SAS

We will get a SAS token that grants (a)dd, (c)reate, (l)ist, (r)ead, and (w)rite access to the blob container democontainer, matching the aclrw permissions requested above.

se=2021-06-30&sp=racwl&sv=2018-11-09&sr=c&sig=xxxxbBfqfEppPpBZPOTRiwvkh69xxxx/xxxxQA0YtKo%3D
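
Before storing the token, it can help to check what it actually grants. The sketch below parses a SAS query string with Python's standard library and spells out the permission letters. The function name describe_sas is hypothetical and the signature in the sample token is a placeholder, not a real one.

```python
from urllib.parse import parse_qs

# Letters that can appear in the "sp" (signed permissions) field of a container SAS.
PERMISSION_NAMES = {
    "r": "read",
    "a": "add",
    "c": "create",
    "w": "write",
    "d": "delete",
    "l": "list",
}


def describe_sas(sas_token: str) -> dict:
    """Summarise expiry, resource type and permissions of a SAS query string."""
    fields = parse_qs(sas_token)
    permissions = [PERMISSION_NAMES.get(letter, letter) for letter in fields["sp"][0]]
    return {
        "expiry": fields["se"][0],    # "se" is the signed expiry time
        "resource": fields["sr"][0],  # "sr" is the signed resource, "c" for container
        "permissions": permissions,
    }


# Placeholder token shaped like the one above; the signature is not real.
sample_token = "se=2021-06-30&sp=racwl&sv=2018-11-09&sr=c&sig=placeholder%3D"
print(describe_sas(sample_token))
```

For the token shown above, the permissions come out as read, add, create, write, and list, with no delete permission.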

Let’s move back to AWS and store the SAS token in AWS SSM Parameter Store.

Run the following command in your local terminal.

aws ssm put-parameter --cli-input-json '{
  "Name": "/s3toblob/azure/storage/sas",
  "Value": "se=2021-06-30&sp=racwl&sv=2018-11-09&sr=c&sig=xxxxbBfqfEppPpBZPOTRiwvkh69xxxx/xxxxQA0YtKo%3D",
  "Type": "SecureString"
}'

Defining Lambda functions and AWS Data Wrangler layer

Now, let’s move on to the Lambda functions. We will create three Lambda functions and one Lambda layer:

  1. fn_create_s3batch_manifest and DataWranglerLayer
  2. fn_create_batch_job
  3. fn_process_transfer_task

fn_create_s3batch_manifest and AWS Data Wrangler layer

This Lambda function uses AWS Data Wrangler’s Athena module to find the files added during the previous UTC day and save the list of files to a CSV manifest file.

Copy the following code into the CDK stack.py. Download the awswrangler-layer zip file and save it under ./layers, the path the code below expects.

datawrangler_layer = lambda_.LayerVersion(self, "DataWranglerLayer",
                                                 code=lambda_.Code.from_asset(
                                                     "./layers/awswrangler-layer-1.9.6-py3.6.zip"),
                                                 compatible_runtimes=[
                                                     lambda_.Runtime.PYTHON_3_6]
                                                 )

fn_create_s3batch_manifest = lambda_.Function(self, "CreateS3BatchManifest",
                                              runtime=lambda_.Runtime.PYTHON_3_6,
                                              handler="lambda_create_s3batch_manifest.handler",
                                              timeout=core.Duration.minutes(
                                                  15),
                                              code=lambda_.Code.from_asset(
                                                  "./src"),
                                              layers=[
                                                  datawrangler_layer]
                                              )

fn_create_s3batch_manifest.add_environment(
    "DESTINATION_BUCKET_NAME", s3_destination_bucket_name)
fn_create_s3batch_manifest.add_environment(
    "SOURCE_BUCKET_NAME", self.s3_source_bucket_name)

fn_create_s3batch_manifest.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        "*"
    ],
    actions=[
        "glue:GetTable",
        "glue:CreateTable",
        "athena:StartQueryExecution",
        "athena:CancelQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
    ],
))

fn_create_s3batch_manifest.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"arn:aws:glue:{self.region}:{self.account}:catalog",
        f"arn:aws:glue:{self.region}:{self.account}:database/*",
        f"arn:aws:glue:{self.region}:{self.account}:table/*"
    ],
    actions=[
        "glue:GetDatabases",
        "glue:GetDatabase",
        "glue:BatchCreatePartition",
        "glue:GetPartitions",
        "glue:CreateDatabase",
        "glue:GetPartition"
    ],
))

s3_destination.add_event_notification(s3.EventType.OBJECT_CREATED,
                                              s3n.LambdaDestination(
                                                  fn_create_s3batch_manifest),
                                              {"prefix": f'{self.s3_source_bucket_name}/demoDataBucketInventory0/', "suffix": '.json'})

Then create ./src/lambda_create_s3batch_manifest.py with the following code:

import json
import logging
import os
from datetime import datetime, timedelta
import awswrangler as wr

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

DATABASE_NAME = "s3datademo"
TABLE_NAME = "dailyobjects"


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))

    if DATABASE_NAME not in wr.catalog.databases().values:
        wr.catalog.create_database(DATABASE_NAME)

    event_date = datetime.strptime(
        event["Records"][0]["eventTime"], "%Y-%m-%dT%H:%M:%S.%fZ")

    partition_dt = f'{(event_date - timedelta(days=1)).strftime("%Y-%m-%d")}-00-00'
    previous_partition_dt = f'{(event_date - timedelta(days=2)).strftime("%Y-%m-%d")}-00-00'

    logger.debug(f"partition_dt: {partition_dt}")

    if not wr.catalog.does_table_exist(database=DATABASE_NAME, table=TABLE_NAME):
        table_query_exec_id = wr.athena.start_query_execution(s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/athena_output",
                                                              sql=f"CREATE EXTERNAL TABLE {TABLE_NAME}( \
                                                                    `bucket` string, \
                                                                    key string, \
                                                                    version_id string, \
                                                                    is_latest boolean, \
                                                                    is_delete_marker boolean, \
                                                                    size bigint, \
                                                                    last_modified_date timestamp, \
                                                                    e_tag string \
                                                                    ) \
                                                                    PARTITIONED BY(dt string) \
                                                                    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' \
                                                                    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' \
                                                                    OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' \
                                                                    LOCATION 's3://{os.getenv('DESTINATION_BUCKET_NAME')}/{os.getenv('SOURCE_BUCKET_NAME')}/demoDataBucketInventory0/hive/';",
                                                              database=DATABASE_NAME)

        wr.athena.wait_query(query_execution_id=table_query_exec_id)

    partition_query_exec_id = wr.athena.start_query_execution(
        sql=f"ALTER TABLE {TABLE_NAME} ADD IF NOT EXISTS PARTITION (dt=\'{partition_dt}\');",
        s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/athena_output",
        database=DATABASE_NAME)

    wr.athena.wait_query(query_execution_id=partition_query_exec_id)

    select_query_exec_id = wr.athena.start_query_execution(sql='SELECT DISTINCT bucket as "' +
                                                           os.getenv('SOURCE_BUCKET_NAME') +
                                                           '" , key as "dump.txt" FROM ' +
                                                           TABLE_NAME +
                                                           " where dt = '" +
                                                           partition_dt +
                                                           "' and is_delete_marker = false" +
                                                           " except " +
                                                           'SELECT DISTINCT bucket as "' +
                                                           os.getenv('SOURCE_BUCKET_NAME') +
                                                           '" , key as "dump.txt" FROM ' +
                                                           TABLE_NAME +
                                                           " where dt = '" +
                                                           previous_partition_dt +
                                                           "' and is_delete_marker = false ;",
                                                           database=DATABASE_NAME,
                                                           s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/csv_manifest/dt={partition_dt}")
    return select_query_exec_id

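In the code above, we create the Glue database and the Athena table if they do not exist yet, and add a partition to the table every day. The Lambda then runs an EXCEPT query that returns the difference between the two date partitions, that is, the objects present in the latest inventory but not in the previous one.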
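
To make the date arithmetic and the EXCEPT semantics concrete, here is a small standalone sketch. The function names partition_names and new_keys, and the sample keys, are hypothetical; the date format and the one-day and two-day offsets are copied from the Lambda above, and Python sets stand in for the two inventory partitions.

```python
from datetime import datetime, timedelta
from typing import Set, Tuple


def partition_names(event_time: str) -> Tuple[str, str]:
    """Return (partition_dt, previous_partition_dt) for an S3 event time."""
    event_date = datetime.strptime(event_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    current = (event_date - timedelta(days=1)).strftime("%Y-%m-%d") + "-00-00"
    previous = (event_date - timedelta(days=2)).strftime("%Y-%m-%d") + "-00-00"
    return current, previous


def new_keys(current_keys: Set[str], previous_keys: Set[str]) -> Set[str]:
    """Mimic SELECT ... EXCEPT SELECT ...: rows in the first set but not the second."""
    return current_keys - previous_keys


# Hypothetical event time and inventory contents.
print(partition_names("2021-01-03T04:05:06.789Z"))
print(new_keys({"a.csv", "b.csv", "c.csv"}, {"a.csv"}))
```

An inventory report delivered on 3 January therefore maps to the partition dt=2021-01-02-00-00 and is compared against dt=2021-01-01-00-00.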

Note that start_query_execution is asynchronous, so the Lambda does not need to wait for the result of the final SELECT. Once the query has run, Athena saves the result as a CSV file under s3://{DESTINATION_BUCKET_NAME}/csv_manifest/dt={partition_dt}, the s3_output location passed in the code above.
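
The SELECT in the Lambda aliases its two columns to the source bucket name and "dump.txt". One plausible reading, which is my assumption rather than something stated above, is that Athena's CSV output starts with a header row while an S3 Batch Operations CSV manifest expects only Bucket,Key rows, so the aliases turn the header into a syntactically valid manifest row. The sketch below imitates that shape with Python's csv module; the function name, bucket name, and keys are hypothetical, and the quoting is only an approximation of Athena's output.

```python
import csv
import io
from typing import List, Sequence


def athena_style_csv(header: Sequence[str], rows: List[Sequence[str]]) -> str:
    """Write a header plus data rows with every field quoted."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow(header)  # column aliases chosen in the SELECT
    writer.writerows(rows)   # one (bucket, key) pair per new object
    return buffer.getvalue()


# Hypothetical bucket name and object keys.
SOURCE_BUCKET = "demo-databucket-source"
manifest = athena_style_csv(
    header=[SOURCE_BUCKET, "dump.txt"],
    rows=[
        [SOURCE_BUCKET, "2021/01/02/file1.csv"],
        [SOURCE_BUCKET, "2021/01/02/file2.csv"],
    ],
)
print(manifest)
```

If this reading is right, the job would also try to copy a dump.txt object from the source bucket, and with the report scope set to FailedTasksOnly a missing dump.txt would simply appear in the failure report.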

fn_create_batch_job and S3 Notification

In this section, we will create a Lambda function, fn_create_batch_job, and enable Amazon S3 to send a notification that triggers fn_create_batch_job when a CSV file is added under the /csv_manifest prefix of the S3 bucket. Put the following code in the CDK stack.py:

fn_create_batch_job = lambda_.Function(self, "CreateS3BatchJobFunction",
                                               runtime=lambda_.Runtime.PYTHON_3_6,
                                               handler="lambda_create_batch_job.handler",
                                               timeout=core.Duration.minutes(
                                                     5),
                                               code=lambda_.Code.from_asset("./src"))
fn_create_batch_job.add_environment("ROLE_ARN", s3_batch_role.role_arn)
fn_create_batch_job.add_environment(
    "SOURCE_BUCKET_NAME", self.s3_source_bucket_name)

fn_create_batch_job.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["s3:CreateJob"],
    resources=["*"]

))

fn_create_batch_job.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[s3_batch_role.role_arn]
))

s3_destination.add_event_notification(s3.EventType.OBJECT_CREATED,
                                      s3n.LambdaDestination(
                                          fn_create_batch_job),
                                      {"prefix": f'csv_manifest/', "suffix": '.csv'})

Create ./src/lambda_create_batch_job.py with the following code:

import json
import boto3
import logging
import os
from urllib.parse import unquote

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

s3_control_client = boto3.client('s3control')
s3_cli = boto3.client('s3')


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))

    account_id = boto3.client('sts').get_caller_identity().get('Account')
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    bucket_arn = event['Records'][0]['s3']['bucket']['arn']
    file_key = event['Records'][0]['s3']['object']['key']
    e_tag = event['Records'][0]['s3']['object']['eTag']
    logger.info('Reading {} from {}'.format(file_key, bucket_name))

    response = s3_control_client.create_job(
        AccountId=account_id,
        ConfirmationRequired=False,
        Operation={
            'S3PutObjectCopy': {
                'TargetResource': bucket_arn,
                'StorageClass': 'STANDARD',
                'TargetKeyPrefix': 'tmp_transition'
            },
        },
        Report={
            'Bucket': bucket_arn,
            'Format': 'Report_CSV_20180820',
            'Enabled': True,
            'Prefix': f'report/{os.getenv("SOURCE_BUCKET_NAME")}',
            'ReportScope': 'FailedTasksOnly'
        },
        Manifest={
            'Spec': {
                'Format': 'S3BatchOperations_CSV_20180820',
                "Fields": ["Bucket", "Key"]
            },
            'Location': {
                'ObjectArn': f'{bucket_arn}/{unquote(file_key)}',
                'ETag': e_tag
            }
        },
        Priority=10,
        RoleArn=os.getenv("ROLE_ARN"),
        Tags=[
            {
                'Key': 'engineer',
                'Value': 'yiai'
            },
        ]
    )

    logger.info("S3 batch job response: " + json.dumps(response, indent=2))
    return

The Lambda function fn_create_batch_job creates an S3 Batch Operations job that copies all the files listed in the CSV manifest to the /tmp_transition prefix of the S3 destination bucket.
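
The part of the handler that is easiest to get wrong is the manifest location, because object keys in S3 event notifications arrive URL-encoded (for example, the = in dt=... arrives as %3D). The sketch below isolates that step so it can be checked locally; the function name manifest_location and the sample record are hypothetical, while the field names follow the handler above.

```python
from urllib.parse import unquote


def manifest_location(record: dict) -> dict:
    """Build the Manifest 'Location' argument from one S3 event record."""
    bucket_arn = record["s3"]["bucket"]["arn"]
    file_key = record["s3"]["object"]["key"]
    return {
        # Decode the key so the ARN points at the real object name.
        "ObjectArn": f"{bucket_arn}/{unquote(file_key)}",
        "ETag": record["s3"]["object"]["eTag"],
    }


# Hypothetical, trimmed S3 notification record.
sample_record = {
    "s3": {
        "bucket": {"name": "dest-bucket", "arn": "arn:aws:s3:::dest-bucket"},
        "object": {
            "key": "csv_manifest/dt%3D2021-01-02-00-00/query-id.csv",
            "eTag": "0123456789abcdef0123456789abcdef",
        },
    }
}
print(manifest_location(sample_record))
```

Without the unquote call, the ARN would keep the literal %3D and would not match the manifest object that Athena wrote.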

S3 Batch Operations is an Amazon S3 data management feature that lets you manage billions of objects at scale. To start an S3 Batch Operations job, we also need to set up an IAM role, S3BatchRole, with the corresponding policies:

s3_batch_role = iam.Role(self, "S3BatchRole",
                                 assumed_by=iam.ServicePrincipal(
                                     "batchoperations.s3.amazonaws.com")
                                 )

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        s3_destination.bucket_arn,
        f"{s3_destination.bucket_arn}/*"
    ],
    actions=[
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:PutObjectTagging",
        "s3:PutObjectLegalHold",
        "s3:PutObjectRetention",
        "s3:GetBucketObjectLockConfiguration"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        s3_source.bucket_arn,
        f"{s3_source.bucket_arn}/*"
    ],
    actions=[
        "s3:GetObject",
        "s3:GetObjectAcl",
        "s3:GetObjectTagging"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"{s3_destination.bucket_arn}/*"
    ],
    actions=[
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:GetBucketLocation"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"{s3_destination.bucket_arn}/report/{self.s3_source_bucket_name}/*"
    ],
    actions=[
        "s3:PutObject",
        "s3:GetBucketLocation"
    ],
))

fn_process_transfer_task and Eventbridge Custom rule

We will create an EventBridge custom rule that tracks S3 Batch Operations jobs through AWS CloudTrail and sends events with the Complete status to the target, the Lambda function fn_process_transfer_task.

fn_process_transfer_task will then start a Fargate task programmatically to copy the files under the /tmp_transition prefix to the Azure Storage container democontainer.

fn_process_transfer_task = lambda_.Function(self, "ProcessS3TransferFunction",
                                                    runtime=lambda_.Runtime.PYTHON_3_6,
                                                    handler="lambda_process_s3transfer_task.handler",
                                                    timeout=core.Duration.minutes(
                                                        5),
                                                    code=lambda_.Code.from_asset("./src"))
fn_process_transfer_task.add_environment(
    "CLUSTER_NAME", cluster_name)

fn_process_transfer_task.add_environment(
    "PRIVATE_SUBNET1", subnets[0].subnet_id)
fn_process_transfer_task.add_environment(
    "PRIVATE_SUBNET2", subnets[1].subnet_id)
fn_process_transfer_task.add_environment(
    "TASK_DEFINITION", task_definition.task_definition_arn)
fn_process_transfer_task.add_environment(
    "S3_BUCKET_NAME", s3_destination_bucket_name)

fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        task_definition.task_definition_arn
    ],
    actions=[
        "ecs:RunTask"
    ],
))

fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[task_definition.execution_role.role_arn]
))

fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[task_definition.task_role.role_arn]
))


trail = trail_.Trail(
            self, "CloudTrail", send_to_cloud_watch_logs=True)

event_rule = trail.on_event(self, "S3JobEvent",
                            target=targets.LambdaFunction(
                                handler=fn_process_transfer_task)
                            )
event_rule.add_event_pattern(
    source=['aws.s3'],
    detail_type=[
        "AWS Service Event via CloudTrail"],
    detail={
        "eventSource": [
            "s3.amazonaws.com"
        ],
        "eventName": [
            "JobStatusChanged"
        ],
        "serviceEventDetails": {
            "status": ["Complete"]
        }
    }
)
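
To see which events the pattern above lets through, the sketch below applies a deliberately simplified matcher to two sample events. This is an illustration only, not EventBridge's actual matching logic: it handles just the two constructs the pattern uses (a list means "any of these values", a nested dict means "match the nested fields"). The matches function and the sample events are hypothetical.

```python
from typing import Any, Dict


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Simplified pattern check: lists are allowed values, dicts recurse."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Same shape as the rule defined in the CDK code.
PATTERN = {
    "source": ["aws.s3"],
    "detail-type": ["AWS Service Event via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["JobStatusChanged"],
        "serviceEventDetails": {"status": ["Complete"]},
    },
}


def sample_event(status: str) -> Dict[str, Any]:
    """Hypothetical, trimmed job status event."""
    return {
        "source": "aws.s3",
        "detail-type": "AWS Service Event via CloudTrail",
        "detail": {
            "eventSource": "s3.amazonaws.com",
            "eventName": "JobStatusChanged",
            "serviceEventDetails": {"jobId": "example-job-id", "status": status},
        },
    }


print(matches(PATTERN, sample_event("Complete")))
print(matches(PATTERN, sample_event("Failed")))
```

Only the job event with status Complete would reach the Lambda; other status changes are filtered out by the rule.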

Create ./src/lambda_process_s3transfer_task.py with the following code:

import json
import boto3
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

ecs = boto3.client('ecs')


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))
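    # Log the subnets actually passed in by the stack (there is no SUBNETS variable).
    logger.info("ENV SUBNETS: " + json.dumps([os.getenv("PRIVATE_SUBNET1"), os.getenv("PRIVATE_SUBNET2")]))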

    response = ecs.run_task(
        cluster=os.getenv("CLUSTER_NAME"),
        taskDefinition=os.getenv("TASK_DEFINITION"),
        launchType='FARGATE',
        count=1,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': [
                        os.getenv("PRIVATE_SUBNET1"),
                        os.getenv("PRIVATE_SUBNET2"),
                ],
                'assignPublicIp': 'DISABLED'
            }
        },
        overrides={"containerOverrides": [{
            "name": "azcopy",
            'memory': 512,
            'memoryReservation': 512,
            'cpu': 2,
            'environment': [
                    {
                        'name': 'S3_SOURCE',
                        'value': f'https://s3.{os.getenv("AWS_REGION")}.amazonaws.com/{os.getenv("S3_BUCKET_NAME")}/tmp_transition'
                    }
            ],
        }]})
    return str(response)

We have now set up the serverless part. Let’s move on to the Fargate task that performs the data replication.

Creating an AWS Fargate task

We will create:

  • An ECR image with AzCopy installed. AzCopy is a command-line utility that you can use to copy blobs or files to or from a storage account.
  • An ECS cluster with a Fargate task.

Let’s get started.

1) Build ECS, ECR, and Fargate stack.

from aws_cdk import (
    aws_iam as iam,
    aws_ecr as ecr_,
    aws_ecs as ecs,
    aws_ssm as ssm,  # needed for the SAS token secret below
    core,
)

ecr = ecr_.Repository(self, "azcopy")
cluster = ecs.Cluster(self, "DemoCluster",
                      vpc=vpc, container_insights=True)

task_definition = ecs.FargateTaskDefinition(
    self, "azcopyTaskDef")
task_definition.add_container("azcopy", image=ecs.ContainerImage.from_registry(
    ecr.repository_uri),
    logging=ecs.LogDrivers.aws_logs(stream_prefix="s32blob"),
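    environment={
    # Storage account name must match the one deployed with the ARM template.
    'AZURE_BLOB_URL': 'https://mydemostorageaccountaiyi.blob.core.windows.net/democontainer/'},
    secrets={
    # Parameter name must match the one used with `aws ssm put-parameter`,
    # and version must match the stored parameter version.
    'SAS_TOKEN': ecs.Secret.from_ssm_parameter(
        ssm.StringParameter.from_secure_string_parameter_attributes(self, 'sas',
                                                                    parameter_name='/s3toblob/azure/storage/sas', version=2))
})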

task_definition.task_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        bucket_arn,
        f"{bucket_arn}/*"
    ],
    # s3:GetObjects and s3:ListObjects are not IAM actions;
    # listing a bucket is authorised by s3:ListBucket.
    actions=[
        "s3:GetObject",
        "s3:ListBucket"
    ],
))
ecr.grant_pull(task_definition.obtain_execution_role())

2) Build a Docker image with AzCopy installed.

FROM alpine AS azcopy
RUN apk add --no-cache wget \
    &&	wget https://aka.ms/downloadazcopy-v10-linux -O /tmp/azcopy.tgz \
    &&	export BIN_LOCATION=$(tar -tzf /tmp/azcopy.tgz | grep "/azcopy") \
    &&	tar -xzf /tmp/azcopy.tgz $BIN_LOCATION --strip-components=1 -C /usr/bin

FROM alpine:3.9
RUN apk update && apk add libc6-compat ca-certificates jq curl
COPY --from=azcopy /usr/bin/azcopy /usr/local/bin/azcopy
RUN ldd /usr/local/bin/azcopy
COPY entrypoint.sh  /
RUN chmod 777 /entrypoint.sh

ENTRYPOINT ["sh", "/entrypoint.sh"]

Create entrypoint.sh with the following script:

#!/bin/bash

echo "export AWS_CONTAINER_CREDENTIALS_RELATIVE_URI=$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" >> /root/.profile
json=$(curl "http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI}")

export AWS_ACCESS_KEY_ID=$(echo "$json" | jq -r '.AccessKeyId') 
export AWS_SECRET_ACCESS_KEY=$(echo "$json" | jq -r '.SecretAccessKey') 
export AWS_SESSION_TOKEN=$(echo "$json" | jq -r '.Token') 

azcopy copy "${S3_SOURCE}" \
"${AZURE_BLOB_URL}?${SAS_TOKEN}" \
--recursive=true

Note that to use AzCopy to transfer files from AWS S3, we need to set up AWS credentials in the container. We can retrieve the task role credentials from the ECS credentials endpoint (the relative URI already starts with a slash):

curl "http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI}"
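
For readers who want to check the jq mapping in entrypoint.sh, the same transformation can be sketched in Python. The function name credentials_to_env and the sample payload are hypothetical; the JSON field names (AccessKeyId, SecretAccessKey, Token) are the ones the script reads, and the values are placeholders.

```python
import json
from typing import Dict


def credentials_to_env(payload: str) -> Dict[str, str]:
    """Map the credentials JSON to the environment variables AzCopy reads."""
    data = json.loads(payload)
    return {
        "AWS_ACCESS_KEY_ID": data["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": data["SecretAccessKey"],
        "AWS_SESSION_TOKEN": data["Token"],
    }


# Placeholder payload shaped like the response the entrypoint script parses.
sample_payload = json.dumps(
    {
        "AccessKeyId": "EXAMPLEKEYID",
        "SecretAccessKey": "example-secret",
        "Token": "example-session-token",
        "Expiration": "2021-01-02T10:00:00Z",
    }
)
print(sorted(credentials_to_env(sample_payload)))
```

Because these are temporary credentials, all three values are exported together, session token included.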

3) Push Docker image to ECR

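# AWS CLI v2 removed `aws ecr get-login`; use get-login-password instead.
aws ecr get-login-password --region ap-southeast-2 | docker login --username AWS --password-stdin YOUR_ACCOUNT_ID.dkr.ecr.ap-southeast-2.amazonaws.com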

docker build . -t YOUR_ACCOUNT_ID.dkr.ecr.ap-southeast-2.amazonaws.com/YOUR_ECR_NAME

docker push YOUR_ACCOUNT_ID.dkr.ecr.ap-southeast-2.amazonaws.com/YOUR_ECR_NAME

Great! We have what we need. You can find the full CDK project for this solution in my GitHub repo. Clone the repo and deploy the stack:

cd CDK-S3toblob 
pip install -r requirements.txt
cdk deploy

Once the stack has been successfully created, navigate to the AWS CloudFormation console, locate the stack we just created, and go to the Resources tab to find the deployed resources.

Now it’s time to test our workflow. Go to the S3 source bucket demo-databucket-source and upload several files under different folders (prefixes). Wait up to 24 hours for the next inventory report to be generated; you will then see the whole pipeline start running, and the files will eventually be copied to the Azure container democontainer.

We should see the Fargate task logs, as in the screenshot below.

We can also monitor, troubleshoot, and set alarms for ECS resources using CloudWatch Container Insights.

Conclusion

In this article, I introduced an approach to automating data replication from AWS S3 to Microsoft Azure Storage. I walked you through using CDK to deploy the VPC, S3, Lambda, CloudTrail, and Fargate resources, and using an ARM template to deploy the Azure services. I also showed how to use the AWS Data Wrangler library and Athena to create a table and query it.

I hope you have found this article useful. You can find the complete project in my GitHub repo.
