Recently I started using AWS CloudWatch Log Insights, and I find the tool really useful for extracting data about the systems I'm running without having to set up dedicated monitoring tools, which come with their own sets of permissions, rules, configuration languages, and so forth.
Log Insights allows you to query log output with a language based on regular expressions, with hints of SQL, and to produce tables or graphs of the quantities you need to monitor. For example, the system I am monitoring runs Celery in ECS containers that log received tasks with a line like the following
16:39:11,156 INFO [celery.worker.strategy] Received task: lib.tasks.lists.trigger_list_log_notification[9b33b464-d4f9-4909-8d4e-1a3134fead97]
In this case, the specific function in the system that was triggered is lib.tasks.lists.trigger_list_log_notification, and I'm interested in knowing which functions are called the most, so I can easily count them with
parse @message /\[celery\.(?<source>[a-z.]+)\].*Received task: (?<task>[a-z._]+)\[/
| filter not isblank(source)
| stats count(*) as number by task
| sort number desc
| limit 9
This gives me a nice table of the top 9 task functions and the number of tasks submitted for each, and the time frame can be adjusted with the usual CloudWatch controls
1 lib.tasks.lists.trigger_list_log_notification 4559
2 lib.tasks.notify.notify_recipient 397
3 lib.message._send_mobile_push_notification 353
4 lib.tasks.jobs.check_job_cutoffs 178
5 lib.tasks.notify.check_message_cutoffs 177
6 lib.tasks.notify.check_notification_retry 177
7 lib.tasks.notify.async_list_response 81
8 lib.tasks.hmrc_poll.govtalk_periodic_poll 59
9 lib.tasks.lists.recalculate_list_entry 56
Using time bins, quantities can also be easily plotted. For example, I can process and visualise the number of received tasks with
parse @message /\[celery\.(?<source>[a-z.]+)\].*Received task: (?<task>[a-z._]+)\[/
| filter not isblank(source)
| stats count(*) by bin(30s)
Unfortunately, I quickly discovered an important limitation of Log Insights: queries are not metrics, which immediately implies that I can't set up alarms on them. As fun as it is to look at nice plots, I need something automatic that sends me messages or scales up systems in reaction to specific events such as "too many submitted tasks".
The standard solution to this problem suggested by AWS is to write a Lambda that runs the query and stores the value into a custom CloudWatch metric, which I can then use to satisfy my automation needs. I did it, and in this post I will show you exactly how, using Terraform, Python and Zappa, CloudWatch, and DynamoDB. At the end of the post I will also briefly discuss the cost of the solution.
Before I get into the details of the specific tools or solutions that I decided to implement, let me have a look at the bigger picture. The initial idea is very simple: a Lambda function can run a specific Log Insights query and store the results in a custom metric, which can in turn be used to trigger alarms and other actions.
For a single system I already have 4 or 5 of these queries that I'd like to run, and I have multiple systems, so I'd prefer a solution that doesn't require me to deploy and maintain a different Lambda for each query. The maintenance could clearly be automated as well, but such a solution smells of duplicated code from miles away, and if there is no specific reason to go down that road I prefer to avoid it.
Since Log Insights queries are just strings of code, however, we can store them somewhere and simply loop over all of them within the same Lambda function. To implement this, I created a DynamoDB table in which every item contains all the data I need to run one query, such as the log group that I want to investigate and the name of the target metric.
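To give an idea of the shape, this is a sketch of a single item as the Lambda will eventually see it (the attribute names are my own choice; they appear in full in the Terraform code later in the post):
# Hypothetical item, mirroring the attributes defined later in the post
item = {
    "SlotName": "Celery Logs submitted tasks",  # partition key, human-readable name
    "LogGroup": "mycluster/celery",             # log group to run the query against
    "ClusterName": "mycluster",                 # used as a dimension of the metric
    "Query": "parse @message ... | stats count(*) as Value by bin(1m)",
    "Namespace": "Custom",                      # namespace of the target metric
    "MetricName": "Submitted tasks",            # name of the target metric
}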
In the following sections, I will discuss the main components of the solution from the infrastructural point of view, showing how I created them with Terraform. The four main AWS services that I will use are: DynamoDB, Lambda, IAM, CloudWatch.
I put the bulk of the code in a module so that I can easily create the same structure for multiple AWS accounts. While my current setup is a bit more complicated than that, the structure of the code can be simplified as
+ common
+ lambda-loginsights2metrics
+ cloudwatch.tf
+ dynamodb.tf
+ iam.tf
+ lambda.tf
+ variables.tf
+ account1
+ lambda-loginsights2metrics
+ main.tf
+ variables.tf
Since I will refer to them in the following sections, let me show you the four variables I defined for this module.
First I need to receive the items that I need to store in the DynamoDB table
common/lambda-loginsights2metrics/variables.tf
variable "items" {
  type    = list(any)
  default = []
}
I prefer to have a prefix in front of my components that allows me to duplicate them without clashes
common/lambda-loginsights2metrics/variables.tf
variable "prefix" {
  type    = string
  default = "loginsights2metrics"
}
The Lambda function will require a list of security groups that grant access to specific network components
common/lambda-loginsights2metrics/variables.tf
variable "security_groups" {
  type    = list(string)
  default = []
}
Finally, Lambda functions need to be told which VPC subnets they can use to run
common/lambda-loginsights2metrics/variables.tf
variable "vpc_subnets" {
  type    = list(string)
  default = []
}
Let's start with the cornerstone, which is the DynamoDB table that contains the data for the queries. As DynamoDB is not a SQL database, we don't need to define columns in advance. This might clearly get us into trouble later, so we need to be careful to be consistent when we write items, adding everything that is needed by the Lambda code.
common/lambda-loginsights2metrics/dynamodb.tf
resource "aws_dynamodb_table" "loginsights2metrics" {
  name         = "${var.prefix}-items"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "SlotName"

  attribute {
    name = "SlotName"
    type = "S"
  }
}
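Since nothing enforces a schema beyond the hash key, a cheap guard in the Lambda can catch malformed items early. This is my own addition, not part of the code shown later in the post:
# Attributes the Lambda code expects on every item
REQUIRED_ATTRIBUTES = {
    "SlotName", "LogGroup", "ClusterName", "Query", "Namespace", "MetricName"
}

def validate_item(item):
    # item is the plain dict returned by the Boto3 DynamoDB resource
    missing = REQUIRED_ATTRIBUTES - item.keys()
    if missing:
        raise ValueError(f"Item {item.get('SlotName', '?')} is missing {missing}")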
Speaking of items, I assume I will pass them when I call the module, so here I just need to loop over the input variable items
common/lambda-loginsights2metrics/dynamodb.tf
resource "aws_dynamodb_table_item" "item" {
  count      = length(var.items)
  table_name = aws_dynamodb_table.loginsights2metrics.name
  hash_key   = aws_dynamodb_table.loginsights2metrics.hash_key
  item       = jsonencode(element(var.items, count.index))
}
Since the query is written as a Terraform string and will be read from Python, there are two small caveats here. To be consistent with Terraform's syntax we need to escape double quotes in the query, and to avoid fights with Python we need to escape backslashes. So, for example, a valid query like
parse @message /\[celery\.(?<source>[a-z.]+)\].*Received task: (?<task>[a-z._]+)\[/
| filter not isblank(source)
| stats count(*) as Value by bin(1m)
will be stored as
"parse @message /\\[celery\\.(?<source>[a-z.]+)\\].*Received task: (?<task>[a-z._]+)\\[/ | filter not isblank(source) | stats count(*) as Value by bin(1m)"
Another remark is that the Lambda I will write in Python will read data plotted with the name Value on bins of 1 minute, so the query should end with stats X as Value by bin(1m), where X is a specific stat, for example stats count(*) as Value by bin(1m). The reason behind 1 minute is that the maximum standard resolution of CloudWatch metrics is 1 minute. Should you want more, you need to have a look at CloudWatch High-Resolution Metrics.
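For reference, high-resolution values are requested at publish time through the StorageResolution parameter of put_metric_data. A minimal sketch with Boto3 (the metric name and value are illustrative):
import boto3

cw = boto3.client("cloudwatch", region_name="eu-west-1")

# StorageResolution=1 stores the value at 1-second granularity;
# the only accepted values are 1 (high resolution) and 60 (standard).
cw.put_metric_data(
    Namespace="Custom",
    MetricData=[
        {
            "MetricName": "Submitted tasks",
            "Value": 42,
            "StorageResolution": 1,
        }
    ],
)
Coming back to the escaping caveat above, a quick way to convince yourself that the layers of escaping work out is to JSON-decode the escaped string in Python, which performs the same unescaping that happens when the item document is parsed and stored (a standalone check, not part of the Lambda code):
import json

# The query as it appears in the Terraform source, with doubled backslashes
raw = r'"parse @message /\\[celery\\.(?<source>[a-z.]+)\\].*Received task: (?<task>[a-z._]+)\\[/ | filter not isblank(source) | stats count(*) as Value by bin(1m)"'

# json.loads turns every \\ into \, producing the query Log Insights expects
query = json.loads(raw)
print(query)
# parse @message /\[celery\.(?<source>[a-z.]+)\].*Received task: (?<task>[a-z._]+)\[/ | ...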
IAM roles are central in AWS. In this specific case we deal with the so-called Lambda execution role, which is the IAM role that the Lambda assumes when you run it. In AWS, users or services (that is, humans or AWS components) assume a role, receiving the permissions connected with it. For a role to be assumable, however, it needs a specific permission attached, a so-called trust policy, which states who is allowed to assume it.
Let's define a trust policy that allows the Lambda service to assume the role that we will define
common/lambda-loginsights2metrics/iam.tf
data "aws_iam_policy_document" "trust" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com"
      ]
    }
  }
}
and after that the role in question
common/lambda-loginsights2metrics/iam.tf
resource "aws_iam_role" "loginsights2metrics" {
  name               = var.prefix
  assume_role_policy = data.aws_iam_policy_document.trust.json
}
To run, Lambdas need an initial set of permissions, which can be found in the canned policy AWSLambdaVPCAccessExecutionRole. You can see the content of the policy in the IAM console, or by dumping it with aws iam get-policy and aws iam get-policy-version
$ aws iam get-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
{
    "Policy": {
        "PolicyName": "AWSLambdaVPCAccessExecutionRole",
        "PolicyId": "ANPAJVTME3YLVNL72YR2K",
        "Arn": "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
        "Path": "/service-role/",
        "DefaultVersionId": "v2",
        "AttachmentCount": 0,
        "PermissionsBoundaryUsageCount": 0,
        "IsAttachable": true,
        "Description": "Provides minimum permissions for a Lambda function to execute while accessing a resource within a VPC - create, describe, delete network interfaces and write permissions to CloudWatch Logs. ",
        "CreateDate": "2016-02-11T23:15:26Z",
        "UpdateDate": "2020-10-15T22:53:03Z"
    }
}
$ aws iam get-policy-version --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole --version-id v2
{
    "PolicyVersion": {
        "Document": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents",
                        "ec2:CreateNetworkInterface",
                        "ec2:DescribeNetworkInterfaces",
                        "ec2:DeleteNetworkInterface",
                        "ec2:AssignPrivateIpAddresses",
                        "ec2:UnassignPrivateIpAddresses"
                    ],
                    "Resource": "*"
                }
            ]
        },
        "VersionId": "v2",
        "IsDefaultVersion": true,
        "CreateDate": "2020-10-15T22:53:03Z"
    }
}
Attaching a canned policy is just a matter of creating a specific aws_iam_role_policy_attachment resource
common/lambda-loginsights2metrics/iam.tf
resource "aws_iam_role_policy_attachment" "loginsights2metrics" {
  role       = aws_iam_role.loginsights2metrics.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}
Now that we have the IAM role and the basic policy, we can assign custom permissions to the role. We need to grant the Lambda permissions on other AWS components, namely CloudWatch, to run Log Insights queries and to store metrics, and DynamoDB, to retrieve the items from the queries table.
common/lambda-loginsights2metrics/iam.tf
data "aws_iam_policy_document" "loginsights2metrics" {
  statement {
    actions = [
      "cloudwatch:PutMetricData",
      "cloudwatch:PutMetricAlarm",
      "logs:StartQuery",
      "logs:GetQueryResults",
      "logs:GetLogEvents",
    ]
    resources = ["*"]
  }

  statement {
    actions = [
      "dynamodb:Scan"
    ]
    resources = [aws_dynamodb_table.loginsights2metrics.arn]
  }
}
Through aws_iam_role_policy we can create and assign the policy out of the data structure
common/lambda-loginsights2metrics/iam.tf
resource "aws_iam_role_policy" "loginsights2metrics" {
  name   = var.prefix
  role   = aws_iam_role.loginsights2metrics.name
  policy = data.aws_iam_policy_document.loginsights2metrics.json
}
We can now create the Lambda function container. I do not use Terraform as a deployer, as I think it should be used to define static infrastructure only, so I will use a dummy function here and later deploy the real code using the AWS CLI.
The dummy function can be easily created with
common/lambda-loginsights2metrics/lambda.tf
data "archive_file" "dummy" {
  type        = "zip"
  output_path = "${path.module}/lambda.zip"

  source {
    content  = "dummy"
    filename = "dummy.txt"
  }
}
The Lambda function is a bit more complicated. As I mentioned, I'll use Zappa to package the function, so the handler has to be "zappa.handler.lambda_handler". The IAM role given to the function is the one we defined previously, while memory_size and timeout can be tuned to match the workload (the values below, 128 MB and 300 seconds, proved sufficient for this job). The environment variables allow me to inject the name of the DynamoDB table so that I don't need to hardcode it. I also pass another variable, the Sentry DSN that I use in my configuration. This is not essential for the problem at hand, but I left it there to show how to pass such values.
common/lambda-loginsights2metrics/lambda.tf
resource "aws_lambda_function" "loginsights2metrics" {
  function_name = "loginsights2metrics"
  handler       = "zappa.handler.lambda_handler"
  runtime       = "python3.8"
  filename      = data.archive_file.dummy.output_path
  role          = aws_iam_role.loginsights2metrics.arn
  memory_size   = 128
  timeout       = 300

  vpc_config {
    subnet_ids         = var.vpc_subnets
    security_group_ids = var.security_groups
  }

  environment {
    variables = {
      "SENTRY_DSN"     = "https://XXXXXX:@sentry.io/YYYYYY",
      "DYNAMODB_TABLE" = aws_dynamodb_table.loginsights2metrics.name
    }
  }

  lifecycle {
    ignore_changes = [last_modified, filename]
  }
}
Please note that I instructed Terraform to ignore changes to the two attributes last_modified and filename, and that I haven't used any source_code_hash. This way I can safely apply Terraform to change parameters like memory_size or timeout without affecting what I deployed with the CI.
Since I want to trigger the function from AWS CloudWatch Events, I need to grant the service events.amazonaws.com the lambda:InvokeFunction permission.
common/lambda-loginsights2metrics/lambda.tf
resource "aws_lambda_permission" "loginsights2metrics" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.loginsights2metrics.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.rate.arn
}
Since 2018, Lambdas have a maximum execution time of 15 minutes (900 seconds), which is more than enough for many services, but to be conservative I preferred to leverage Zappa's asynchronous calls and to make the main Lambda call itself for each query. The Lambda clearly doesn't call the same Python function (it's not recursive), but from AWS's point of view we have a Lambda that calls itself, so we need to give it a specific permission to do this.
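Before looking at the permission, here is a minimal sketch of the calling pattern (the names are illustrative; the real code appears later in the post):
from zappa.asynchronous import task

@task
def process_query(item):
    # Runs in a separate invocation of the same Lambda,
    # with its own 15-minute time budget
    ...

def handler(event, context):
    items = [...]  # hypothetical list of queries to run
    for item in items:
        process_query(item)  # returns immediately, work happens in another Lambda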
common/lambda-loginsights2metrics/iam.tf
data "aws_iam_policy_document" "loginsights2metrics_exec" {
  statement {
    actions = [
      "lambda:InvokeAsync",
      "lambda:InvokeFunction"
    ]
    resources = [aws_lambda_function.loginsights2metrics.arn]
  }
}
I could not define this when I defined the rest of the IAM components because it needs the Lambda to be defined first, but the resource lives in the same file. Terraform doesn't care about where we define a resource or in which order, as long as there are no loops in the definitions.
We can now assign the newly created policy document to the IAM role we created previously
common/lambda-loginsights2metrics/iam.tf
resource "aws_iam_role_policy" "loginsights2metrics_exec" {
  name   = "${var.prefix}-exec"
  role   = aws_iam_role.loginsights2metrics.name
  policy = data.aws_iam_policy_document.loginsights2metrics_exec.json
}
Whenever you need to run Lambdas (or other things) periodically, the standard AWS solution is to use CloudWatch Events, which work as the AWS cron system. CloudWatch Events are made of rules and targets, so first of all I defined a rule that gets triggered every 2 minutes
common/lambda-loginsights2metrics/cloudwatch.tf
resource "aws_cloudwatch_event_rule" "rate" {
  # Zappa requires the name to match the processing function
  name                = "main.loginsights2metrics"
  description         = "Trigger Lambda ${var.prefix}"
  schedule_expression = "rate(2 minutes)"
}
Please note that Zappa has a specific requirement for CloudWatch Events, so I left a comment to clarify this to my future self. The second part of the event is the target, which is the Lambda function that we defined in the previous section.
common/lambda-loginsights2metrics/cloudwatch.tf
resource "aws_cloudwatch_event_target" "lambda" {
  rule      = aws_cloudwatch_event_rule.rate.name
  target_id = "${var.prefix}-target"
  arn       = aws_lambda_function.loginsights2metrics.arn
}
Now the module is finished, so I just need to create some items for the DynamoDB table and to call the module itself
account1/lambda-loginsights2metrics/main.tf
locals {
  items = [
    {
      "SlotName" : {
        "S" : "Celery Logs submitted tasks"
      },
      "LogGroup" : {
        "S" : "mycluster/celery",
      },
      "ClusterName" : {
        "S" : "mycluster"
      },
      "Query" : {
        "S" : "parse @message /\\[celery\\.(?<source>[a-z.]+)\\].*Received task: (?<task>[a-z._]+)\\[/ | filter not isblank(source) | stats count(*) as Value by bin(1m)",
      },
      "Namespace" : {
        "S" : "Custom"
      },
      "MetricName" : {
        "S" : "Submitted tasks"
      }
    },
    {
      "SlotName" : {
        "S" : "Celery Logs succeeded tasks"
      },
      "LogGroup" : {
        "S" : "mycluster/celery",
      },
      "ClusterName" : {
        "S" : "mycluster"
      },
      "Query" : {
        "S" : "parse @message /\\[celery.(?<source>[a-z\\._]+)].*Task (?<task>[a-z\\._]+)\\[.*\\] (?<event>[a-z]+)/ | filter source = \"app.trace\" | filter event = \"succeeded\" | stats count(*) as Value by bin(1m)",
      },
      "Namespace" : {
        "S" : "Custom"
      },
      "MetricName" : {
        "S" : "Succeeded tasks"
      }
    },
    {
      "SlotName" : {
        "S" : "Celery Logs retried tasks"
      },
      "LogGroup" : {
        "S" : "mycluster/celery",
      },
      "ClusterName" : {
        "S" : "mycluster"
      },
      "Query" : {
        "S" : "parse @message /\\[celery.(?<source>[a-z\\._]+)].*Task (?<task>[a-z\\._]+)\\[.*\\] (?<event>[a-z]+)/ | filter source = \"app.trace\" | filter event = \"retry\" | stats count(*) as Value by bin(1m)",
      },
      "Namespace" : {
        "S" : "Custom"
      },
      "MetricName" : {
        "S" : "Retried tasks"
      }
    }
  ]
}
I need to provide a security group for the Lambda, and in this case I can safely use the default one provided by the VPC
account1/lambda-loginsights2metrics/main.tf
data "aws_security_group" "default" {
  name   = "default"
  vpc_id = var.vpc_id
}
And I can finally call the module
account1/lambda-loginsights2metrics/main.tf
module "loginsights2metrics" {
  source          = "../../common/lambda-loginsights2metrics"
  items           = local.items
  security_groups = [data.aws_security_group.default.id]
  vpc_subnets     = var.vpc_private_subnets
}
Please note that the variable vpc_private_subnets is a list of subnet IDs that I created in another module.
As I mentioned before, the Python code of the Lambda function is contained in a different repository and deployed with the CI using Zappa. Given we are interacting with AWS, I am clearly using Boto3, the AWS SDK for Python. The code was developed locally without Zappa's support, to test the Boto3 functions I wanted to use, and then quickly adjusted to be executed in a Lambda.
I think the code is pretty straightforward, but I left my original comments to be sure everything is clear.
import os
import time
import json
from datetime import datetime, timedelta

import boto3
from zappa.asynchronous import task

# CONFIG
logs = boto3.client("logs", region_name="eu-west-1")
cw = boto3.client("cloudwatch", region_name="eu-west-1")
dynamodb = boto3.resource("dynamodb", region_name="eu-west-1")


@task
def put_metric_data(item):
    slot_name = item["SlotName"]
    log_group = item["LogGroup"]
    cluster_name = item["ClusterName"]
    query = item["Query"]
    namespace = item["Namespace"]
    metric_name = item["MetricName"]

    # This runs the Log Insights query fetching data
    # for the last 15 minutes.
    # As we deal with logs processing it's entirely possible
    # for the metric to be updated, for example because
    # a log was received a bit later.
    # When we put multiple values for the same timestamp
    # in the metric CW can show max, min, avg, and percentiles.
    # Since this is an update of a count we should then always
    # use "max".
    start_query_response = logs.start_query(
        logGroupName=log_group,
        startTime=int((datetime.now() - timedelta(minutes=15)).timestamp()),
        endTime=int(datetime.now().timestamp()),
        queryString=query,
    )
    query_id = start_query_response["queryId"]

    # Just polling the API. 5 seconds seems to be a good
    # compromise between not pestering the API and not paying
    # too much for the Lambda. The query status can also be
    # "Scheduled" before it starts running.
    response = None
    while response is None or response["status"] in ("Scheduled", "Running"):
        print(f"{slot_name}: waiting for query to complete ...")
        time.sleep(5)
        response = logs.get_query_results(queryId=query_id)

    # Data comes in a strange format, a list of rows where each
    # field is a {"field": name, "value": actual_value} dictionary,
    # so this converts it into something that can be accessed
    # through keys
    data = []
    for d in response["results"]:
        sample = {}
        for i in d:
            field = i["field"]
            value = i["value"]
            sample[field] = value
        data.append(sample)

    # Now that we have the data, let's put them into a metric.
    for d in data:
        timestamp = datetime.strptime(d["bin(1m)"], "%Y-%m-%d %H:%M:%S.000")
        value = int(d["Value"])
        print(f"{slot_name}: putting {value} on {timestamp}")
        cw.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    "MetricName": metric_name,
                    "Dimensions": [{"Name": "Cluster", "Value": cluster_name}],
                    "Timestamp": timestamp,
                    "Value": value,
                    "Unit": "None",
                }
            ],
        )


def loginsights2metrics(event, context):
    with open("package_info.json", "r") as f:
        package_info = json.load(f)

    build_timestamp = int(package_info["build_time"])
    build_datetime = datetime.fromtimestamp(build_timestamp)

    print("###################################")
    print(
        "LogInsights2Metrics - Build date: "
        f'{build_datetime.strftime("%Y/%m/%d %H:%M:%S")}'
    )
    print("###################################")

    print(f'Reading task from DynamoDB table {os.environ["DYNAMODB_TABLE"]}')
    table = dynamodb.Table(os.environ["DYNAMODB_TABLE"])

    # This is the simplest way to get all entries in the table.
    # The next loop will asynchronously call `put_metric_data`
    # on each entry.
    response = table.scan(Select="ALL_ATTRIBUTES")
    for i in response["Items"]:
        print(f"* Processing item {i['SlotName']}")
        put_metric_data(i)
So, when the Lambda is executed, the entry point is the function loginsights2metrics, which queries the DynamoDB table and loops over all the items contained in it. The loop executes the function put_metric_data which, being a Zappa task, runs in a new Lambda invocation. This function runs the Log Insights query, adjusts Boto3's output, and finally puts the values in the custom metric.
The problem I mention in the comment just before I run logs.start_query is interesting. Log Insights are queries, and since they extract data from logs the result can change between two calls of the same query. This means that, since there is an overlap between calls (we run a query on the last 15 minutes every 2 minutes), the function will put multiple values in the same bin of the metric. This is perfectly normal, and it is the reason why, as the comment says, anything reading this metric should use the max statistic.
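To close the loop with the original motivation, here is a sketch of an alarm built with Boto3 on one of the custom metrics (the alarm name, threshold, and periods are made-up values, not part of my original setup):
import boto3

cw = boto3.client("cloudwatch", region_name="eu-west-1")

# Hypothetical alarm: fire when more than 100 tasks per minute are submitted
# for 5 consecutive minutes. Note Statistic="Maximum", as discussed above.
cw.put_metric_alarm(
    AlarmName="too-many-submitted-tasks",
    Namespace="Custom",
    MetricName="Submitted tasks",
    Dimensions=[{"Name": "Cluster", "Value": "mycluster"}],
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=5,
    Threshold=100,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
)
The Zappa settings I use for the function are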
zappa_settings.json
{
    "main": {
        "app_module": "main",
        "app_function": "main.loginsights2metrics",
        "runtime": "python3.8",
        "log_level": "WARNING",
        "xray_tracing": true,
        "exception_handler": "zappa_sentry.unhandled_exceptions"
    }
}
And the requirements are
requirements.txt
zappa
zappa-sentry
Please note that, as I mentioned before, zappa-sentry is not a strict requirement for this solution.
The code can be packaged and deployed with a simple bash script like
#!/bin/bash
VENV_DIRECTORY=venv
LAMBDA_PACKAGE=lambda.zip
REGION=eu-west-1
FUNCTION_NAME=loginsights2metrics
if [[ -d ${VENV_DIRECTORY} ]]; then rm -fR ${VENV_DIRECTORY}; fi
if [[ -f ${LAMBDA_PACKAGE} ]]; then rm -fR ${LAMBDA_PACKAGE}; fi
python -m venv ${VENV_DIRECTORY}
source ${VENV_DIRECTORY}/bin/activate
pip install -r requirements.txt
zappa package main -o ${LAMBDA_PACKAGE}
rm -fR ${VENV_DIRECTORY}
aws --region=${REGION} lambda update-function-code --function-name ${FUNCTION_NAME} --zip-file "fileb://${LAMBDA_PACKAGE}"
I will follow here the AWS guide on Lambda pricing and the calculations published in 2018 by my colleague João Neves on his blog.
I assume the following:
Requests: the rule fires every 2 minutes (30 events/hour), and each event results in roughly 5 Lambda invocations (the main one plus one asynchronous task per query), so
5 invocations/event * 30 events/hour * 24 hours/day * 31 days/month = 111600 requests
Duration:
0.128 GB/request * 111600 requests * 5 seconds = 71424 GB-second
Total:
$0.20 * 111600 / 10^6 + $0.0000166667 * 71424 ~= $1.21/month
As you can see, for applications like this it's extremely convenient to use a serverless solution like Lambda functions.
Previously published at https://www.thedigitalcatonline.com/blog/2021/03/22/aws-log-insights-as-cloudwatch-metrics-with-python-and-terraform/