Yan Cui

@theburningmonk

Applying the pub-sub and push-pull messaging patterns with AWS Lambda

August 4th 2017

AWS offers a wealth of options for implementing messaging patterns such as pub-sub and push-pull with Lambda, let’s compare and contrast some of these options.

This image is completely unrelated but I just thought it’s a cool pattern.

pub-sub

Publish-Subscribe (often shortened to pub-sub) is a messaging pattern where publishers and subscribers are decoupled through an intermediary broker (ZeroMQ, RabbitMQ, SNS, etc.).

From Wikipedia, https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

SNS + Lambda

In the AWS ecosystem, the obvious candidate for the broker role is SNS.

SNS will make 3 attempts for your function to process a message before sending it to a Dead Letter Queue (DLQ) if a DLQ is specified for the function. However, according to an analysis by the folks at OpsGenie, the no. of retries can be as many as 6.

Another thing to consider is the degree of parallelism this setup offers. For each message SNS will create a new invocation of your function. So if you publish 100 messages to SNS then you can have 100 concurrent executions of the subscribed Lambda function.

This is great if you’re optimising for throughput.

However, we’re often constrained by the max throughput our downstream dependencies can handle — databases, S3, internal/external services, etc.

If the burst in throughput is short then there’s a good chance the retries would be sufficient (there’s a randomised, exponential back off between retries too) and you won’t miss any messages.

Erred messages are retried 2 times with exponential back off. If the burst is short-lived then the retry is likely to succeed, resulting in no message loss.

If the burst in throughput is sustained over a long period of time, then you can exhaust the max no. of retries. At this point you’ll have to rely on the DLQ and possibly human intervention in order to recover the messages that couldn’t be processed the first time round.

Erred messages are retried 2 times with exponential back off. But the burst in message rate overlaps with the retries, further exasperating the problem and eventually the max no. of retries are exhausted and erred messages have to be delivered to the DLQ instead (if one is specified).

Similarly, if the downstream dependency experiences an outage then all messages received and retried during the outage are bound to fail.

Any message received or retried during the downstream message will fail and be sent to the DLQ.

You can also run into Lambda limit on no. of concurrent executions in a region. Since this is an account wide limit, it will also impact your other systems that rely on AWS Lambda — APIs, event processing, cron jobs, etc.

Kinesis Streams + Lambda

Kinesis Streams differ from SNS in many ways:

  • Lambda polls Kinesis for records up to 5 times a second, whereas SNS would push messages to Lambda
  • records are received in batches (up to your specified maximum), SNS invokes your function with one message
  • if your function returns an error or times out, then you’ll keep receiving the same batch of records until you either successfully process them or the data are no longer available in the stream
  • the degree of parallelism is determined by the no. of shards in the stream as there is one dedicated invocation per shard
  • Kinesis Streams are charged based on no. of records pushed to the stream; shard hours, and whether or not you enable extended retention

SNS is prone to suffer from temporal issues — bursts in traffic, downstream outage, etc. Kinesis on the other hand deals with these issues much better.

  • degree of parallelism is constrained by no. of shards, which can be used to amortise bursts in message rate
Bursts in message rate is amortised, as the max throughput is determined by no. of shards * max batch size * 5 reads per second. Which gives you two levers to adjust the max throughput with.
  • records are retried until success, unless the outage lasts longer than the retention policy you have on the stream (default is 24 hours) you will eventually be able to process the records
The impact of a downstream outage is absorbed by the retry-until-success invocation policy.

But Kinesis Streams is not without its own problems. In fact, from my experience using Kinesis Streams with Lambda I have found a no. of caveats that we needed to understand in order to make effective use of them.

You can read about these caveats here.

There are also several operational considerations to take into account:

  • because Kinesis Streams is charged (in part) based on shard hours, so a dormant stream would have a baseline cost of $0.015 per shard per hour (~$11 per shard per month)
  • there is no built-in auto-scaling capability for Kinesis Streams neither, so there is also additional management overhead for scaling them up based on utilization

It is possible to build auto-scaling capability yourself, which I had done at my previous (failed) startup. Whilst I can’t share the code you can read about the approach and my design thinking here.

Interestingly, Kinesis Streams is not the only streaming option available on AWS, there is also DynamoDB Streams.

DynamoDB Streams + Lambda

DynamoDB Streams can be used as a like-for-like replacement for Kinesis Streams.

By and large, DynamoDB Streams + Lambda works the same way as Kinesis Streams + Lambda. Operationally, it does have some interesting twists:

  • DynamoDB Streams auto-scales the no. of shards
  • if you’re processing DynamoDB Streams with AWS Lambda then you don’t pay for the reads from DynamoDB Streams (but you still pay for the read & write capacity units for the DynamoDB table itself)
  • Kinesis Streams offers the option to extend data retention to 7 days; DynamoDB Streams doesn’t offer such option

