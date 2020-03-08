Dev/Avocado at Sematext.com. Co-Founder at Bookvar.co. Author of "Serverless JavaScript by Example"
CloudWatch collects monitoring and operational data in the form of logs, metrics, and events, providing you with a unified view of AWS resources, applications and services that run on AWS, and on-premises servers.
— AWS Documentation
$ npm install -g serverless
$ sls config credentials \
  --provider aws \
  --key xxxxxxxxxxxxxx \
  --secret xxxxxxxxxxxxxx
$ sls create --template aws-nodejs --path lambda-cwlogs-to-logsene
$ cd lambda-cwlogs-to-logsene
$ npm init -y
# zlib is part of Node.js core (require('zlib') always loads the built-in module),
# so it is not installed from npm. The serverless plugin is only needed at deploy
# time, so it goes into devDependencies and stays out of the deployment package.
$ npm i logsene-js
$ npm i -D serverless-iam-roles-per-function
# serverless.yml
service: lambda-cwlogs-to-logsene

plugins:
  - serverless-iam-roles-per-function

custom:
  stage: ${opt:stage, self:provider.stage}
  secrets: ${file(secrets.json)}

provider:
  name: aws
  # NOTE(review): nodejs8.10 is a deprecated Lambda runtime. Newer runtimes may
  # format console output differently, so check shipper.js's log parsing before
  # upgrading.
  runtime: nodejs8.10
  stage: dev
  region: ${self:custom.secrets.REGION, 'us-east-1'}
  versionFunctions: false

functions:
  shipper:
    handler: shipper.handler
    description: Sends CloudWatch logs from Kinesis to Sematext Elastic Search API
    memorySize: 128
    timeout: 3
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - LogsKinesisStream
              - Arn
          batchSize: ${self:custom.secrets.BATCH_SIZE}
          startingPosition: LATEST
          enabled: true
    environment:
      LOGS_TOKEN: ${self:custom.secrets.LOGS_TOKEN}
      LOGS_BULK_SIZE: 100
      LOG_INTERVAL: 2000

  subscriber:
    handler: subscriber.handler
    description: Subscribe all CloudWatch log groups to Kinesis
    memorySize: 128
    timeout: 30
    events:
      # Manual trigger: GET /subscribe
      - http:
          path: subscribe
          method: get
      # New log groups, as reported by CloudTrail
      - cloudwatchEvent:
          event:
            source:
              - aws.logs
            detail-type:
              - AWS API Call via CloudTrail
            detail:
              eventSource:
                - logs.amazonaws.com
              eventName:
                - CreateLogGroup
      # Periodic catch-up run
      - schedule:
          rate: rate(60 minutes)
    iamRoleStatements:
      # NOTE(review): Resource "*" is broad; iam:PassRole could be limited to
      # the CloudWatchLogsRole ARN.
      - Effect: "Allow"
        Action:
          - "iam:PassRole"
          - "sts:AssumeRole"
          - "logs:PutSubscriptionFilter"
          - "logs:DeleteSubscriptionFilter"
          - "logs:DescribeSubscriptionFilters"
          - "logs:DescribeLogGroups"
          - "logs:PutRetentionPolicy"
        Resource: "*"
    environment:
      filterName: ${self:custom.stage}-${self:provider.region}
      region: ${self:provider.region}
      shipperFunctionName: "shipper"
      subscriberFunctionName: "subscriber"
      prefix: "/aws/lambda"
      retentionDays: ${self:custom.secrets.LOG_GROUP_RETENTION_IN_DAYS}
      kinesisArn:
        Fn::GetAtt:
          - LogsKinesisStream
          - Arn
      roleArn:
        Fn::GetAtt:
          - CloudWatchLogsRole
          - Arn

resources:
  Resources:
    LogsKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ${self:service}-${self:custom.stage}-logs
        ShardCount: ${self:custom.secrets.KINESIS_SHARD_COUNT}
        RetentionPeriodHours: ${self:custom.secrets.KINESIS_RETENTION_IN_HOURS}
    # Role that CloudWatch Logs assumes to write subscription data into the stream.
    CloudWatchLogsRole:
      Type: AWS::IAM::Role
      Properties:
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - logs.amazonaws.com
              Action:
                - sts:AssumeRole
        Policies:
          - PolicyName: root
            PolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Effect: Allow
                  Action:
                    - kinesis:PutRecords
                    - kinesis:PutRecord
                  Resource:
                    Fn::GetAtt:
                      - LogsKinesisStream
                      - Arn
        RoleName: ${self:service}-${self:custom.stage}-cloudwatchrole
