How to include SNS and Kinesis in your e2e tests
AWS Serverless Hero. Independent Consultant. Developer Advocate at Lumigo.
Being event-driven is a key characteristic of a serverless architecture. Often our Lambda functions would publish events to SNS topics or Kinesis streams instead of calling downstream APIs directly.
This is a great way to build loosely-coupled systems. The upstream APIs don’t have to know who is interested in their events. Their job is to report events that occur within their bounded context, and that’s it. Subscribers can look out for events they’re interested in and react to them completely independently of the publisher.
Event-driven systems like this are also more resilient, because they avoid common microservices pitfalls such as cascading failures: systems don’t have to integrate with each other directly, but go through a robust message broker instead.
It’s definitely my preferred way of building systems. However, it’s not without its drawbacks. One such drawback is that they’re harder to test end-to-end. In this post, we’ll discuss two ways you can include SNS and Kinesis outputs in your end-to-end tests:
- by storing the messages in a DynamoDB table and then polling the table
- by broadcasting the messages to an API Gateway websocket and listening for the right messages to arrive
Both approaches go well with using temporary CloudFormation stacks in your CI/CD pipeline, as discussed in my previous post.
The code for both approaches is available in this GitHub repo. Feel free to try it out yourself and let me know your questions in the comments below.
The general approach
With both SNS and Kinesis, there’s no easy way for us to query the data that has been sent to them. So both approaches follow the same pattern of deploying a set of resources only for the e2e-test stage. These include Lambda functions that would subscribe to the SNS topic and/or Kinesis stream and forward the messages.
Of course, we wouldn’t want to deploy these functions to production! With CloudFormation you can use Conditions to conditionally include certain resources and outputs.
With the latest Serverless framework 1.5.2 release, you can add conditions to function definitions as well. However, these do not extend to other related resources, such as IAM roles. The best solution I have found is to not include those function definitions at all, except for the e2e-test stage.
In the repo, you’ll find the function definitions are broken out into a separate YML file, and there is one for every environment. The SNS and Kinesis listener functions are defined only in the one for the e2e-test stage.
This is an unfortunate setup to work around limitations of the Serverless framework. If you need to do this routinely, then you should consider wrapping the behaviour into a plugin and sharing it with others. Or, you can create a PR and submit the change to the Serverless core ;-)
Anyhow, from here on, the two approaches diverge.
via DynamoDB
With this approach, the SNS and Kinesis listeners would write the messages they receive into a DynamoDB table. Like the functions, the table is also provisioned conditionally and only available for the e2e-test stage.
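As a rough sketch of what such a listener has to do before it writes to DynamoDB, the function below pulls the message bodies out of either event type. It relies on the standard Lambda event shapes (`Records[].Sns.Message` for SNS, base64-encoded `Records[].kinesis.data` for Kinesis). `extractMessages` is a hypothetical helper rather than code from the repo, and the DynamoDB write itself is left out.

```javascript
// Hypothetical helper: normalise SNS and Kinesis Lambda events into
// a flat list of { source, message } items ready to be stored.
function extractMessages(event) {
  return (event.Records || []).map((record) => {
    if (record.Sns) {
      // SNS delivers the published payload as a string.
      return { source: 'sns', message: record.Sns.Message };
    }
    if (record.kinesis) {
      // Kinesis record data arrives base64-encoded.
      const decoded = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
      return { source: 'kinesis', message: decoded };
    }
    throw new Error('Unsupported record type');
  });
}
```

Keeping the parsing separate from the write means the same helper could serve both the SNS and the Kinesis listener.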
The end-to-end tests would invoke the deployed Lambda function (the system under test, or SUT) directly*, and then poll the DynamoDB table once per second to see if the published messages are present.
* If the SUT function is configured with API Gateway or some other event source, then you should trigger the function by calling the API Gateway endpoint instead. For brevity, I didn’t configure the SUT function with any event source, hence the direct invocation.
You can implement the polling logic with async-retry, and the test might look something like this:
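Here is a minimal sketch of that retry loop, written without async-retry so that it runs on its own. `pollForMessage`, `fetchMessages` and `isExpected` are hypothetical names: in a real test `fetchMessages` would query or scan the DynamoDB table, and the defaults assume 20 attempts with a 1s delay.

```javascript
// Hypothetical stand-in for an async-retry loop: call `fetchMessages`
// until `isExpected` matches one of the results, or give up.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function pollForMessage(fetchMessages, isExpected, { attempts = 20, delayMs = 1000 } = {}) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    // In a real test this would read from the DynamoDB table.
    const messages = await fetchMessages();
    const match = messages.find(isExpected);
    if (match) return match;
    // Only wait if there is another attempt to come.
    if (attempt < attempts) await sleep(delayMs);
  }
  throw new Error(`message not found after ${attempts} attempts`);
}
```

A test would await `pollForMessage(...)` after invoking the SUT function and then assert on the returned item.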
In this case, the test would poll the table up to 20 times with a 1s delay between attempts, which means the test could run for up to 20s. You should set the test timeout accordingly.
When you first deploy the e2e-test stage, the first tests usually take longer. On average, these end-to-end tests complete in around 2–4 seconds.
This approach is very simple to set up and doesn’t require too many moving parts. However, since we’re polling DynamoDB once per second, we might wait up to a whole second before we see a new message. Of course, you can reduce this by shortening the delay between retries. But if you’re after a more instantaneous, push-based way to find out when messages are received, then consider the next approach instead.
via API Gateway WebSocket
With this approach, the SNS and Kinesis listeners would forward messages to an API Gateway WebSocket endpoint instead.
However, because of the way API Gateway’s WebSocket works, it’s slightly more involved. You need additional Lambda functions and a DynamoDB table to track who’s connected to the WebSocket endpoint.
This adds more moving parts (and therefore complexity) but is sadly required to implement broadcasting.
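To illustrate why the connections table is needed, here is a minimal sketch of the broadcast step. All three dependencies are hypothetical and injected so that the sketch is self-contained. In AWS, `listConnectionIds` and `removeConnection` would wrap the DynamoDB table, and `send` would wrap the API Gateway Management API’s `postToConnection` call. The stale-connection check assumes the error carries `statusCode` 410, as it does in aws-sdk v2.

```javascript
// Hypothetical broadcast helper: send one message to every tracked
// WebSocket connection and forget connections that have gone away.
async function broadcast(message, { listConnectionIds, send, removeConnection }) {
  const connectionIds = await listConnectionIds();
  const data = JSON.stringify(message);
  let delivered = 0;
  for (const connectionId of connectionIds) {
    try {
      await send(connectionId, data);
      delivered++;
    } catch (err) {
      // Assumption: a 410 status means the client has disconnected.
      if (err.statusCode === 410) {
        await removeConnection(connectionId);
      } else {
        throw err;
      }
    }
  }
  // Number of connections that actually received the message.
  return delivered;
}
```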
The end-to-end tests would still invoke the SUT function directly. But we also need to connect to the WebSocket endpoint so we can listen for the messages as they come in. Also, don’t forget to disconnect after the tests finish so we don’t leave phantom connections in the DynamoDB table. p.s. one of the benefits of using temporary stacks is that we don’t need to worry about cleaning up this test data.
Once we’re connected to the WebSocket endpoint and have triggered the SUT function, we have to wait for the expected messages.
To make this nice and easy, I used RxJS’s WebSocket wrapper and captured the messages we receive in a ReplaySubject to avoid race conditions.
When we wait for a message to come through, it doesn’t matter if the message was received before we started waiting. That’s the beauty of the ReplaySubject: it gives us the buffered messages first.
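The replay behaviour can be illustrated without RxJS. The class below is a hypothetical, dependency-free stand-in (it is not the ReplaySubject API) that shows why buffering removes the race: a waiter that arrives late still sees messages that were pushed earlier.

```javascript
// Hypothetical stand-in for the behaviour we rely on from a replay
// buffer: late waiters still see messages that arrived earlier.
class ReplayBuffer {
  constructor() {
    this.messages = [];
    this.waiters = [];
  }

  // Called for every message that arrives on the WebSocket.
  push(message) {
    this.messages.push(message);
    // Resolve waiters whose predicate matches; keep the rest waiting.
    this.waiters = this.waiters.filter(({ predicate, resolve }) => {
      if (!predicate(message)) return true;
      resolve(message);
      return false;
    });
  }

  // Resolves with the first buffered or future message that matches.
  waitFor(predicate, timeoutMs = 5000) {
    const buffered = this.messages.find(predicate);
    if (buffered !== undefined) return Promise.resolve(buffered);
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => reject(new Error('timed out')), timeoutMs);
      this.waiters.push({
        predicate,
        resolve: (message) => { clearTimeout(timer); resolve(message); },
      });
    });
  }
}
```

In a test, the WebSocket’s message handler would call `push`, and each assertion would await `waitFor` with a predicate for the expected message.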
And that’s it! Once you deploy to an e2e-test stage, you can run these tests to make sure that everything is working end-to-end in AWS.
The advantage of this approach is that it gives us a realtime update when messages are received from SNS/Kinesis. However, it requires extra moving parts compared to the DynamoDB approach.
You might be able to amortize this additional overhead by making these resources a “feature” of the environment and reusing them in other projects. That is, if you need to run similar tests in lots of other projects, then why not leave the WebSocket endpoint around and reuse it? You might even do this for all your non-production environments so you can run end-to-end tests in all of them.
So that’s it! Two approaches for including SNS and Kinesis in your end-to-end tests. As I always say, these should not be taken as prescriptions and you should absolutely tweak them to make them work for you. If you have any questions or comments about these approaches, please let me know in the comments below.
Based on feedback, I decided to include a few other approaches for you to consider. I have experimented with these in the past, but over time I have come to prefer the two approaches mentioned above because they work for both SNS and Kinesis, whereas the approaches below work for either SNS or Kinesis but not both.
p.s. all three alternatives have been included in the demo repo.
SQS poller (SNS only)
For SNS, we can subscribe an SQS queue to the SNS topic. As before, this queue is deployed conditionally and is only available in the e2e-test stage.
With SQS, we can poll the queue and wait (up to 20s) for new messages to arrive. This gives us an easy way to validate messages that are published to SNS.
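One detail worth sketching is how to get the original payload back out of the queue. When SNS delivers to SQS without raw message delivery, each SQS message body is a JSON envelope whose `Message` field holds what was published to the topic. `unwrapSnsMessages` is a hypothetical helper that takes the response of an SQS ReceiveMessage call (for example, one made with `WaitTimeSeconds: 20` for long polling).

```javascript
// Hypothetical helper: pull the original SNS payloads out of a
// ReceiveMessage response from the subscribed queue. Assumes raw
// message delivery is off, so each Body is a JSON envelope.
function unwrapSnsMessages(receiveMessageResponse) {
  // `Messages` is absent when the long poll returns nothing.
  const messages = receiveMessageResponse.Messages || [];
  return messages.map((sqsMessage) => JSON.parse(sqsMessage.Body).Message);
}
```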
Kinesis poller (Kinesis only)
With Kinesis, we can poll the stream directly from our tests. There is no additional resource required for the e2e-test stage.
However, this requires a sequence of API calls:
- First, describe the stream to get the shard IDs.
- Then, for each shard, get a shard iterator that starts at a timestamp before we invoke the SUT function.
- For each shard, keep polling and report any messages we receive. A lot of the requests would return without any data.
Your polling code might look something like this:
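A minimal sketch of those three steps follows. It assumes `kinesis` is a thin, hypothetical wrapper whose methods take the same parameters as the Kinesis DescribeStream, GetShardIterator and GetRecords APIs and return promises directly. `pollKinesis` and its options are illustrative names, not code from the repo.

```javascript
// Hypothetical poller: read every shard from `startTime` onwards until
// a record matching `isExpected` shows up, or give up after `maxPolls`.
async function pollKinesis(kinesis, streamName, startTime, isExpected, { maxPolls = 20, delayMs = 1000 } = {}) {
  const { StreamDescription } = await kinesis.describeStream({ StreamName: streamName });

  // One iterator per shard, starting just before the SUT was invoked.
  let iterators = await Promise.all(
    StreamDescription.Shards.map(async ({ ShardId }) => {
      const { ShardIterator } = await kinesis.getShardIterator({
        StreamName: streamName,
        ShardId,
        ShardIteratorType: 'AT_TIMESTAMP',
        Timestamp: startTime,
      });
      return ShardIterator;
    })
  );

  for (let poll = 0; poll < maxPolls && iterators.length > 0; poll++) {
    const nextIterators = [];
    for (const ShardIterator of iterators) {
      const { Records, NextShardIterator } = await kinesis.getRecords({ ShardIterator });
      for (const record of Records) {
        const message = Buffer.from(record.Data).toString('utf8');
        if (isExpected(message)) return message;
      }
      // Many calls return no records; keep following the iterator.
      if (NextShardIterator) nextIterators.push(NextShardIterator);
    }
    iterators = nextIterators;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error('expected Kinesis record not found');
}
```

Capture `startTime` before invoking the SUT function so that the iterators cannot miss the record.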
Local webserver (SNS only)
This is the most elaborate approach in our list:
- Start a local webserver as part of test setup (e.g. with express or restify).
- Expose the local endpoint publicly with ngrok.
- Subscribe the ngrok endpoint to the SNS topic.
- Wait for the SNS notification with the right message.
- After the test, unsubscribe the ngrok endpoint from the SNS topic.
- Clean up and delete the ngrok endpoint.
The good news is that you don’t need to deploy any special resources to facilitate your end-to-end tests.
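One wrinkle in the steps above is that an HTTP(S) subscription stays pending until the endpoint confirms it. SNS first POSTs a body with `Type` set to `SubscriptionConfirmation` and a `SubscribeURL` to visit, and only afterwards sends `Notification` bodies. Below is a minimal sketch of how the local webserver might tell the two apart. `handleSnsPost` is a hypothetical helper, and the HTTP server and the call to the confirmation URL are left out.

```javascript
// Hypothetical handler for the raw body of an HTTP POST from SNS.
// Returns what the local webserver should do with it.
function handleSnsPost(rawBody) {
  const body = JSON.parse(rawBody);
  switch (body.Type) {
    case 'SubscriptionConfirmation':
      // The subscription stays pending until this URL is visited.
      return { action: 'confirm', url: body.SubscribeURL };
    case 'Notification':
      // The published payload, to be matched against the expectation.
      return { action: 'message', message: body.Message };
    default:
      return { action: 'ignore' };
  }
}
```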
Hi, my name is Yan Cui. I’m an AWS Serverless Hero and the author of Production-Ready Serverless. I have run production workloads at scale in AWS for nearly 10 years and I have been an architect or principal engineer in a variety of industries ranging from banking, e-commerce and sports streaming to mobile gaming. I currently work as an independent consultant focused on AWS and serverless.