The fact that DynamoDB Streams auto-scales the no. of shards can be a double-edged sword. On one hand it eliminates the need for you to manage and scale the stream (or come up with home baked auto-scaling solution); on the other hand, it can also diminish the ability to amortize spikes in load you pass on to downstream systems.

AFAIK there is no way to limit the no. of shards a DynamoDB stream can scale up to — something you’d surely consider when implementing your own auto-scaling solution.

Should I use Kinesis or DynamoDB Streams?

I think the most pertinent question is “what is your source of truth?”

Does a row being written in DynamoDB make it canon to the state of your system? This is certainly the case in most N-tier systems that are built around a database, regardless whether it’s RDBMS or NoSQL.

In an event sourced system where state is modelled as a sequence of events (as opposed to a snapshot) the source of truth might well be the Kinesis stream — as soon as an event is written to the stream it’s considered canon to the state of the system.

Then, there’re other considerations around cost, auto-scaling, etc.

From a development point of view, DynamoDB Streams also has some limitations & shortcoming:

  • each stream is limited to events from one table
  • the records describe DynamoDB events and not events from your domain, which I always felt creates a sense of dissonance when I’m working with these events

Cost Implication of your Broker choice

Excluding the cost of Lambda invocations for processing the messages, here are some cost projections for using SNS vs Kinesis Streams vs DynamoDB Streams as the broker. I’m making the assumption that throughput is consistent, and that each message is 1KB in size.

monthly cost at 1 msg/s

monthly cost at 1,000 msg/s

These projections should not be taken at face value. For starters, the assumptions about a perfectly consistent throughput and message size is unrealistic, and you’ll need some headroom with Kinesis & DynamoDB Streams even if you’re not hitting the throttling limits.

That said, what these projections do tell me is that:

  1. you get an awful lot with each shard in Kinesis Streams
  2. whilst there’s a baseline cost for using Kinesis Streams, the cost grows much slower with scale compared to SNS and DynamoDB Streams, thanks to the significantly lower cost per million requests

Stacking it up

Whilst SNS, Kinesis & DynamoDB Streams are your basic choices for the broker, the Lambda functions can also act as brokers in their own right and propagate events to other services.

This is the approach used by the aws-lambda-fanout project from awslabs. It allows you to propagate events from Kinesis and DynamoDB Streams to other services that cannot directly subscribe to the 3 basic choice of brokers either because account/region limitations, or that they’re just not supported.

The aws-lambda-fanout project from awslabs propagates events from Kinesis and DynamoDB Streams to other services across multiple accounts and regions.

Whilst it’s a nice idea and definitely meets some specific needs, it’s worth bearing in mind the extra complexities it introduces — handling partial failures, dealing with downstream outages, misconfigurations, etc.

push-pull, aka fan-out/fan-in

The push-pull messaging pattern is often referred to as fan-out/fan-in.

It’s really two separate patterns working in tandem. Fan-out is often used on its own, where messages are delivered to a pool of workers in a round-robin fashion and each message is delivered to only one worker.

This is useful in at least two different ways:

  1. having a pool of workers to carry out the actual work allows for parallel processing and lead to increased throughput
  2. if each message represents an expensive task that can be broken down into smaller subtasks that can be carried out in parallel

In the second case where the original task (say, a batch job) is partitioned into many subtasks, you’ll need fan-in to collect result from individual workers and aggregate them together.

fan-out with SNS

As discussed above, SNS’s invocation per message policy is a good fit here as we’re optimizing for throughput and parallelism during the fan-out stage.

Here, a ventilator function would partition the expensive task into subtasks, and publish a message to the SNS topic for each subtask.

This is essentially the approach we took when we implemented the timeline feature at Yubl (the last startup I worked at) which works the same as Twitter’s timeline — when you publish a new post it is distributed to your followers’ timeline; and when you follow another user, their posts would show up in your timeline shortly after.

Yubl had a timeline feature which works the same way as Twitter’s timeline. When you publish a new post, the post will be distributed to the timeline of your followers.
A real-world example of fan-out whereby a user’s new post is distributed to his followers. Since the user can have tens of thousands of followers the task is broken down into many subtasks — each subtask involves distributing the new post to 1k followers and can be performed in parallel.

fan-out with SQS

Before the advent of AWS Lambda, this type of workload is often carried out with SQS. Unfortunately SQS is not one of the supported event sources for Lambda, which puts it in a massive disadvantage here.

That said, SQS itself is still a good choice for distributing tasks and if your subtasks take longer than 5 minutes to complete (the max execution time for Lambda) you might still have to find a way to make the SQS + Lambda setup work.

Let me explain what I mean.

First, it’s possible for a Lambda function to go beyond the 5 min execution time limit by writing it as a recursive function. However, the original invocation (triggered by SNS) has to signal whether or not the SNS message was successfully processed, but that information is only available at the end of the recursion!