Next, create a `secrets.json` file next to `serverless.yml` with the following values:
{
"LOGS_TOKEN": "your-token",
"REGION": "us-east-1",
"BATCH_SIZE": 1000,
"LOG_GROUP_RETENTION_IN_DAYS": 1,
"KINESIS_RETENTION_IN_HOURS": 24,
"KINESIS_SHARD_COUNT": 1
}
Now create a `subscriber.js` file and paste this snippet in.
// subscriber.js
const AWS = require('aws-sdk')
// Configure the SDK region before constructing the CloudWatch Logs client.
AWS.config.region = process.env.region
const cloudWatchLogs = new AWS.CloudWatchLogs()
// Settings injected through the subscriber's `environment` section in serverless.yml.
const {
  prefix,
  kinesisArn,
  roleArn,
  filterName,
  retentionDays,
  shipperFunctionName
} = process.env
// Blank pattern: no filtering, every log event is forwarded.
const filterPattern = ''
/**
 * Apply the configured retention (`retentionDays` from the environment) to a log group.
 * Resolves with undefined once PutRetentionPolicy succeeds; API errors propagate.
 */
const setRetentionPolicy = async (logGroupName) => {
  const request = cloudWatchLogs.putRetentionPolicy({
    logGroupName,
    retentionInDays: retentionDays
  })
  await request.promise()
}
/**
 * Collect the names of every log group whose name starts with `prefix`.
 *
 * Pages through DescribeLogGroups (50 per page) until no nextToken is returned.
 * @param {string[]} acc - names already collected; returned array starts with these (not mutated)
 * @param {string} [nextToken] - page token to resume from
 * @returns {Promise<string[]>} acc followed by all discovered log group names
 */
const listLogGroups = async (acc, nextToken) => {
  let names = acc
  let pageToken = nextToken
  do {
    const page = await cloudWatchLogs.describeLogGroups({
      limit: 50,
      logGroupNamePrefix: prefix,
      nextToken: pageToken
    }).promise()
    names = names.concat(page.logGroups.map(({ logGroupName }) => logGroupName))
    pageToken = page.nextToken
  } while (pageToken)
  return names
}
/**
 * Make sure `options` (a PutSubscriptionFilter request) is the subscription
 * filter on `options.logGroupName`, replacing whatever filter is there.
 *
 * - No existing filter: just put ours. The earlier put failed for some other
 *   reason, so retrying surfaces the real AWS error instead of a TypeError.
 * - Existing filter differs in name, pattern or destination: delete it, then put ours.
 * - Existing filter already matches: nothing to do.
 * @param {object} options - the same request object passed to putSubscriptionFilter
 */
const upsertSubscriptionFilter = async (options) => {
  console.log('UPSERTING...')
  const { subscriptionFilters } = await cloudWatchLogs.describeSubscriptionFilters({ logGroupName: options.logGroupName }).promise()
  const existing = subscriptionFilters && subscriptionFilters[0]
  if (!existing) {
    await cloudWatchLogs.putSubscriptionFilter(options).promise()
    return
  }
  const isUpToDate = existing.filterName === options.filterName &&
    existing.filterPattern === options.filterPattern &&
    existing.destinationArn === options.destinationArn
  if (isUpToDate) {
    return
  }
  await cloudWatchLogs.deleteSubscriptionFilter({
    filterName: existing.filterName,
    logGroupName: options.logGroupName
  }).promise()
  await cloudWatchLogs.putSubscriptionFilter(options).promise()
}
/**
 * Subscribe one log group to the Kinesis stream (`kinesisArn`) through `roleArn`,
 * using this service's filter name/pattern and 'ByLogStream' distribution.
 * If the put rejects, the error is logged and upsertSubscriptionFilter() is
 * asked to replace the filter that is already there; its errors propagate.
 */
const subscribe = async (logGroupName) => {
  const filterRequest = {
    destinationArn: kinesisArn,
    logGroupName,
    filterName,
    filterPattern,
    roleArn,
    distribution: 'ByLogStream'
  }
  try {
    await cloudWatchLogs.putSubscriptionFilter(filterRequest).promise()
  } catch (putError) {
    console.log(`FAILED TO SUBSCRIBE [${logGroupName}]`)
    console.error(JSON.stringify(putError))
    await upsertSubscriptionFilter(filterRequest)
  }
}
/**
 * Subscribe every given log group and set its retention, all in parallel.
 *
 * Log groups whose name ends with `shipperFunctionName` are skipped. Subscribing
 * the shipper to its own stream would feed its output back into itself.
 * Rejects as soon as any group's subscribe/retention call rejects (Promise.all).
 */
