Abstract

Database auditing faces several challenges: performance overhead from continuous activity tracking, rapidly growing audit log storage, and the need to keep logs secure and immutable so that unauthorized or privileged users cannot tamper with them. Organizations must decide carefully what to audit, avoiding excessive data collection and noise that produces false positives while still capturing the events that matter for compliance and security. Integrating database auditing with monitoring and SIEM tools can be complex, and evolving regulatory standards such as PCI DSS, GDPR, and HIPAA require constant updates to auditing practices. Achieving real-time visibility also demands more processing power, and advanced auditing solutions can increase operational costs for tools, hardware, and skilled staff. Overall, balancing comprehensive auditing with performance, cost, and usability remains a key operational challenge.

Database Activity Streams (DAS) help overcome these challenges by providing a near real-time, secure, and scalable way to capture database events with minimal impact on performance. Instead of writing audit logs to the database itself, which consumes compute and storage, DAS streams structured activity data asynchronously to external monitoring systems, reducing overhead on the production workload. The streamed events are stored outside the database environment, which improves security and makes it much harder for privileged database users to tamper with the logs. DAS can integrate with SIEM, analytics, and alerting tools, enabling better threat detection and automated responses with fewer false positives. It also simplifies compliance by capturing the required operations consistently while reducing manual effort.
Additionally, because the stream is scalable and managed independently of the database, organizations avoid much of the storage cost and log management complexity of in-database auditing. Overall, Database Activity Streams modernize auditing by making it more efficient, secure, scalable, and actionable in near real time.

Content Overview

1. Introduction
  1.1 Background
  1.2 Problem Statement
2. Methodology
  2.1 Evaluation
    2.1.1 Security & Compliance Fit
    2.1.2 Performance Impact
    2.1.3 Observability & Usability
    2.1.4 Data Volume & Cost
    2.1.5 Access Control & Governance
  2.2 Other Approaches
  2.3 Design Overview
3. Technical Details for building Database Activity Stream Pipeline
  3.1 Prerequisites
  3.2 Technical Design
    3.2.1 Enable Database Activity Stream (Producer)
    3.2.2 Kinesis Data Streams (Transport Buffer)
    3.2.3 Kinesis Data Firehose (Delivery + Transform Orchestration)
    3.2.4 IAM Permissions
      3.2.4.1 IAM Role for Lambda
      3.2.4.2 IAM Role for Firehose
    3.2.5 Lambda Transform (Decode/Decrypt/Normalize)
    3.2.6 S3 Data Lake (Storage)
    3.2.7 Athena (Query Layer)
4. Use Cases
5. Tools
6. Conclusion

1. Introduction

1.1 Background

Modern organizations need a reliable and scalable solution to continuously monitor and analyze database activity for security, operational visibility, and compliance reporting. Traditional auditing approaches often introduce performance overhead, require complex log management, and offer limited real-time analytics capabilities. To overcome these limitations, a data pipeline can be built using Database Activity Stream (DAS) integrated with AWS services such as Kinesis Data Streams, Kinesis Data Firehose, AWS Lambda, Amazon S3, and Amazon Athena, storing activity information in a structured JSON format.

In this architecture, the Database Activity Stream captures database activity events with minimal impact on production performance and transmits them in near real time to Kinesis Data Streams, which acts as the ingestion layer. From there, Kinesis Data Firehose buffers the streaming data and delivers it to an Amazon S3 data lake in JSON format for durable and cost-efficient storage. Because DAS events arrive encrypted, a Lambda function invoked by Firehose decrypts and transforms them (and can optionally enrich them) before they are stored. Once the data resides in S3, Amazon Athena enables serverless querying and analysis of the audit logs using standard SQL over JSON objects, making security investigations and compliance reporting faster and more efficient. This end-to-end pipeline provides secure, scalable, and tamper-resistant database audit management along with near real-time analytics and operational insights to meet enterprise governance requirements.

1.2 Problem Statement

Traditional database auditing mechanisms often struggle to meet the growing requirements for real-time security monitoring, regulatory compliance, and operational visibility.
These systems typically generate large volumes of audit logs that place significant performance overhead on production workloads, leading to latency and resource contention. Storing audit data within the same database environment also creates a security weakness, because privileged users may manipulate or disable the logs and compromise audit integrity. Log management becomes increasingly complex as data scales, making it difficult to retain, analyze, and report on required events efficiently. Most legacy auditing approaches also lack integration with modern analytics and threat detection tools, which delays incident response and makes suspicious activity harder to identify. These challenges highlight the need for a more scalable, secure, and operationally efficient auditing solution that can support both compliance mandates and proactive security operations.

2. Methodology

2.1 Evaluation

The evaluation covers three areas:

- Capture and delivery: DAS's ability to capture database activity out-of-band and deliver near real-time events to external consumers with minimal performance impact.
- Security and governance: encryption in transit and at rest using KMS, along with access control, governance, and compliance-readiness of the streamed activity data.
- Visibility: how effectively DAS provides clear, usable, and actionable visibility into database activity for monitoring, auditing, and investigation use cases.

2.1.1 Security & Compliance Fit

- Provides tamper-resistant, audit-style visibility compared to application logs.
- Centralizes database activity monitoring for compliance needs (e.g., audit trails for privileged access).
- Helps answer "who accessed what" during incident response.
2.1.2 Performance Impact

The impact is typically minimal compared to heavy query logging directly on the database, because DAS streams events at the service level rather than writing verbose query logs inside the database.

2.1.3 Observability & Usability

Events are searchable by:

- DB user / role
- Source IP / client identity (if available)
- Database / schema / table
- Query type (DDL, DML, etc.)
- Timestamps

2.1.4 Data Volume & Cost

Primary cost drivers:

- DAS feature cost (service cost)
- Streaming and consumption costs (Kinesis, Lambda/consumer compute)
- Storage and indexing in the log/SIEM platform

What to estimate up front:

- Expected event volume (queries/sec, auth events, DDL frequency)
- Retention period (e.g., 30/90/365 days)
- Indexing strategy (full text vs. structured fields)

2.1.5 Access Control & Governance

- Restrict who can enable or disable DAS
- Use separate KMS keys per environment (prod vs. non-prod)
- Use least-privilege IAM roles for consumers
- Encrypt data in transit and at rest
- Document approved use cases and the audit review cadence

2.2 Other Approaches

Traditional database auditing approaches include native database audit logs, application-level logging, and proxy or gateway auditing.

Native database audit logs (such as PostgreSQL pgaudit, MySQL general logs, or Oracle auditing) operate inside the database engine and write audit records to local files or tables. While they can provide detailed visibility, they add CPU and I/O overhead, require careful tuning, and can be disabled or modified by privileged database users, which reduces their reliability during security incidents.

Application-level auditing captures only what the application sends to the database. It misses direct database access, administrative actions, and any activity performed outside the application path, which makes it insufficient as a primary security control.
Proxy or gateway-based auditing improves visibility for traffic that passes through the proxy, but it fails to capture local database access or internal database operations and introduces a potential single point of failure.

Database Activity Stream (DAS) addresses these limitations by capturing database activity out of band at the service level rather than inside the database engine or application. Because DAS operates independently of database users and administrators, they cannot alter, suppress, or delete the streamed audit data from inside the database, which makes it significantly more tamper-resistant and trustworthy for forensic investigations. DAS provides near real-time streaming of database activity, enabling faster detection of suspicious behavior and more effective incident response than traditional audit logs, which are typically reviewed after the fact. It is designed to minimize performance impact by avoiding heavy audit tables and excessive disk writes, which are common issues with native database auditing at scale. DAS also encrypts events using AWS KMS and integrates naturally with centralized logging and SIEM platforms, simplifying governance, access control, and compliance reporting across accounts and regions.

Overall, DAS provides broader coverage, stronger security guarantees, more predictable performance, and better alignment with cloud-native security architectures. This makes it the preferred primary auditing mechanism for production databases, while traditional approaches are best used as supplementary controls where necessary.

2.3 Design Overview

The high-level architecture of the Database Activity Stream pipeline works as follows.

1. RDS / Aurora with Database Activity Stream enabled
The database emits activity events (authentication, queries, DDL/DML, etc.) through Database Activity Stream. These events are generated out of band and encrypted using AWS KMS.

2. Amazon Kinesis Data Streams
DAS publishes the encrypted activity events into a Kinesis Data Stream. This stream acts as the durable, scalable transport layer and decouples the database from downstream consumers.

3. Kinesis Data Firehose (Source: Kinesis Data Streams)
Firehose reads batches of records from the Kinesis stream and invokes a Lambda function to transform the records before delivery.

4. AWS Lambda (Transform step)
Lambda receives the encrypted records from Firehose with their payloads Base64-encoded. The Lambda function performs the following tasks:
- Base64-decodes the incoming data
- Decrypts the DAS payload using KMS
- Parses and converts the event into structured JSON
- Re-encodes the transformed JSON in Base64 and returns it to Firehose

5. Amazon S3 (Destination)
Firehose buffers and delivers the decoded and decrypted JSON records into S3, typically partitioned by account, region, database identifier, and time (year/month/day/hour). Error records are written to a separate prefix.

6. Amazon Athena
Athena reads the JSON files stored in S3 using JSON functions. Analysts and security teams can query database activity using SQL without adding load to the production database.

3. Technical Details for building Database Activity Stream Pipeline

3.1 Prerequisites

The database must use an AWS-supported engine and version that supports Database Activity Stream, such as supported versions of Amazon Aurora (PostgreSQL/MySQL) or Amazon RDS engines. The database must be running in a stable state, and DAS must be enabled at the cluster or instance level.
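The readiness condition described above (cluster available, DAS started) can also be checked programmatically. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured when `fetch_cluster` is called. `summarize_das_state` and `fetch_cluster` are hypothetical helper names introduced for illustration, and the sample values are made up; the field names are those returned by the RDS `describe_db_clusters` call.

```python
from typing import Any, Dict


def summarize_das_state(cluster: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize DAS readiness from one entry of describe_db_clusters()['DBClusters']."""
    return {
        # The cluster should be in the 'available' state before relying on DAS.
        "cluster_available": cluster.get("Status") == "available",
        # A missing ActivityStreamStatus is treated as 'stopped' in this sketch.
        "das_started": cluster.get("ActivityStreamStatus", "stopped") == "started",
        "kinesis_stream": cluster.get("ActivityStreamKinesisStreamName"),
        "mode": cluster.get("ActivityStreamMode"),
    }


def fetch_cluster(cluster_id: str, region: str) -> Dict[str, Any]:
    """Fetch the cluster description from RDS (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above runs without AWS access

    rds = boto3.client("rds", region_name=region)
    response = rds.describe_db_clusters(DBClusterIdentifier=cluster_id)
    return response["DBClusters"][0]


# Illustrative response fragment; the values are invented for this sketch.
sample = {
    "Status": "available",
    "ActivityStreamStatus": "started",
    "ActivityStreamMode": "async",
    "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-EXAMPLE",
}
summary = summarize_das_state(sample)
print(summary)
```

In a real account, `summarize_das_state(fetch_cluster("my-aurora-cluster", "us-west-2"))` would apply the same check to a live cluster, where `my-aurora-cluster` is a placeholder identifier.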
Database Activity Stream, Kinesis Data Streams, Kinesis Data Firehose, and Athena must all be available in the same AWS Region. The AWS account must have permissions to create and manage these services, including IAM roles, KMS keys, S3 buckets, and CloudWatch logging. Service quotas for Kinesis shards, Firehose delivery streams, Lambda concurrency, and KMS usage should be reviewed and increased if necessary.

3.2 Technical Design

Workflow: DAS emits encrypted events → Kinesis buffers → Firehose reads → Firehose invokes Lambda transform → Lambda Base64-decodes, decrypts with KMS, converts to JSON → Firehose writes JSON to S3 (and errors to an error prefix) → Athena external table queries S3 JSON using JSON functions.

Component Design (one section per component)

3.2.1 Enable Database Activity Stream (Producer)

When DAS is started at the cluster level, the service begins encrypting every activity event using AWS KMS. The database itself does not store these events. Instead, the encrypted events are published continuously to an Amazon Kinesis Data Stream associated with the cluster.

To start DAS from the RDS console, select the cluster, open the Actions menu, and choose Start database activity stream.

3.2.2 Kinesis Data Streams (Transport Buffer)

Kinesis acts as a durable, scalable buffer that decouples the database from downstream consumers. Events are written in near real time, and the Kinesis retention settings determine how long the data can be replayed if downstream processing is interrupted.

To find more details about the Kinesis Data Stream, search for Kinesis in the AWS console and open Kinesis Data Streams.
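As an alternative to the console steps, the activity stream can be started through the RDS API. The sketch below is a minimal illustration, assuming boto3 is installed and credentials are configured when `start_das` is called. The helper names, the cluster ARN, and the key alias are hypothetical placeholders; `start_activity_stream` and its `ResourceArn`, `Mode`, `KmsKeyId`, and `ApplyImmediately` parameters are part of the boto3 RDS client.

```python
from typing import Any, Dict


def build_start_request(cluster_arn: str, kms_key_id: str, mode: str = "async") -> Dict[str, Any]:
    """Build the keyword arguments for rds.start_activity_stream()."""
    if mode not in ("sync", "async"):
        raise ValueError("mode must be 'sync' or 'async'")
    return {
        "ResourceArn": cluster_arn,
        "Mode": mode,
        "KmsKeyId": kms_key_id,
        "ApplyImmediately": True,
    }


def expected_stream_name(cluster_resource_id: str) -> str:
    """DAS names its Kinesis stream 'aws-rds-das-' plus the cluster resource ID."""
    return f"aws-rds-das-{cluster_resource_id}"


def start_das(cluster_arn: str, kms_key_id: str, region: str, mode: str = "async") -> str:
    """Start DAS and return the Kinesis stream name (requires AWS credentials)."""
    import boto3  # imported lazily so the pure helpers run without AWS access

    rds = boto3.client("rds", region_name=region)
    response = rds.start_activity_stream(**build_start_request(cluster_arn, kms_key_id, mode))
    return response["KinesisStreamName"]


# Placeholder ARN and key alias, used only to show the request shape.
request = build_start_request(
    "arn:aws:rds:us-west-2:111122223333:cluster:example-cluster",
    "alias/example-das-key",
)
print(request)
print(expected_stream_name("cluster-UK47KWG6GKM2UEEBZOQWPXLATM"))
```

Asynchronous mode favors database performance, while synchronous mode favors the accuracy of the stream; the default of `async` here is only a choice made for the sketch.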
The name of the Data Stream follows the pattern aws-rds-das-cluster-UK47KWG6GKM2UEEBZOQWPXLATM, that is, the prefix aws-rds-das- followed by the resource ID of the cluster.

3.2.3 Kinesis Data Firehose (Delivery + Transform Orchestration)

Once events are available in Kinesis Data Streams, a Kinesis Data Firehose delivery stream must be created to read them. Firehose pulls records from the stream in batches and, when a transformation is configured, invokes a Lambda function on each batch.

To create the Firehose, open the AWS console, search for Firehose, and select it. Firehose requires a Source and a Destination: the Source is the Kinesis Data Stream, and the Destination is Amazon S3.

At this point, the event payloads are still encrypted. When they are passed to Lambda, they are additionally wrapped in Base64 encoding as part of the Firehose-Lambda integration.

3.2.4 IAM Permissions

IAM permissions play a critical role in a Database Activity Stream (DAS) pipeline because the pipeline is made up of multiple AWS services that must interact with each other securely. Without the right permissions, the pipeline will either fail or become a security risk.

3.2.4.1 IAM Role for Lambda

This IAM policy lets a Lambda function (or similar service) read encrypted RDS activity data from Kinesis, decrypt it, forward it to Firehose, and log its execution.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:<<REGION>>:<<ACCOUNT>>:stream/aws-rds-das-cluster-*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:DescribeKey"
      ],
      "Resource": "<<ARN of KMS Key>>",
      "Condition": {
        "StringEquals": {
          "kms:EncryptionContext:aws:rds:dbc-id": "<<CLUSTER_ID>>"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch",
        "firehose:DescribeDeliveryStream"
      ],
      "Resource": "arn:aws:firehose:<<REGION>>:<<ACCOUNT>>:deliverystream/aws-rds-das-<<CLUSTER_ID>>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:<<REGION>>:<<ACCOUNT>>:log-group:/aws/lambda/<<LAMBDA_NAME>>:*"
      ]
    }
  ]
}

3.2.4.2 IAM Role for Firehose

This policy lets Firehose read encrypted RDS activity data from Kinesis, invoke the Lambda function that processes it, store the result in S3, and log the process.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": "<<ARN of Kinesis Data Stream>>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::<<S3_BUCKET_NAME>>",
        "arn:aws:s3:::<<S3_BUCKET_NAME>>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:<<REGION>>:<<ACCOUNT>>:log-group:/aws/kinesisfirehose/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "*"
    }
  ]
}

The S3 statement lists both the bucket ARN and the object ARN (the /* form), because object-level actions such as s3:PutObject apply to objects rather than to the bucket itself. The wildcard resources on the kms:Decrypt and lambda:InvokeFunction statements are broad; for production use, consider scoping them to the specific key and function ARNs, in line with the least-privilege guidance in section 2.1.5.

3.2.5 Lambda Transform (Decode/Decrypt/Normalize)

A Lambda function is required to Base64-decode each record, decrypt the record's data key with AWS KMS, use that key to decrypt and decompress the DAS payload, and produce the decrypted content as structured JSON. The function then returns the transformed records to Firehose, again Base64-encoded as required by the Firehose contract. The Lambda function is written in Python.
Here is the Lambda function:

import base64
import json
import zlib

import aws_encryption_sdk
import boto3
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.identifiers import EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider

REGION_NAME = 'us-west-2'
RESOURCE_ID = 'cluster-UK47KWG6GKM2UEEBZOQWPXLATM'
STREAM_NAME = 'aws-rds-das-cluster-UK47KWG6GKM2UEEBZOQWPXLATM'

enc_client = aws_encryption_sdk.EncryptionSDKClient(
    commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT
)


class MyRawMasterKeyProvider(RawMasterKeyProvider):
    """Key provider that wraps the plaintext data key returned by KMS."""
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
            wrapping_key=plain_key,
            wrapping_key_type=EncryptionKeyType.SYMMETRIC
        )

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    """Decrypt the DAS payload with the AWS Encryption SDK."""
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(
            master_key_provider=my_key_provider
        )
    )
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    """Decrypt the payload, then gunzip it (MAX_WBITS + 16 selects gzip)."""
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def run(event):
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    decrypted_records = []
    try:
        # Process each record from Kinesis Firehose
        for record in event['records']:
            try:
                # Decode the Base64-encoded data from Firehose
                record_data = json.loads(base64.b64decode(record['data']))

                # Decode the database activity events and key
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])

                # Decrypt the data key using KMS
                data_key_decrypt_result = kms.decrypt(
                    CiphertextBlob=data_key_decoded,
                    EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}
                )

                # Decrypt and decompress the payload
                decrypted_record = decrypt_decompress(
                    payload_decoded,
                    data_key_decrypt_result['Plaintext']
                )

                # Format the output for Firehose
                decrypted_records.append({
                    'recordId': record['recordId'],
                    'result': 'Ok',
                    'data': base64.b64encode(decrypted_record).decode('utf-8')
                })
            except Exception as e:
                # Mark the record as failed so Firehose routes it to the error prefix
                print(f"Error processing record: {str(e)}")
                decrypted_records.append({
                    'recordId': record['recordId'],
                    'result': 'ProcessingFailed',
                    'data': record['data']
                })
    except Exception as e:
        # Handle general processing errors
        print(f"Error processing events: {str(e)}")
        raise
    return decrypted_records


def lambda_handler(event, context):
    print("Received event:", json.dumps(event, indent=2))
    try:
        # Process the records
        processed_records = run(event)
        print(f"Successfully processed {len(processed_records)} records")
        return {'records': processed_records}
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        raise

3.2.6 S3 Data Lake (Storage)

Firehose buffers the decrypted and transformed records and delivers them to the configured destination, Amazon S3 in this design, where they are stored as JSON files using a time-based or environment-based prefix structure. If any records fail transformation, Firehose writes them to a separate error prefix for investigation without blocking the rest of the pipeline.

Here is a screenshot of a sample JSON file stored at the S3 location:

3.2.7 Athena (Query Layer)

From that point onward, downstream analytics tools such as Amazon Athena can query the JSON data stored in S3 using SQL and JSON extraction functions. Queries can be filtered by time range, database identifier, user, or activity type without adding load to the production database.
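To make the record layout concrete, the sketch below applies the same kind of filter (by user and activity type) to decrypted records in plain Python. It is only an illustration under stated assumptions: each S3 object is assumed to hold one JSON document per line with a `databaseActivityEventList` array, and events are assumed to carry `dbUserName`, `command`, and `commandText` keys, which correspond to the lowercased column names in the table definition in this section. The sample data and the helper names `iter_events` and `filter_events` are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Optional


def iter_events(json_lines: str) -> Iterator[Dict[str, Any]]:
    """Yield individual activity events from newline-delimited DAS records."""
    for line in json_lines.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        # Each record carries a list of events (assumed key name).
        for event in record.get("databaseActivityEventList", []):
            yield event


def filter_events(
    events: Iterable[Dict[str, Any]],
    user: Optional[str] = None,
    command: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Keep events matching the given database user and/or command type."""
    matches = []
    for event in events:
        if user is not None and event.get("dbUserName") != user:
            continue
        if command is not None and event.get("command") != command:
            continue
        matches.append(event)
    return matches


# Invented sample data: two records, three events in total.
SAMPLE = "\n".join([
    json.dumps({"databaseActivityEventList": [
        {"dbUserName": "app_user", "command": "QUERY", "commandText": "select * from orders"},
        {"dbUserName": "admin", "command": "QUERY", "commandText": "drop table tmp"},
    ]}),
    json.dumps({"databaseActivityEventList": [
        {"dbUserName": "app_user", "command": "CONNECT", "commandText": None},
    ]}),
])

matches = filter_events(iter_events(SAMPLE), user="app_user", command="QUERY")
print(matches)
```

Flattening the event list before filtering is the same step that `CROSS JOIN UNNEST` performs on the array column in Athena.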
The external table below uses partition projection on a date partition column that maps to the yyyy/MM/dd prefix under which the objects are stored. Partition projection requires the projected column to be declared as a partition column, so the definition includes a PARTITIONED BY clause.

CREATE EXTERNAL TABLE `rds_das_activity_poc_datastream_new`(
  `databaseactivityeventlist` array<struct<dbusername:string,command:string,databasename:string,commandtext:string,remotehost:string,objecttype:string,objectname:string,serverhost:string,starttime:string,endtime:string,rowcount:int>> COMMENT 'from deserializer')
PARTITIONED BY (`date` string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://poc-datastreams/'
TBLPROPERTIES (
  'has_encrypted_data'='true',
  'projection.enabled'='true',
  'projection.date.type'='date',
  'projection.date.format'='yyyy/MM/dd',
  'projection.date.interval'='1',
  'projection.date.interval.unit'='DAYS',
  'projection.date.range'='2021/07/01, NOW',
  'storage.location.template'='s3://poc-datastreams/${date}/',
  'use.null.for.invalid.data'='true');

Once the table is created, you can use CROSS JOIN UNNEST to flatten the databaseactivityeventlist array and query individual events. Here is a sample query:

SELECT e.dbusername, e.command, e.databasename, e.commandtext, e.remotehost, e.objecttype, e.serverhost, e.rowcount
FROM rds_das_activity_poc_datastream_new
CROSS JOIN UNNEST(databaseactivityeventlist) AS t(e)
WHERE lower(e.commandtext) LIKE 'select%'
LIMIT 5;

You will get an output as shown in the screenshot below. You can modify the query as per your requirement.

4. Use Cases

Here are the key use cases for a Database Activity Stream (DAS) pipeline:

- Security monitoring & threat detection: Capture near real-time database activity to detect suspicious behavior such as unauthorized access, privilege escalation, or SQL injection attempts.
- Compliance & auditing: Meet regulatory requirements (PCI DSS, SOC 2, HIPAA, GDPR) by maintaining an immutable, auditable log of database activities.
- Forensics & incident investigation: Analyze historical database activity during security incidents to understand who did what and when.
- User behavior analytics: Track database access patterns across applications, users, and roles to identify anomalies or misuse.
- Data exfiltration detection: Monitor large or unusual read operations that may indicate data theft or insider threats.
- Centralized logging & SIEM integration: Stream DAS data into S3, CloudWatch, or third-party SIEM tools (Splunk, Datadog, Elastic) for correlation with other security logs.
- Operational insights: Analyze query activity and workload trends without adding query load to the production database.

5. Tools

A Database Activity Stream (DAS) pipeline typically uses the following AWS tools and services, each playing a specific role in securely streaming and processing database activity data:

- Amazon RDS Database Activity Streams (DAS): Captures near real-time, immutable, and encrypted database activity events.
- Amazon Kinesis Data Streams: Receives DAS events from RDS and acts as the streaming source.
- AWS Key Management Service (KMS): Encrypts DAS data and controls decryption access.
- AWS Identity and Access Management (IAM): Defines and enforces who can access each service in the pipeline, ensuring least-privilege security.
- Amazon Kinesis Data Firehose: Reads from Kinesis, invokes the transformation, and delivers the data to destinations.
- AWS Lambda: Decodes and decrypts DAS records, and can optionally filter or enrich them.
- Amazon S3: Stores DAS data for long-term retention and analysis.
- Amazon CloudWatch Logs: Provides monitoring and troubleshooting for Firehose and Lambda.
- Amazon Athena: Enables serverless SQL queries on DAS data stored in S3 for audit, security analysis, and compliance reporting.
- SIEM / Analytics tools (optional): Support advanced security monitoring and alerting (e.g., Splunk, Datadog, Elastic).

6. Conclusion

The Database Activity Stream (DAS) pipeline provides a secure, scalable, and near real-time mechanism to capture and analyze database activity with minimal impact on database performance. By integrating AWS services such as RDS DAS, Kinesis, KMS, IAM, Firehose, S3, Lambda, CloudWatch, and Athena, the pipeline ensures that sensitive audit data is encrypted, access-controlled, reliably delivered, and easily queryable. Overall, a well-designed DAS pipeline strengthens security posture, supports regulatory compliance, enables rapid incident investigation, and provides valuable insights into database usage, making it a critical component of a modern cloud security and observability strategy.