With SQS, you have a message handle that can be passed along during recursion. The recursed invocation can then use the handle to:

  • extend the visibility timeout for the message so another SQS poller does not receive it whilst we’re still processing the message
  • delete the message if we’re able to successfully process it

A while back, I prototyped an architecture for processing SQS messages using recursive Lambda functions. The architecture allows for elastically scaling up and down the no. of pollers based on the size of the backlog (or whatever CloudWatch metric you choose to scale on).

You can read all about it here.

I don’t believe it lowers the bar of entry for the SQS + Lambda setup enough for regular use, not to mention the additional cost of running a Lambda function 24/7 for polling SQS. However, I do know of a few companies (including one of my ex-employers) that are using this architecture at scale in production so it probably works well enough.

Anyhow, keep it in your back pocket in the unfortunate event that you need to make AWS Lambda work with SQS.

What about Kinesis or DynamoDB Streams?

Personally I don’t feel these are great options, because the degree of parallelism is constrained by the no. of shards. Whilst you can increase the no. of shards, it’s a really expensive way to get extra parallelism, especially given the way resharding works in Kinesis Streams — after splitting an existing shard, the old shard is still around for at least 24 hours (based on your retention policy) and you’ll continue to pay for it.

Therefore, dynamically adjusting the no. of shards to scale up and down the degree of parallelism you’re after can incur lots unnecessary cost.

With DynamoDB Streams, you don’t even have the option to reshard the stream — it’s a managed stream that reshards as it sees fit.

fan-in: collecting results from workers

When the ventilator function partition the original task into many subtasks, it can also include two identifiers with each subtask — one for the top level job, and one for the subtask. When the subtasks are completed, you can use the identifiers to record their results against.

For example, you might use a DynamoDB table to store these results. But bare in mind that DynamoDB has a max item size of 400KB including attribute names.

Alternatively, you may also consider storing the results in S3, which has a max object size of a whopping 5TB! For example, you can store the results as the following:

bucket/job_id/task_01.json
bucket/job_id/task_02.json
bucket/job_id/task_03.json
...

Note that in both cases we’re prone to experience hot partitions — large no. of writes against the same DynamoDB hash key or S3 prefix.

To mitigate this negative effect, be sure to use a GUID for the job ID.

Depending on the volume of write operations you need to perform against S3, you might need to tweak the approach. For example:

  • partition the bucket with top level folders and place results in to the correct folder based on hash value of the job ID
bucket/01/job_id_001/task_01.json
bucket/01/job_id_001/task_02.json
bucket/01/job_id_001/task_03.json
...
  • store the results in easily hashable but unstructured way in S3, but also record references to them in DynamoDB table
bucket/ffa7046a-105e-4a00-82e6-849cd36c303b.json
bucket/8fb59303-d379-44b0-8df6-7a479d58e387.json
bucket/ba6d48b6-bf63-46d1-8c15-10066a1ceaee.json
...

fan-in: tracking overall progress

When the ventilator function runs and partitions the expensive task into lots small subtasks, it should also record the total no. of subtasks. This way, it allows each invocation of the worker function to atomically decrement the count, until it reaches 0.

The invocation that sees the count reach 0 is then responsible for signalling that all the subtasks are complete. It can do this in many ways, perhaps by publishing a message to another SNS topic so the worker function is decoupled from whatever post steps that need to happen to aggregate the individual results.

(wait, so are we back to the pub-sub pattern again?) maybe ;-)

At this point, the sink function (or reducer, as it’s called in the context of a map-reduce job) would be invoked. Seeing as you’re likely to have a large no. of results to collect, it might be a good idea to also write the sink function as a recursive function too.

Anyway, these are just a few of the ways I can think of to implement pub-sub and push-poll patterns with AWS Lambda. Let me know in the comments if I have missed any obvious alternatives.

Hi, my name is Yan Cui. I’m an AWS Serverless Hero and the author of Production-Ready Serverless. I have run production workload at scale in AWS for nearly 10 years and I have been an architect or principal engineer with a variety of industries ranging from banking, e-commerce, sports streaming to mobile gaming. I currently work as an independent consultant focused on AWS and serverless.

You can contact me via Email, Twitter and LinkedIn.

Check out my new course, Complete Guide to AWS Step Functions.

In this course, we’ll cover everything you need to know to use AWS Step Functions service effectively. Including basic concepts, HTTP and event triggers, activities, design patterns and best practices.

Get your copy here.

Come learn about operational BEST PRACTICES for AWS Lambda: CI/CD, testing & debugging functions locally, logging, monitoring, distributed tracing, canary deployments, config management, authentication & authorization, VPC, security, error handling, and more.

You can also get 40% off the face price with the code ytcui.

Get your copy here.

More by Yan Cui

More Related Stories