const subscribeAll = async (logGroups) => {
  const handleLogGroup = async (logGroupName) => {
    const isShipperLogGroup = logGroupName.endsWith(shipperFunctionName)
    if (isShipperLogGroup) {
      console.log(`SKIPPING [${logGroupName}] BECAUSE IT WILL CREATE CYCLIC EVENTS FROM IT'S OWN LOGS`)
    } else {
      console.log(`SUBSCRIBING [${logGroupName}]`)
      await subscribe(logGroupName)
      console.log(`UPDATING RETENTION POLICY TO [${retentionDays} DAYS] FOR [${logGroupName}]`)
      await setRetentionPolicy(logGroupName)
    }
  }
  await Promise.all(logGroups.map(handleLogGroup))
}
/**
 * Discover every log group under `prefix`, then subscribe them all.
 * Resolves with undefined.
 */
const processAll = async () => {
  const discoveredLogGroups = await listLogGroups([])
  await subscribeAll(discoveredLogGroups)
}
exports.handler = async () => {
console.log('subscriber start')
await processAll()
console.log('subscriber done')
return {
statusCode: 200,
body: JSON.stringify({ message: `Subscription successful!` })
}
}
Take a look at the `processAll()` function. Through `listLogGroups()`, it grabs all Log Groups from CloudWatch that match the prefix and puts their names in an easily accessible array. It then passes them to the `subscribeAll()` function, which maps through them and subscribes each one to the Kinesis stream you defined in `serverless.yml`.

The `filterPattern` is the pattern by which logs get ingested. For now, I've chosen to keep it blank and not filter out anything. Based on your needs, you can match it to whatever pattern your logger of choice produces.
// shipper.js
const Zlib = require('zlib')
const Logsene = require('logsene-js')
// Module-level Logsene client used by shipLogs(); the token comes from the LOGS_TOKEN env var.
const { LOGS_TOKEN: logsToken } = process.env
const logger = new Logsene(logsToken)
// Substrings (matched case-insensitively) that mark a log message as an error.
const errorPatterns = [
  'error'
]
const configurationErrorPatterns = [
  'module initialization error',
  'unable to import module'
]
const timeoutErrorPatterns = [
  'task timed out',
  'process exited before completing'
]
/**
 * Sample of a structured log (fields separated by tabs)
 * ***************************************************************************
 * Timestamp RequestId Message
 * 2019-03-08T15:58:45.736Z 53499d7f-60f1-476a-adc8-1e6c6125a67c Hello World!
 * ***************************************************************************
 *
 * The pattern is anchored at the start and only accepts tab separators, because
 * splitStructuredLog() splits on '\t'. A mid-line or space-separated match would
 * be split into the wrong fields (possibly leaving the message undefined).
 * The '.' before the milliseconds is escaped so it only matches a literal dot.
 */
const structuredLogPattern = '^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])T(2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9]\\.[0-9]{3}Z\\t[a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\\t(.*)'
// No 'g' flag: these are only used for yes/no tests, and a global regex would
// carry lastIndex state between RegExp#test calls.
const regexError = new RegExp(errorPatterns.join('|'), 'i')
const regexConfigurationError = new RegExp(configurationErrorPatterns.join('|'), 'i')
const regexTimeoutError = new RegExp(timeoutErrorPatterns.join('|'), 'i')
const regexStructuredLog = new RegExp(structuredLogPattern)
/** Text between the first '[' and the first ']' of a log stream name, e.g. '$LATEST'. */
const lambdaVersion = (logStream) => logStream.substring(logStream.indexOf('[') + 1, logStream.indexOf(']'))
/** Last '/'-separated segment of a log group name, e.g. '/aws/lambda/my-fn' -> 'my-fn'. */
const lambdaName = (logGroup) => logGroup.split('/').pop()
/**
 * Mark `log` as an error when its message matches a known error pattern.
 * Sets `severity` to 'error' and `error.type` to 'configuration', 'timeout' or
 * 'runtime'; otherwise leaves `log` untouched. Mutates and returns `log`.
 */
const checkLogError = (log) => {
  // Most specific categories first: "module initialization error" also matches
  // the generic "error" pattern, which made the configuration branch
  // unreachable when the generic check ran first.
  let errorType
  if (regexConfigurationError.test(log.message)) {
    errorType = 'configuration'
  } else if (regexTimeoutError.test(log.message)) {
    errorType = 'timeout'
  } else if (regexError.test(log.message)) {
    errorType = 'runtime'
  }
  if (errorType) {
    log.severity = 'error'
    log.error = {
      type: errorType
    }
  }
  return log
}
/**
 * Split a "timestamp<TAB>requestId<TAB>message" line into its three parts.
 * Everything after the second tab is kept as the message, including any
 * further tabs. String#split with a limit discards the remainder instead of
 * keeping it, which truncated such messages.
 * @returns {{timestamp: string, requestId: (string|undefined), msg: (string|undefined)}}
 */
