How to avoid missing a message
Event processing is one of the most common scenarios in serverless and Azure Functions. A few weeks ago I wrote about how you can process events in order with functions, and in this post I want to outline how to build a reliable message processor so you don’t lose any messages along the way. I’ll be honest: this could easily have been two or three posts, but I decided to keep it all in one. It’s lengthy, but it goes from the basics all the way to advanced patterns like circuit breakers and exception filters. While these samples are in C#, the patterns apply to any language (unless explicitly stated otherwise).
Challenges of event streams in distributed systems
Imagine a system sending events at a constant rate, let’s say 100 events per second. Consuming these events from Azure Functions is easy enough to set up, and within minutes you could have multiple parallel instances processing those 100 events every second. However, what if the event publisher sends a corrupt event? Or your instance has a hiccup and crashes mid-execution? Or a downstream system goes offline? How do you handle these cases while preserving the overall integrity and throughput of your application?
With queues, reliable messaging comes a bit more naturally. In Azure Functions, when you trigger on a queue message, the function can create a “lock” on the message and attempt to process it. If processing fails, the function “releases” the lock so another instance can pick the message up and retry. This back-and-forth continues until processing succeeds or until a number of attempts have been made (5 by default, configurable with the maxDequeueCount setting in host.json), at which point the message is moved to a poison queue. While a single queue message is in this retry cycle, other parallel executions keep dequeuing the remaining messages, so overall throughput is largely unaffected by one bad message. However, storage queues don’t guarantee ordering, and they aren’t optimized for the same high throughput as services like Event Hubs.
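To make the queue semantics concrete, here is a small in-memory simulation. It uses plain .NET collections, not the Azure Storage SDK. It assumes a hypothetical handler that always fails on one message. That message is retried up to MaxDequeueCount times, mirroring the host default, and then moved to a poison list, while the other messages are processed in the meantime.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a storage queue (not the Azure Storage SDK), used only
// to illustrate the lock / release / poison cycle described above.
const int MaxDequeueCount = 5; // mirrors the Functions host default for queue triggers

var queue = new Queue<(string Body, int DequeueCount)>();
var poison = new List<string>();
var processed = new List<string>();

foreach (var m in new[] { "a", "b", "corrupt", "c", "d" })
    queue.Enqueue((m, 0));

// Hypothetical handler: the "corrupt" message fails every time.
bool TryProcess(string message)
{
    if (message == "corrupt") return false;
    processed.Add(message);
    return true;
}

while (queue.Count > 0)
{
    var (body, dequeueCount) = queue.Dequeue(); // take the "lock"
    dequeueCount++;

    if (TryProcess(body)) continue;             // success: the message is gone for good

    if (dequeueCount >= MaxDequeueCount)
        poison.Add(body);                       // out of attempts: move to the poison queue
    else
        queue.Enqueue((body, dequeueCount));    // "release": visible again for a retry
}

Console.WriteLine($"processed: {string.Join(",", processed)}");
Console.WriteLine($"poison: {string.Join(",", poison)}");
```

The four good messages are processed while the corrupt one cycles through its retries and ends up in the poison list. Note that the corrupt message is retried after messages that arrived later, which is exactly why queues don’t preserve ordering.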
With event streams like Azure Event Hubs, there is no lock concept. To allow for high throughput, multiple consumer groups, and replayability, services like Event Hubs read more like a tape drive when consuming events. There is a single “offset” pointer into the stream per partition (for each consumer group), and you can read forwards or backwards. While reading the event stream, suppose you hit a failure and decide to keep the pointer in the same spot. No further events on that partition are then processed until the pointer moves. In other words, if 100 events per second are still coming in and Azure Functions stops moving the pointer while it deals with a single bad event, new events start to pile up. Before long you have a massive, constantly growing backlog.
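A rough way to see the backlog problem: the sketch below models one partition as an append-only list read through a single offset. It assumes 100 events arrive per second and that the consumer refuses to move past a bad event. The names and the 60-second duration are illustrative assumptions, not Event Hubs SDK types.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model of a single partition: an append-only log that a consumer
// reads through one offset pointer. Plain collections, not SDK types.
var partition = new List<string>();
int offset = 0;                  // index of the next event this consumer will read
const int EventsPerSecond = 100;

partition.Add("corrupt");        // the first event can never be processed

for (int second = 0; second < 60; second++)
{
    // The publisher keeps appending regardless of what the consumer does.
    for (int i = 0; i < EventsPerSecond; i++)
        partition.Add($"event-{second}-{i}");

    // A consumer that refuses to move past a failure leaves the pointer in place.
    bool succeeded = partition[offset] != "corrupt";
    if (succeeded) offset++;
}

int backlog = partition.Count - offset;
Console.WriteLine($"Unread events on this partition after 60s: {backlog}");
```

After 60 seconds the pointer is still at the bad event. That leaves 6,001 unread events: the bad one plus 60 × 100 new ones. The count keeps climbing for as long as the pointer stays put.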
Given this offset and consumer behavior, Functions continues to progress the pointer on the stream regardless of whether the execution succeeded or failed. This means your system and functions need to be aware of this behavior and structured to deal with it.
How Azure Functions consume Event Hubs events
The behavior for the Azure Functions Event Hubs trigger is as follows:
1. A pointer is created and persisted in Azure Storage for each partition of the Event Hub (you can see it in your storage account if you look).
2. When new Event Hub messages are received (in a batch by default), the host attempts to trigger the function with the batch of messages.
3. If the function completes execution (with or without an exception), the pointer is progressed and checkpointed in storage.
4. If something prevents the function execution from completing, the host fails to progress the pointer, and on subsequent checks the same messages are received again (from the previous checkpoint).
5. Repeat steps 2–4.
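The steps above can be simulated to show why an unhandled exception loses events. This is a simplified model, not the real host. It assumes a batch size of 16, chosen so that the batch boundaries line up with a 100–112 gap, and a function that throws on every 100th event. The catch in the loop stands in for the host logging the failure; the function body itself has no error handling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of the trigger loop above (not the real host): read a batch
// from the checkpoint, invoke the function, then checkpoint past the batch
// whether or not the function threw.
var stream = Enumerable.Range(1, 300).ToList(); // events 1..300 on one partition
var processed = new List<int>();
int checkpoint = 0;                             // index of the next unread event
const int BatchSize = 16;                       // hypothetical batch size

// Function body with no try/catch: every 100th event throws.
void ProcessBatch(IReadOnlyList<int> batch)
{
    foreach (var e in batch)
    {
        if (e % 100 == 0) throw new InvalidOperationException($"bad event {e}");
        processed.Add(e);
    }
}

while (checkpoint < stream.Count)
{
    var batch = stream.Skip(checkpoint).Take(BatchSize).ToList();
    try
    {
        ProcessBatch(batch);
    }
    catch (Exception ex)
    {
        // Stand-in for the host: it records the failed execution...
        Console.WriteLine($"Execution failed: {ex.Message}");
    }
    // ...and still advances the checkpoint past the whole batch.
    checkpoint += batch.Count;
}

var lost = stream.Except(processed).ToList();
Console.WriteLine($"Never processed: {string.Join(",", lost)}");
```

In this run, events 100–112, 200–208, and 300 are never processed. Each exception ends its batch early, but the checkpoint still moves to the end of the batch.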
There are a few important things to note here. First, if you have unhandled exceptions you may lose messages, because even an execution that results in an exception progresses the pointer. Second, as is the norm with distributed systems, Functions guarantees at-least-once delivery, so your code and dependent systems may need to account for the same message being received twice. Below are examples showing both of these behaviors and how to code around them.
For these tests, I published 100,000 messages to be processed in order (per partition key). I logged each message to a Redis cache as it was processed, to validate and visualize order and reliability. For the first test, I wrote the function so that every 100th message throws an exception, with no exception handling.
When I push 100,000 messages at this sample, here’s what I see in Redis:
You’ll notice I missed a whole chunk of messages, 100–112. So what happened? At some point, one of my function instances got a batch of messages for this partition key. That specific batch ended with 112, but my exception was thrown at message 100. This halted that execution, but the function host still progressed the pointer and read the next batch. Technically these messages are still persisted in Event Hubs, but I’d need to manually re-fetch 100–112 to reprocess them.
Adding a try-catch
The simplest fix is to add a try/catch block to the code. Now if an exception is thrown, I can catch it inside the same execution and handle it before the pointer progresses. When I added a catch to the code and reran the test, I saw all 100,000 messages in order.
Best Practice: All Event Hubs functions need to have a catch block
In this sample I used the catch block to attempt one more insert into Redis. Other viable options include sending a notification, or writing the event to a “poison” queue or event hub for later processing.
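In a batch-triggered function, the catch belongs around each event rather than around the whole batch, so one failure doesn’t abandon the rest of the batch. In this sketch, deadLetter is a hypothetical stand-in for a poison queue or secondary event hub; in a real function it might be an output binding. ProcessEvent throws on every 100th event, as in the test described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Per-event try/catch inside a batch-triggered function body (sketch).
// deadLetter is a hypothetical stand-in for a poison queue or event hub.
var processed = new List<int>();
var deadLetter = new List<int>();

void ProcessEvent(int e)
{
    if (e % 100 == 0) throw new InvalidOperationException($"bad event {e}");
    processed.Add(e);
}

void Run(IEnumerable<int> batch)
{
    foreach (var e in batch)
    {
        try
        {
            ProcessEvent(e);
        }
        catch (Exception ex)
        {
            // Handle the failure before the execution ends (and the host
            // checkpoints): log it and park the event for later processing.
            Console.WriteLine($"Parking event {e}: {ex.Message}");
            deadLetter.Add(e);
        }
    }
}

Run(Enumerable.Range(97, 16)); // a batch covering events 97..112
```

For this batch, the fifteen good events are processed and event 100 is parked instead of being silently dropped. The trade-off is that the parked event is now out of order relative to its neighbors. This approach fits best when later reprocessing can tolerate that.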
Retry mechanisms and policies
Some exceptions may be transient in nature. That is, a “hiccup” or other issue may simply go away if the operation is attempted again a few moments later. In the previous section I retried only once in the catch block. If that retry had failed or thrown its own exception, I’d be out of luck and still lose events 100–112. A number of tools help define more robust retry policies, and they still let you preserve processing order.
For my tests, I used a C# fault-handling library called Polly. This allowed me to define both simple and advanced retry policies like “try to insert this message 3 times (potentially with a delay between retries). If the eventual outcome of all retries was a failure, add a message to a queue so I can continue processing the stream and handle the corrupt or un-processed message later.”
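A minimal retry-with-backoff sketch follows. It is hand-rolled so that it runs without external packages. It is not Polly’s API, though Polly expresses the same idea with policies such as Policy.Handle<Exception>().WaitAndRetryAsync(...). The attempt count of 3 matches the example policy above. The delays and the simulated failures are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hand-rolled retry with exponential backoff (illustrative, not Polly).
// Returns false once all attempts have failed, so the caller can move on.
async Task<bool> TryWithRetriesAsync(Func<Task> action, int maxAttempts, TimeSpan baseDelay)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            await action();
            return true;
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Wait baseDelay, then 2x, 4x, ... before the next attempt.
            await Task.Delay(TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1)));
        }
        catch (Exception)
        {
            return false; // attempts exhausted
        }
    }
}

var store = new List<int>();   // stand-in for the Redis insert target
var parked = new List<int>();  // stand-in for the "handle later" queue
// Simulated faults: event 100 fails twice (transient), event 200 keeps failing.
var failuresLeft = new Dictionary<int, int> { [100] = 2, [200] = 10 };

foreach (var e in new[] { 99, 100, 101, 200, 201 })
{
    bool ok = await TryWithRetriesAsync(() =>
    {
        if (failuresLeft.TryGetValue(e, out var n) && n > 0)
        {
            failuresLeft[e] = n - 1;
            throw new TimeoutException($"transient failure on {e}");
        }
        store.Add(e);
        return Task.CompletedTask;
    }, maxAttempts: 3, baseDelay: TimeSpan.FromMilliseconds(10));

    if (!ok) parked.Add(e);    // keep the stream moving; handle this event later
}

Console.WriteLine($"stored: {string.Join(",", store)}; parked: {string.Join(",", parked)}");
```

Event 100 recovers on its third attempt, while event 200 fails all three and is parked so the loop can move on. Everything that eventually succeeds stays in order, because each event is retried in place before the next one starts.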
And the resulting Redis output:
When working with more advanced exception catching and retry policies, it is worth noting that for precompiled C# class libraries, there is a preview feature to write “Exception Filters” in your function. This enables you to write a method that will execute whenever an unhandled exception is thrown during a function execution. Details and samples can be found in this post.
Non-exception errors or issues
We’ve addressed exceptions thrown by your own code, but what about the case where the function instance itself has a hiccup or fails in the middle of an execution?
As stated earlier, if a function doesn’t complete execution, the offset pointer is never progressed, so the same messages are processed again when a new instance begins to pull messages. To simulate this, I manually stopped, started, and restarted my function app during the 100,000-message run. Here are some of the results (left). You’ll notice that everything was processed, and in order, but some messages were processed more than once (after 700, I reprocess 601 onward). Overall that’s a good thing because it gives me at-least-once guarantees, but it does mean my code may require some level of idempotency.
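One common way to get idempotency under at-least-once delivery is to remember the highest sequence applied for each key and skip anything at or below it. This sketch assumes every event carries a sequence number that increases per partition key, as in the ordered-processing setup here. The dictionary stands in for durable shared state such as a Redis key, and the partition key name is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Idempotent handling under at-least-once delivery, assuming each event carries
// a sequence number that increases per partition key. The dictionary stands in
// for durable shared state (for example a Redis key per partition key).
var lastApplied = new Dictionary<string, long>();
var applied = new List<long>();

bool HandleOnce(string partitionKey, long sequence)
{
    if (lastApplied.TryGetValue(partitionKey, out var last) && sequence <= last)
        return false;                      // replayed duplicate: skip the side effect

    applied.Add(sequence);                 // the real side effect would go here
    lastApplied[partitionKey] = sequence;  // record progress only after the work succeeds
    return true;
}

// Deliver 1..700, replay 601..700 (as after a restart from an older checkpoint),
// then continue with 701..800. "device-42" is a hypothetical partition key.
var deliveries = Enumerable.Range(1, 700)
    .Concat(Enumerable.Range(601, 100))
    .Concat(Enumerable.Range(701, 100));

int duplicates = 0;
foreach (var seq in deliveries)
    if (!HandleOnce("device-42", seq)) duplicates++;

Console.WriteLine($"applied {applied.Count}, skipped {duplicates} duplicates");
```

Replaying 601–700 applies nothing a second time, so 800 events are applied and 100 duplicates are skipped. One caveat: the side effect and the progress marker are two separate writes here, so a crash between them could still apply an event twice. Making that pair atomic, or the write itself idempotent, closes the gap.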
Circuit breaker and stopping the line
The patterns above help you retry and make a best effort at processing every event. A few failures here and there may be acceptable. But what if a significant number of failures are happening and I want to stop triggering on new events until the system is healthy again? This is often achieved with a “circuit breaker” pattern, where you break the circuit of the event process and resume at a later time.
Polly (the library I used for retries) has some circuit-breaker support. However, these patterns don’t translate as well to distributed, ephemeral functions, where the circuit spans multiple stateless instances. There are some interesting discussions on how this could be solved in Polly, but in the meantime I implemented it manually. A circuit breaker in an event process needs two pieces:
- Shared state across all instances to track and monitor health of the circuit
- Master process that can manage the circuit state (open or closed)
For my purposes, I used my Redis cache for the shared state and Azure Logic Apps for the managing process. Other services could fill either role, but I found these two worked well.
Failure threshold across instances
Because I may have multiple instances processing events at the same time, I needed shared external state to monitor the health of the circuit. The rule I wanted was: “If there are more than 100 eventual failures within 30 seconds across all instances, break the circuit and stop triggering on new messages.”
Without going too deep into specifics (all of these samples are on GitHub), I used the TTL and sorted set features in Redis to keep a rolling window of the failures within the last 30 seconds. Whenever I add a new failure, I check the rolling window to see whether the threshold has been crossed (more than 100 in the last 30 seconds). If so, I emit an event to Azure Event Grid. The relevant Redis code is here if you’re interested. This lets me detect the condition and send an event that breaks the circuit.
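The rolling window can be sketched in memory. Below, a SortedSet keyed by timestamp plays the role of the Redis sorted set, and the comments note the roughly equivalent Redis commands (ZADD, ZREMRANGEBYSCORE, ZCARD, EXPIRE). This illustrates the technique; it is not the code from the repo. Where the real sample emits an Event Grid event, this sketch only returns true. The clock is passed in explicitly so the behavior is deterministic.

```csharp
using System;
using System.Collections.Generic;

// In-memory model of the rolling failure window. In Redis the steps would be
// roughly: ZADD failures <now> <id>; ZREMRANGEBYSCORE failures -inf <now - 30s>;
// ZCARD failures; plus an EXPIRE so an idle key cleans itself up.
var window = TimeSpan.FromSeconds(30);
const int Threshold = 100;
var failures = new SortedSet<(long Ticks, Guid Id)>();

// Record one eventual failure at time `now`; returns true when the threshold is crossed.
bool RecordFailure(DateTimeOffset now)
{
    failures.Add((now.UtcTicks, Guid.NewGuid()));          // ZADD
    long cutoff = (now - window).UtcTicks;
    failures.RemoveWhere(f => f.Ticks <= cutoff);          // ZREMRANGEBYSCORE
    return failures.Count > Threshold;                     // ZCARD
}

var start = DateTimeOffset.UnixEpoch;   // fixed simulated clock
bool circuitBroken = false;

// Steady trickle: one failure every 500 ms for two minutes.
for (int i = 0; i < 240 && !circuitBroken; i++)
    circuitBroken = RecordFailure(start.AddMilliseconds(500 * i));
bool brokeDuringTrickle = circuitBroken;
Console.WriteLine($"after trickle: broken={brokeDuringTrickle}");

// Burst: one failure every 100 ms, starting a few minutes later.
var burstStart = start.AddMinutes(5);
int burstCount = 0;
while (!circuitBroken)
{
    circuitBroken = RecordFailure(burstStart.AddMilliseconds(100 * burstCount));
    burstCount++;
}
Console.WriteLine($"broke after {burstCount} burst failures");
```

A steady trickle of one failure every 500 ms peaks at 60 failures in the window and never trips the breaker. A burst of one failure every 100 ms trips it on the 101st failure. In the distributed version, the add, trim, and count should ideally run as a single Redis transaction (MULTI/EXEC) or Lua script so that concurrent instances see a consistent count.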
Managing circuit state with Logic Apps
I used Azure Logic Apps to manage the circuit state, as its connectors and stateful orchestration were a natural fit. When the circuit needs to break, the Event Grid event triggers a workflow. The first step stops the Azure Function (with the Azure Resource connector) and sends a notification email that includes some response options. I can then investigate the health of the circuit, and when things appear healthy I can respond with “Start”. This resumes the workflow, which starts the function again, and messages begin processing from the last Event Hub checkpoint.
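The workflow’s state handling can be modeled as two handlers: one for the break event and one for the email response. This is a toy model in C#, not a Logic Apps workflow definition. The flag stands in for the Azure Resource connector’s stop and start actions, and the outbox stands in for the notification email. Ignoring repeated break events while the circuit is already open is an assumption added here, since several instances may report the same breach.

```csharp
using System;
using System.Collections.Generic;

// Toy model of the circuit-management workflow (not a Logic Apps definition).
// functionRunning stands in for the stop/start actions on the function app, and
// outbox for the notification email with response options.
string circuit = "Closed";   // Closed: events flow; Open: function is stopped
bool functionRunning = true;
var outbox = new List<string>();

// Triggered by the Event Grid "threshold crossed" event.
void OnBreakCircuitEvent()
{
    if (circuit == "Open") return;     // several instances may report the same breach
    circuit = "Open";
    functionRunning = false;           // stop the function app
    outbox.Add("Circuit opened. Reply 'Start' once downstream systems look healthy.");
}

// Triggered when someone answers the email.
void OnEmailResponse(string response)
{
    if (circuit != "Open" || response != "Start") return;
    functionRunning = true;            // start the function; it resumes from the last checkpoint
    circuit = "Closed";
}

OnBreakCircuitEvent();
OnBreakCircuitEvent();                 // duplicate break event is ignored
var afterBreak = (circuit, functionRunning, outbox.Count);
Console.WriteLine($"after break: {afterBreak}");

OnEmailResponse("Start");
Console.WriteLine($"after Start: {circuit}, running={functionRunning}");
```

Keeping the circuit state in one place, rather than in each function instance, is what lets the break last as long as needed. No instance can resume on its own; only the explicit “Start” response closes the circuit.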
About 15 minutes ago I sent 100,000 messages and set every 100th message to fail. About 5,000 messages in, I hit the failure threshold, so an event was emitted to Event Grid. My Azure Logic App fired instantly, stopped the function, and sent me an email (above). If I now look at the current state of things in Redis, I see a lot of partitions that are partially processed, like this:
After I responded to the email to restart the circuit, the same Redis query shows that the function picked up and continued from the last Event Hub checkpoint. No messages were lost, everything was processed in order, and I was able to break the circuit for as long as I needed, with my logic app managing the state.
Hopefully this blog has been helpful in outlining some of the patterns and best practices available for reliably processing message streams with Azure Functions. With this understanding you should be able to take advantage of the dynamic scale and consumption pricing of functions without having to compromise on reliability.
I’ve included a link to the GitHub repo that has pointers to each of the branches for the different pivots of this sample: https://github.com/jeffhollan/functions-csharp-eventhub-ordered-processing. Feel free to reach out to me on Twitter @jeffhollan for any questions.