Your business may need to move a significant amount of data periodically from one public cloud to another, for example to satisfy a multi-cloud mandate. This article covers one approach to automating data replication from an AWS S3 bucket to a Microsoft Azure Blob Storage container using Amazon S3 Inventory, Amazon S3 Batch Operations, Fargate, and AzCopy.
Your company produces new CSV files on-premises every day, totalling around 100 GB after compression. Each file is 1–2 GB in size and needs to be uploaded to Amazon S3 every night in a fixed time window between 3 am and 5 am. Your business has decided to copy those CSV files from S3 to Microsoft Azure Storage after all files have been uploaded to S3, and you have to find an easy and fast way to automate the data replication workflow.
To accomplish this task, we can build a data pipeline to copy data periodically from S3 to Azure Storage using AWS Data Wrangler, Amazon S3 Inventory, Amazon S3 Batch Operations, Athena, Fargate, and AzCopy.
The diagram below represents the high-level architecture of the pipeline solution:
We use CDK to build our infrastructure on AWS. First, let's create a source bucket to receive files from external providers or on-premises, and set up daily inventory reports that provide a flat-file list of your objects and their metadata.
Next, create a destination bucket as temporary storage, with a lifecycle expiration rule configured on the prefix /tmp_transition. All files with that prefix (e.g. /tmp_transition/file1.csv) will be copied to Azure and then removed by the lifecycle policy after 24 hours.
Use the following code to create the S3 buckets.
from aws_cdk import (
    aws_s3 as s3,
    core,
)

s3_destination = s3.Bucket(
    self, "dataBucketInventory",
    lifecycle_rules=[
        {
            'expiration': core.Duration.days(1.0),
            'prefix': 'tmp_transition'
        },
    ])

s3_source = s3.Bucket(
    self, "demoDataBucket",
    bucket_name=self.s3_source_bucket_name,
    encryption=s3.BucketEncryption.S3_MANAGED,
    inventories=[
        {
            "frequency": s3.InventoryFrequency.DAILY,
            "include_object_versions": s3.InventoryObjectVersion.CURRENT,
            "destination": {
                "bucket": s3_destination
            }
        }
    ])
Next, we need to create a VPC with both public and private subnets, a NAT Gateway, and an S3 gateway endpoint, and attach an endpoint policy that allows the Fargate container to access the S3 bucket from which we copy data to Azure.
Now define your VPC and related resources using the following code.
from aws_cdk import (
    aws_ec2 as ec2,
    aws_iam as iam,
    core,
)

vpc = ec2.Vpc(
    self, "demoVPC",
    max_azs=2,
    cidr="10.0.0.0/16",
    nat_gateways=1,
    subnet_configuration=[
        {
            "cidrMask": 24,
            "name": 'private',
            "subnetType": ec2.SubnetType.PRIVATE
        },
        {
            "cidrMask": 24,
            "name": 'public',
            "subnetType": ec2.SubnetType.PUBLIC
        }
    ])

subnets = vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE).subnets

endpoint = vpc.add_gateway_endpoint(
    's3Endpoint',
    service=ec2.GatewayVpcEndpointAwsService.S3,
    subnets=[
        {"subnet_id": subnets[0].subnet_id},
        {"subnet_id": subnets[1].subnet_id}
    ])
endpoint.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        bucket_arn,
        f"{bucket_arn}/*"
    ],
    principals=[iam.ArnPrincipal("*")],
    actions=[
        "s3:GetObject",
        "s3:ListBucket"
    ],
))

# Provides access to the Amazon S3 bucket containing the layers of each ECR Docker image,
# so Fargate can pull images through the S3 gateway endpoint.
endpoint.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"arn:aws:s3:::prod-{self.region}-starport-layer-bucket/*"
    ],
    principals=[iam.ArnPrincipal("*")],
    actions=[
        "s3:GetObject"
    ],
))
When the NAT Gateway is created, an Elastic IP address is allocated in AWS. We will need that IP address to set up the Azure Storage firewall rule in step 3.
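If you prefer to look the address up programmatically rather than in the console, a minimal boto3 sketch such as the one below prints the NAT Gateway's public IP once the stack has been deployed (the VPC ID filter value is a placeholder):
import boto3

ec2 = boto3.client("ec2")

# Find the NAT Gateways of the VPC created by the CDK stack (replace the placeholder VPC ID).
response = ec2.describe_nat_gateways(
    Filters=[{"Name": "vpc-id", "Values": ["vpc-xxxxxxxx"]}])

for nat in response["NatGateways"]:
    for address in nat["NatGatewayAddresses"]:
        # This public IP is the Elastic IP to allow in the Azure Storage firewall.
        print(nat["NatGatewayId"], address.get("PublicIp"))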
To simplify managing resources, we can use an Azure Resource Manager (ARM) template to deploy the Azure resources.
I will assume you already have an Azure subscription set up. We will use Cloud Shell to deploy a resource group, an Azure Storage account, a container, and a firewall rule that allows traffic from a specific IP address.
Click the Cloud Shell icon in the Azure portal's header bar to open Cloud Shell.
Run the following command to deploy:
az group create --name examplegroup --location australiaeast
az deployment group create --resource-group examplegroup --template-uri https://raw.githubusercontent.com/yai333/DataPipelineS32Blob/master/Azure-Template-DemoRG/template.json --parameters storageAccounts_mydemostroageaccount_name=mydemostorageaccountaiyi --debug
Once the template has been deployed, we can verify the deployment by opening the resource group in the Azure portal; all deployed resources are displayed in the resource group's Overview section.
Let's add a firewall rule to our Storage Account that allows traffic from the NAT Gateway's Elastic IP address (in the Azure portal: Storage Account, then Networking, then Firewalls and virtual networks):
We will then generate Shared Access Signatures (SAS) to grant limited access to Azure Storage resources.
Run the commands below in Cloud Shell:
RG_NAME='examplegroup'
ACCOUNT_NAME='mydemostorageaccountaiyi'
ACCOUNT_KEY=`az storage account keys list --account-name=$ACCOUNT_NAME --query [0].value -o tsv`
BLOB_CONTAINER=democontainer
STORAGE_CONN_STRING=`az storage account show-connection-string --name $ACCOUNT_NAME --resource-group $RG_NAME --output tsv`
SAS=`az storage container generate-sas --connection-string $STORAGE_CONN_STRING -n $BLOB_CONTAINER --expiry '2021-06-30' --permissions aclrw --output tsv`
echo $SAS
We will get the required SAS token, which grants (a)dd, (c)reate, (l)ist, (r)ead, and (w)rite access to the blob container democontainer, for example:
se=2021-06-30&sp=racwl&sv=2018-11-09&sr=c&sig=xxxxbBfqfEppPpBZPOTRiwvkh69xxxx/xxxxQA0YtKo%3D
Let's move back to AWS and store the SAS token in AWS SSM Parameter Store.
Run the following command in your local terminal:
aws ssm put-parameter --cli-input-json '{
"Name": "/s3toblob/azure/storage/sas",
"Value": "se=2021-06-30&sp=racwl&sv=2018-11-09&sr=c&sig=xxxxbBfqfEppPpBZPOTRiwvkh69xxxx/xxxxQA0YtKo%3D",
"Type": "SecureString"
}'
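To double-check that the secret landed correctly, you can read it back with boto3 (a small sketch; it assumes your local credentials point at the same account and region):
import boto3

ssm = boto3.client("ssm")

# WithDecryption returns the plain SAS token stored as a SecureString.
parameter = ssm.get_parameter(
    Name="/s3toblob/azure/storage/sas",
    WithDecryption=True)
print(parameter["Parameter"]["Value"])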
Now, let's move on to the Lambda functions. We will create three Lambda functions and one Lambda layer:
fn_create_s3batch_manifest and the AWS Data Wrangler layer
This Lambda function uses AWS Data Wrangler's Athena module to filter the files that are new in the past UTC day and save the file list to a CSV manifest file.
Copy the following code to the CDK stack.py, and download the awswrangler-layer zip file from here.
datawrangler_layer = lambda_.LayerVersion(
    self, "DataWranglerLayer",
    code=lambda_.Code.from_asset("./layers/awswrangler-layer-1.9.6-py3.6.zip"),
    compatible_runtimes=[lambda_.Runtime.PYTHON_3_6]
)

fn_create_s3batch_manifest = lambda_.Function(
    self, "CreateS3BatchManifest",
    runtime=lambda_.Runtime.PYTHON_3_6,
    handler="lambda_create_s3batch_manifest.handler",
    timeout=core.Duration.minutes(15),
    code=lambda_.Code.from_asset("./src"),
    layers=[datawrangler_layer]
)
fn_create_s3batch_manifest.add_environment(
    "DESTINATION_BUCKET_NAME", s3_destination_bucket_name)
fn_create_s3batch_manifest.add_environment(
    "SOURCE_BUCKET_NAME", self.s3_source_bucket_name)

fn_create_s3batch_manifest.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=["*"],
    actions=[
        "glue:GetTable",
        "glue:CreateTable",
        "athena:StartQueryExecution",
        "athena:CancelQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
    ],
))

fn_create_s3batch_manifest.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"arn:aws:glue:{self.region}:{self.account}:catalog",
        f"arn:aws:glue:{self.region}:{self.account}:database/*",
        f"arn:aws:glue:{self.region}:{self.account}:table/*"
    ],
    actions=[
        "glue:GetDatabases",
        "glue:GetDatabase",
        "glue:BatchCreatePartition",
        "glue:GetPartitions",
        "glue:CreateDatabase",
        "glue:GetPartition"
    ],
))

s3_destination.add_event_notification(
    s3.EventType.OBJECT_CREATED,
    s3n.LambdaDestination(fn_create_s3batch_manifest),
    {"prefix": f'{self.s3_source_bucket_name}/demoDataBucketInventory0/', "suffix": '.json'})
Then create ./src/lambda_create_s3batch_manifest.py with the following code:
import json
import logging
import os
from datetime import datetime, timedelta

import awswrangler as wr

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

DATABASE_NAME = "s3datademo"
TABLE_NAME = "dailyobjects"


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))

    if DATABASE_NAME not in wr.catalog.databases().values:
        wr.catalog.create_database(DATABASE_NAME)

    event_date = datetime.strptime(
        event["Records"][0]["eventTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
    partition_dt = f'{(event_date - timedelta(days=1)).strftime("%Y-%m-%d")}-00-00'
    previous_partition_dt = f'{(event_date - timedelta(days=2)).strftime("%Y-%m-%d")}-00-00'
    logger.debug(f"partition_dt: {partition_dt}")

    if not wr.catalog.does_table_exist(database=DATABASE_NAME, table=TABLE_NAME):
        table_query_exec_id = wr.athena.start_query_execution(
            s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/athena_output",
            sql=f"CREATE EXTERNAL TABLE {TABLE_NAME}( \
                `bucket` string, \
                key string, \
                version_id string, \
                is_latest boolean, \
                is_delete_marker boolean, \
                size bigint, \
                last_modified_date timestamp, \
                e_tag string \
                ) \
                PARTITIONED BY(dt string) \
                ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' \
                STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' \
                OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' \
                LOCATION 's3://{os.getenv('DESTINATION_BUCKET_NAME')}/{os.getenv('SOURCE_BUCKET_NAME')}/demoDataBucketInventory0/hive/';",
            database=DATABASE_NAME)
        wr.athena.wait_query(query_execution_id=table_query_exec_id)

    partition_query_exec_id = wr.athena.start_query_execution(
        sql=f"ALTER TABLE {TABLE_NAME} ADD IF NOT EXISTS PARTITION (dt=\'{partition_dt}\');",
        s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/athena_output",
        database=DATABASE_NAME)
    wr.athena.wait_query(query_execution_id=partition_query_exec_id)

    select_query_exec_id = wr.athena.start_query_execution(
        sql='SELECT DISTINCT bucket as "' + os.getenv('SOURCE_BUCKET_NAME') +
            '" , key as "dump.txt" FROM ' + TABLE_NAME +
            " where dt = '" + partition_dt + "' and is_delete_marker = false" +
            " except " +
            'SELECT DISTINCT bucket as "' + os.getenv('SOURCE_BUCKET_NAME') +
            '" , key as "dump.txt" FROM ' + TABLE_NAME +
            " where dt = '" + previous_partition_dt + "' and is_delete_marker = false ;",
        database=DATABASE_NAME,
        s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/csv_manifest/dt={partition_dt}")

    return select_query_exec_id
In the code above, we use Athena queries to create the Glue database and table and to add a new date partition to the table every day. The Lambda then runs an EXCEPT query to return the difference between the two most recent date partitions, i.e. the objects that appear in the latest inventory but not in the previous one (the columns are aliased to the source bucket name and "dump.txt" so that the CSV header row has the same Bucket,Key shape as the data rows).
Note that start_query_execution is asynchronous, hence there is no need to wait for the result in the Lambda. Once the query has finished, the result is saved as a CSV file to s3_output=f"s3://{os.getenv('DESTINATION_BUCKET_NAME')}/csv_manifest/dt={partition_dt}".
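If you ever need to inspect that query output yourself, outside the pipeline, a minimal sketch like the one below (assuming the same awswrangler version as the layer, and placeholder bucket and partition values) waits for the execution and reads the resulting CSV back from the output location:
import awswrangler as wr

# Hypothetical values: the execution ID returned by the handler above and the
# s3_output location it was started with.
query_execution_id = "11111111-2222-3333-4444-555555555555"
wr.athena.wait_query(query_execution_id=query_execution_id)

# Athena names the result file <QueryExecutionId>.csv inside the output prefix.
manifest_df = wr.s3.read_csv(
    path=f"s3://YOUR_DESTINATION_BUCKET/csv_manifest/dt=2020-10-02-00-00/{query_execution_id}.csv")
print(manifest_df.head())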
In this section, we will create a Lambda function fn_create_batch_job and enable Amazon S3 to send a notification that triggers fn_create_batch_job whenever a CSV file is added under the /csv_manifest prefix of the destination bucket. Put the following code into the CDK stack.py:
fn_create_batch_job = lambda_.Function(
    self, "CreateS3BatchJobFunction",
    runtime=lambda_.Runtime.PYTHON_3_6,
    handler="lambda_create_batch_job.handler",
    timeout=core.Duration.minutes(5),
    code=lambda_.Code.from_asset("./src"))

fn_create_batch_job.add_environment("ROLE_ARN", s3_batch_role.role_arn)
fn_create_batch_job.add_environment(
    "SOURCE_BUCKET_NAME", self.s3_source_bucket_name)

fn_create_batch_job.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["s3:CreateJob"],
    resources=["*"]
))
fn_create_batch_job.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[s3_batch_role.role_arn]
))

s3_destination.add_event_notification(
    s3.EventType.OBJECT_CREATED,
    s3n.LambdaDestination(fn_create_batch_job),
    {"prefix": f'csv_manifest/', "suffix": '.csv'})
Create ./src/lambda_create_batch_job.py with the following code:
import json
import boto3
import logging
import os
from urllib.parse import unquote

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

s3_control_client = boto3.client('s3control')
s3_cli = boto3.client('s3')


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))

    account_id = boto3.client('sts').get_caller_identity().get('Account')
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    bucket_arn = event['Records'][0]['s3']['bucket']['arn']
    file_key = event['Records'][0]['s3']['object']['key']
    e_tag = event['Records'][0]['s3']['object']['eTag']
    logger.info('Reading {} from {}'.format(file_key, bucket_name))

    response = s3_control_client.create_job(
        AccountId=account_id,
        ConfirmationRequired=False,
        Operation={
            'S3PutObjectCopy': {
                'TargetResource': bucket_arn,
                'StorageClass': 'STANDARD',
                'TargetKeyPrefix': 'tmp_transition'
            },
        },
        Report={
            'Bucket': bucket_arn,
            'Format': 'Report_CSV_20180820',
            'Enabled': True,
            'Prefix': f'report/{os.getenv("SOURCE_BUCKET_NAME")}',
            'ReportScope': 'FailedTasksOnly'
        },
        Manifest={
            'Spec': {
                'Format': 'S3BatchOperations_CSV_20180820',
                "Fields": ["Bucket", "Key"]
            },
            'Location': {
                'ObjectArn': f'{bucket_arn}/{unquote(file_key)}',
                'ETag': e_tag
            }
        },
        Priority=10,
        RoleArn=os.getenv("ROLE_ARN"),
        Tags=[
            {
                'Key': 'engineer',
                'Value': 'yiai'
            },
        ]
    )
    logger.info("S3 batch job response: " + json.dumps(response, indent=2))
    return
The fn_create_batch_job Lambda function creates an S3 Batch Operations job that copies all the files listed in the CSV manifest to the /tmp_transition prefix of the S3 destination bucket.
S3 Batch Operations is an Amazon S3 data management feature that lets you manage billions of objects at scale. To start an S3 Batch Operations job, we also need to set up an IAM role S3BatchRole with the corresponding policies:
s3_batch_role = iam.Role(
    self, "S3BatchRole",
    assumed_by=iam.ServicePrincipal("batchoperations.s3.amazonaws.com")
)

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        s3_destination.bucket_arn,
        f"{s3_destination.bucket_arn}/*"
    ],
    actions=[
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:PutObjectTagging",
        "s3:PutObjectLegalHold",
        "s3:PutObjectRetention",
        "s3:GetBucketObjectLockConfiguration"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        s3_source.bucket_arn,
        f"{s3_source.bucket_arn}/*"
    ],
    actions=[
        "s3:GetObject",
        "s3:GetObjectAcl",
        "s3:GetObjectTagging"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"{s3_destination.bucket_arn}/*"
    ],
    actions=[
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:GetBucketLocation"
    ],
))

s3_batch_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        f"{s3_destination.bucket_arn}/report/{self.s3_source_bucket_name}/*"
    ],
    actions=[
        "s3:PutObject",
        "s3:GetBucketLocation"
    ],
))
We will create an EventBridge custom rule that tracks S3 Batch Operations job events delivered through AWS CloudTrail and sends events with a Complete status to the target, the fn_process_transfer_task Lambda function. fn_process_transfer_task will then programmatically start a Fargate task to copy the files under the /tmp_transition prefix to the Azure Storage container democontainer.
fn_process_transfer_task = lambda_.Function(
    self, "ProcessS3TransferFunction",
    runtime=lambda_.Runtime.PYTHON_3_6,
    handler="lambda_process_s3transfer_task.handler",
    timeout=core.Duration.minutes(5),
    code=lambda_.Code.from_asset("./src"))

fn_process_transfer_task.add_environment("CLUSTER_NAME", cluster_name)
fn_process_transfer_task.add_environment("PRIVATE_SUBNET1", subnets[0].subnet_id)
fn_process_transfer_task.add_environment("PRIVATE_SUBNET2", subnets[1].subnet_id)
fn_process_transfer_task.add_environment("TASK_DEFINITION", task_definition.task_definition_arn)
fn_process_transfer_task.add_environment("S3_BUCKET_NAME", s3_destination_bucket_name)

fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        task_definition.task_definition_arn
    ],
    actions=[
        "ecs:RunTask"
    ],
))
fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[task_definition.execution_role.role_arn]
))
fn_process_transfer_task.add_to_role_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["iam:PassRole"],
    resources=[task_definition.task_role.role_arn]
))

trail = trail_.Trail(
    self, "CloudTrail", send_to_cloud_watch_logs=True)

event_rule = trail.on_event(
    self, "S3JobEvent",
    target=targets.LambdaFunction(handler=fn_process_transfer_task)
)
event_rule.add_event_pattern(
    source=['aws.s3'],
    detail_type=["AWS Service Event via CloudTrail"],
    detail={
        "eventSource": [
            "s3.amazonaws.com"
        ],
        "eventName": [
            "JobStatusChanged"
        ],
        "serviceEventDetails": {
            "status": ["Complete"]
        }
    }
)
Create ./src/lambda_process_s3transfer_task.py with the following code:
import json
import boto3
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

ecs = boto3.client('ecs')


def handler(event, context):
    logger.info("Received event: " + json.dumps(event, indent=2))
    logger.info("ENV SUBNETS: " + json.dumps(os.getenv('SUBNETS'), indent=3))

    response = ecs.run_task(
        cluster=os.getenv("CLUSTER_NAME"),
        taskDefinition=os.getenv("TASK_DEFINITION"),
        launchType='FARGATE',
        count=1,
        platformVersion='LATEST',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': [
                    os.getenv("PRIVATE_SUBNET1"),
                    os.getenv("PRIVATE_SUBNET2"),
                ],
                'assignPublicIp': 'DISABLED'
            }
        },
        overrides={"containerOverrides": [{
            "name": "azcopy",
            'memory': 512,
            'memoryReservation': 512,
            'cpu': 2,
            'environment': [
                {
                    'name': 'S3_SOURCE',
                    'value': f'https://s3.{os.getenv("AWS_REGION")}.amazonaws.com/{os.getenv("S3_BUCKET_NAME")}/tmp_transition'
                }
            ],
        }]})
    return str(response)
Now we have the serverless part set up. Let's move on to the Fargate task that performs the data replication.
We will create an ECR repository, an ECS cluster with a Fargate task definition, and a Docker image with AzCopy installed.
Let's get started.
1) Build the ECS, ECR, and Fargate stack.
from aws_cdk import (
    aws_iam as iam,
    aws_ecr as ecr_,
    aws_ecs as ecs,
    aws_ssm as ssm,
    core,
)

ecr = ecr_.Repository(self, "azcopy")

cluster = ecs.Cluster(self, "DemoCluster",
                      vpc=vpc, container_insights=True)

task_definition = ecs.FargateTaskDefinition(
    self, "azcopyTaskDef")

task_definition.add_container(
    "azcopy",
    image=ecs.ContainerImage.from_registry(ecr.repository_uri),
    logging=ecs.LogDrivers.aws_logs(stream_prefix="s32blob"),
    environment={
        'AZURE_BLOB_URL': 'https://mydemostorageaccountaiyi.blob.core.windows.net/democontainer/'},
    secrets={
        'SAS_TOKEN': ecs.Secret.from_ssm_parameter(
            ssm.StringParameter.from_secure_string_parameter_attributes(
                self, 'sas',
                # The name and version must match the SecureString created earlier.
                parameter_name='/s3toblob/azure/storage/sas', version=1))
    })

task_definition.task_role.add_to_policy(iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    resources=[
        bucket_arn,
        f"{bucket_arn}/*"
    ],
    actions=[
        "s3:GetObject",
        "s3:ListBucket"
    ],
))

ecr.grant_pull(task_definition.obtain_execution_role())
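It can also be handy to export the repository URI from the stack so you know exactly which image to tag and push in step 3; this is an optional addition and the output name is arbitrary:
# Optional: expose the ECR repository URI as a stack output for the docker build/push step.
core.CfnOutput(self, "AzcopyEcrRepositoryUri", value=ecr.repository_uri)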
2) Build a Docker image with AzCopy installed, using the following Dockerfile.
FROM alpine AS azcopy
RUN apk add --no-cache wget \
&& wget https://aka.ms/downloadazcopy-v10-linux -O /tmp/azcopy.tgz \
&& export BIN_LOCATION=$(tar -tzf /tmp/azcopy.tgz | grep "/azcopy") \
&& tar -xzf /tmp/azcopy.tgz $BIN_LOCATION --strip-components=1 -C /usr/bin
FROM alpine:3.9
RUN apk update && apk add libc6-compat ca-certificates jq curl
COPY --from=azcopy /usr/bin/azcopy /usr/local/bin/azcopy
RUN ldd /usr/local/bin/azcopy
COPY entrypoint.sh /
RUN chmod 777 /entrypoint.sh
ENTRYPOINT ["sh", "/entrypoint.sh"]
Then create the entrypoint.sh script, which retrieves the task's temporary AWS credentials and runs AzCopy:
#!/bin/sh
echo "export AWS_CONTAINER_CREDENTIALS_RELATIVE_URI=$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" >> /root/.profile
json=$(curl "http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI}")
export AWS_ACCESS_KEY_ID=$(echo "$json" | jq -r '.AccessKeyId')
export AWS_SECRET_ACCESS_KEY=$(echo "$json" | jq -r '.SecretAccessKey')
export AWS_SESSION_TOKEN=$(echo "$json" | jq -r '.Token')
azcopy copy "${S3_SOURCE}" \
"${AZURE_BLOB_URL}?${SAS_TOKEN}" \
--recursive=true
Note that for AzCopy to transfer files from AWS, we need to set up AWS credentials inside the container. We can retrieve the task's temporary credentials from the container metadata endpoint:
curl "http://169.254.170.2${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI}"
3) Push the Docker image to ECR.
eval $(aws ecr get-login --region ap-southeast-2 --no-include-email)
docker build . -t YOUR_ACCOUNT_ID.dkr.ecr.ap-southeast-2.amazonaws.com/YOUR_ECR_NAME
docker push YOUR_ACCOUNT_ID.dkr.ecr.ap-southeast-2.amazonaws.com/YOUR_ECR_NAME
Great! We have everything we need. You can find the full solution CDK project in my GitHub repo. Clone the repo and deploy the stack:
cd CDK-S3toblob
pip install -r requirements.txt
cdk deploy
Once the stack has been successfully created, navigate to the AWS CloudFormation console, locate the stack we just created, and go to the Resources tab to find the deployed resources.
Now it's time to test our workflow. Go to the S3 source bucket demo-databucket-source and upload some files under different folders (prefixes). Wait for the next inventory report to be generated (this can take up to 24 hours); you will then see the whole pipeline start running, and the files will eventually be copied to the Azure container democontainer.
We should see the logs of the Fargate task, as in the screenshot below.
We can also monitor, troubleshoot, and set alarms for ECS resources using CloudWatch Container Insights.
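If something looks stuck, a quick way to troubleshoot is to check the status of recent S3 Batch Operations jobs. A rough boto3 sketch (the status filter is just an example):
import boto3

account_id = boto3.client("sts").get_caller_identity()["Account"]
s3control = boto3.client("s3control")

# List recent batch jobs and show how many tasks succeeded or failed in each.
response = s3control.list_jobs(
    AccountId=account_id,
    JobStatuses=["Active", "Complete", "Failed"])
for job in response["Jobs"]:
    print(job["JobId"], job["Status"], job.get("ProgressSummary"))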
In this article, I introduced an approach to automating data replication from AWS S3 to Microsoft Azure Storage. I walked you through using CDK to deploy the VPC, S3, Lambda, CloudTrail, and Fargate resources, and showed how to deploy the Azure services with an ARM template. I also showed how to use the AWS Data Wrangler library and Athena queries to create and query the inventory table.
I hope you have found this article useful. You can find the complete project in my GitHub repo.