const splitStructuredLog = (message) => {
  const [timestamp, requestId, ...rest] = message.split('\t')
  return {
    timestamp: timestamp,
    requestId: requestId,
    // undefined (not '') when the line has fewer than two tabs, as before
    msg: rest.length > 0 ? rest.join('\t') : undefined
  }
}
/**
 * Create payload for Logsene API.
 *
 * Returns undefined for lines starting with 'START RequestId', 'END RequestId'
 * or 'REPORT RequestId'. Every other line becomes a 'lambda' log at 'debug'
 * severity. Lines matching regexStructuredLog also get their timestamp and
 * requestId split out. checkLogError() may then upgrade the log to an error.
 */
const parseLog = (functionName, functionVersion, message, awsRegion) => {
  const isSkippedLine = ['START RequestId', 'END RequestId', 'REPORT RequestId']
    .some((marker) => message.startsWith(marker))
  if (isSkippedLine) {
    return
  }
  const common = {
    function: functionName,
    version: functionVersion,
    region: awsRegion,
    type: 'lambda',
    severity: 'debug'
  }
  if (!message.match(regexStructuredLog)) {
    // when log is NOT structured, forward the raw line as the message
    return checkLogError(Object.assign({ message: message }, common))
  }
  const { timestamp, requestId, msg } = splitStructuredLog(message)
  return checkLogError(Object.assign({ message: msg }, common, {
    timestamp: timestamp,
    requestId: requestId
  }))
}
/**
 * Turn a Kinesis event into a flat array of Logsene payloads.
 *
 * Each record's data is base64-decoded, gunzipped and parsed as JSON.
 * CONTROL_MESSAGE payloads are ignored. Every log event is passed through
 * parseLog(), and lines it returns undefined for are dropped.
 */
const parseLogs = (event) => {
  const collected = []
  for (const record of event.Records) {
    const compressed = Buffer.from(record.kinesis.data, 'base64')
    const data = JSON.parse(Zlib.gunzipSync(compressed).toString('utf8'))
    if (data.messageType === 'CONTROL_MESSAGE') {
      continue
    }
    const functionName = lambdaName(data.logGroup)
    const functionVersion = lambdaVersion(data.logStream)
    for (const { message } of data.logEvents) {
      const log = parseLog(functionName, functionVersion, message, record.awsRegion)
      if (log) {
        collected.push(log)
      }
    }
  }
  return collected
}
/**
 * Queue every log on the shared Logsene client, then flush them in one send.
 * Resolves with a status string once logger.send() calls back. Any argument
 * passed to that callback is ignored, so a failed send still resolves with
 * the success message.
 */
const shipLogs = async (logs) => {
  if (!logs.length) {
    return 'No logs to ship.'
  }
  for (const log of logs) {
    logger.log(log.severity, 'LogseneJS', log)
  }
  return new Promise((resolve) => {
    logger.send(() => resolve('Logs shipped successfully!'))
  })
}
exports.handler = async (event) => {
try {
const res = await shipLogs(parseLogs(event))
console.log(res)
} catch (err) {
console.log(err)
return err
}
return 'shipper done'
}
Notice the `parseLogs()` and `shipLogs()` functions. The former takes the event parameter, extracts all log events, parses them, adds them to an array, and returns that array. The latter takes that same logs array, adds every single log event to the LogseneJS buffer, and sends them all in one go. The destination is the Logs App you created above.

Besides the output your own code writes (e.g. with `console.log()` in Node.js), a Lambda function's log group also contains `START RequestId`, `END RequestId`, and `REPORT RequestId` entries around each invocation. The `parseLog()` function skips the START, END, and REPORT log events entirely and only returns user-defined log events. Each is tagged as debug or error, depending on whether it is regular stdout or an error in the function's runtime, configuration, or duration (timeout).

Structured log lines look like this:

    Timestamp RequestId Message
    2019-03-08T15:58:45.736Z 53499d7f-60f1-476a-adc8-1e6c6125a67c Hello World!
$ sls deploy
[output]
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (2.15 MB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
............
Serverless: Stack update finished...
Service Information
service: lambda-cwlogs-to-logsene
stage: dev
region: us-east-1
stack: lambda-cwlogs-to-logsene-dev
api keys:
None
endpoints:
GET - https://.execute-api.us-east-1.amazonaws.com/dev/subscribe
functions:
shipper: lambda-cwlogs-to-logsene-dev-shipper
subscriber: lambda-cwlogs-to-logsene-dev-subscriber
layers:
None
Serverless: Removing old service artifacts from S3…