Today, I’m going to explain to you one of the easiest and most useful implementations of the Event bus pattern in microservices architecture that I have used in production for over 5 years. This implementation is based on RabbitMQ - a powerful open-source message broker that you can easily integrate into your project. I use Python in my examples, but you can use any other language.
An “Event bus” is a design pattern that enables different parts of a software system to communicate with each other by publishing events and subscribing to them. Producers send events to the event bus without knowing who consumes them. Consumers receive only the events they have subscribed to.
This approach provides the following advantages:
I am a software architect who designs a “Knowledge base” system that contains the following services:
In most cases, these services can interact with each other asynchronously. For example, the “Document” service doesn’t need to know if an email has been sent or not. It just notifies the “Notification” service that the document has changed. That is the first reason I should use a message broker, like RabbitMQ, in this case.
But the Event bus pattern is about more than asynchronous communication; it is about decoupling. With the basic approach, when I want to add a new service that needs to know about document updates, I have to modify the “Document” service so that it notifies the new service as well. This creates tight coupling between services, which complicates scalability and maintainability.
The Event bus aims to avoid this problem because all services send events to one place - the Event bus - and do not need to know anything about the consumers. You can add or change any other services without modifying the producer - the “Document” service.
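To make the decoupling concrete before bringing in RabbitMQ, here is a minimal in-process sketch of the pattern. The InMemoryEventBus class, the 'document.create' event name, and the "Search" subscriber are hypothetical names used only for illustration; they are not part of the implementation described in this article.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryEventBus:
    """Toy in-process event bus that routes by event name only."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event_name].append(handler)

    def publish(self, event_name: str, payload: dict) -> None:
        # The producer never sees who, if anyone, is listening.
        for handler in self._subscribers.get(event_name, []):
            handler(payload)


bus = InMemoryEventBus()
emails, search_index = [], []

# The "Notification" service subscribes to new documents.
bus.subscribe('document.create', lambda doc: emails.append(doc['name']))
# A hypothetical "Search" service is added later; the producer code below is unchanged.
bus.subscribe('document.create', lambda doc: search_index.append(doc['name']))

# The "Document" service publishes without knowing about either subscriber.
bus.publish('document.create', {'name': 'Document 1'})
print(emails, search_index)
```

Adding the second subscriber required no change to the publishing code, which is the property the RabbitMQ-based version below preserves across process boundaries.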
To implement the Event bus pattern with RabbitMQ, I will use the Headers Exchange, because it allows consumers to subscribe only to the required events based on multiple event attributes, instead of a single Routing Key as in other exchange types.
Every event should contain the "meta" attributes in the message headers, which will be used by the consumer for filtering, e.g. object type (Document, File, User), event type (Create/Update/Delete), related fields (Author ID, Category), and many others.
The producer sends events to the exchange, which is used as an "entry point." If no queue is bound to the exchange with matching attributes, the event is dropped. When a consumer wants to subscribe to events, it creates a queue and binds it to the exchange using attribute values as a filter.
Example:
The “Notification” service wants to receive events about the creation of new documents so that it can send an email. It creates a new queue with the binding attributes type = document and event = create, and consumes from it.
The “Document” service has received a new document and wants to notify others about it. It sends a message with the document data in the payload and the routing attributes in the headers: type = document, event = create.
The “Notification” service receives the event and handles it.
The Events exchange can also receive events from other services, but the “Notification” service does not receive them because they do not match its filter.
I use the Pika library to connect to RabbitMQ.
Install dependencies:
pip install pika
Import required packages:
import json
import pika
Specify the parameters that will be used in the next code examples:
host='localhost' # RabbitMQ hostname
port=5672 # RabbitMQ port
virtual_host='/' # RabbitMQ virtual host
username='guest' # RabbitMQ username
password='guest' # RabbitMQ password
exchange_name='events' # Name of the exchange
queue_name='queue-1' # Name of the queue for consumer
Create a RabbitMQ connection:
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=pika.PlainCredentials(username, password)
))
channel = connection.channel()
Create the headers exchange:
channel.exchange_declare(exchange=exchange_name, exchange_type='headers')
Create a queue, and bind it to the exchange for the consumer:
# The queue will only contain events that match the following headers
filter = {
    'type': 'document',
    'event': 'create'
}
channel.queue_declare(queue_name, durable=True)
channel.queue_bind(queue_name, exchange_name, routing_key='', arguments=filter)
I pass an empty routing_key because the RabbitMQ headers exchange ignores it.
Subscribe the consumer for events:
def on_message_received(channel, method, properties, body):
    print("Message received:", body)
    print("Routing headers:", properties.headers)
channel.basic_consume(queue_name, on_message_callback=on_message_received, auto_ack=True)
channel.start_consuming()  # Blocks and dispatches incoming messages to the callback
I set auto_ack to True so that messages are acknowledged automatically on delivery and I do not need to acknowledge them manually.
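If losing an event on a handler failure is not acceptable, manual acknowledgment is the usual alternative. The following is a minimal sketch, not part of the original implementation: process_event is a hypothetical stand-in for business logic, and the choice to reject failed events without requeueing is an assumption you may want to change.

```python
import json


def process_event(event: dict) -> None:
    """Hypothetical business logic; raises on events it cannot handle."""
    if 'name' not in event:
        raise ValueError('event has no name')


def on_message_received_manual_ack(channel, method, properties, body):
    try:
        process_event(json.loads(body))
    except Exception:
        # Reject without requeueing so a malformed event is not redelivered forever.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        # Acknowledge only after the event has been handled successfully.
        channel.basic_ack(delivery_tag=method.delivery_tag)


# Registration on an open channel would then disable automatic acknowledgment:
# channel.basic_consume(queue_name,
#                       on_message_callback=on_message_received_manual_ack,
#                       auto_ack=False)
```

With auto_ack=False, RabbitMQ keeps an unacknowledged message and redelivers it if the consumer's connection drops before the acknowledgment is sent.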
Send events from the producer:
def send_event(body, headers):
    channel.basic_publish(
        exchange=exchange_name,
        routing_key='',  # Headers exchange ignores routing key
        body=json.dumps(body).encode(),  # Sends body in JSON format, but you can choose your own
        properties=pika.BasicProperties(
            headers=headers
        )
    )
send_event({'name': 'Document 1'}, {'type': 'document', 'event': 'create'})
send_event({'name': 'Document 1'}, {'type': 'document', 'event': 'update'})
send_event({'filename': 'kitty.jpeg'}, {'type': 'file', 'event': 'create'})
The first event will be received by the consumer because all headers match. The second and third events will not be received because, by default, every header in the binding must match.
If you want to receive events that match at least one of the headers (OR condition), set the binding argument x-match to any:
filter = {
    'type': 'document',
    'event': 'create',
    'x-match': 'any'
}
channel.queue_declare(queue_name, durable=True)
channel.queue_bind(queue_name, exchange_name, routing_key='', arguments=filter)
In this case, all three events will be received, because at least one header in each event matches.
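The matching rules above can be checked without a broker. The sketch below is a simplified local model of how a headers exchange evaluates a single binding; headers_match is a hypothetical helper, not a RabbitMQ or Pika API, and it only covers plain equality on the all and any modes.

```python
def headers_match(binding: dict, headers: dict) -> bool:
    """Approximate the headers-exchange decision for one binding."""
    mode = binding.get('x-match', 'all')  # 'all' is the default mode
    # Binding keys that start with "x-" are not used for matching in these modes.
    rules = {key: value for key, value in binding.items() if not key.startswith('x-')}
    results = [headers.get(key) == value for key, value in rules.items()]
    return any(results) if mode == 'any' else all(results)


events = [
    {'type': 'document', 'event': 'create'},
    {'type': 'document', 'event': 'update'},
    {'type': 'file', 'event': 'create'},
]
match_all = {'type': 'document', 'event': 'create'}
match_any = {**match_all, 'x-match': 'any'}

print([headers_match(match_all, e) for e in events])  # [True, False, False]
print([headers_match(match_any, e) for e in events])  # [True, True, True]
```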
However, if you want to match events based on an IN condition, for example type = document or type = file, you cannot do that in a single binding because its arguments cannot contain the same header name twice. To solve this problem, you can use a different approach: append the header value to the key and set the value to 1 (type = document becomes type.document = 1):
filter = {
    'type.document': '1',
    'type.file': '1',
    'x-match': 'any'
}
channel.queue_bind(queue_name, exchange_name, routing_key='', arguments=filter)
send_event({'name': 'Document 1'}, {'type': 'document', 'type.document': '1'})
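Writing the flag header by hand for every event is easy to forget. One possible way to avoid that is a small helper that derives the flags from the regular headers; with_flag_headers and its keys parameter are hypothetical names introduced here for illustration.

```python
def with_flag_headers(headers: dict, keys=('type',)) -> dict:
    """Return a copy of headers with a '<key>.<value>': '1' flag for each listed key."""
    flagged = dict(headers)  # Do not mutate the caller's dictionary
    for key in keys:
        if key in headers:
            flagged[f'{key}.{headers[key]}'] = '1'
    return flagged


print(with_flag_headers({'type': 'document', 'event': 'create'}))
# {'type': 'document', 'event': 'create', 'type.document': '1'}
```

The producer could then call send_event(body, with_flag_headers(headers)) so that both the plain header and its flag are always published together.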
Although the RabbitMQ headers exchange has powerful routing abilities, you cannot combine the 'any' and 'all' strategies in one binding to implement complex filtering with nested OR/AND, for example to receive events that match the query (type = document or type = file) and event = create. However, you can do the remaining filtering in your business logic:
def on_message_received(channel, method, properties, body):
    # headers is None when the publisher did not set any
    if (properties.headers or {}).get('event') != 'create':
        return  # Ignore
    # do something
channel.basic_consume(queue_name, on_message_callback=on_message_received, auto_ack=True)
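The consumer-side check can also express the whole query as one predicate, which is easier to test in isolation. This is a sketch under the assumption that events carry the plain type and event headers used throughout this article; matches_query is a hypothetical helper name.

```python
def matches_query(headers) -> bool:
    """(type = document or type = file) and event = create."""
    headers = headers or {}  # Pika leaves headers as None if none were published
    return headers.get('type') in {'document', 'file'} and headers.get('event') == 'create'


print(matches_query({'type': 'file', 'event': 'create'}))      # True
print(matches_query({'type': 'document', 'event': 'update'}))  # False
```

Inside the callback, the handler would return early when matches_query(properties.headers) is false, so the broker handles the OR on type and the consumer handles the AND on event.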
You can see the full code in the GitHub repository.
RabbitMQ is a powerful solution for message brokering that allows you to implement the Event bus design pattern for microservices architecture. This implementation allows services to interact asynchronously, improving system responsiveness and enabling the addition of new services without impacting existing ones.
By filtering events based on headers, you can ensure that only relevant messages are processed by each consumer. It is simple and useful!