Databases are one of the pillars of any application. If a database server goes down, the whole system can no longer perform its basic services: authenticating users, publishing our beloved Hackernoon posts, storing comments on the crush’s selfies…
This is why designing the system for High Availability (the ability of a distributed system to operate continuously without failing for a designated period of time) is essential.
One way to achieve high availability is redundancy: having more components than strictly needed. For our database system, we can replicate the master database, and if it fails, switch over to one of the working replicas.
First, we have to list out our constraints.
TRIGGERS! THEY ARE AWESOME!!
Database triggers allow us to execute SQL functions whenever an event is received. An event can be any operation such as INSERT, UPDATE, DELETE or TRUNCATE. Triggers are useful for a wide range of cases, like enforcing constraints or boosting performance. In our case, we will use a trigger to notify our Python listener whenever data has been added or updated. We define a procedure called notify_account_changes() that will handle sending notifications about the changes in the database. This is how to define the trigger on the table "users":
CREATE TRIGGER users_changed
AFTER INSERT OR UPDATE
ON users
FOR EACH ROW
EXECUTE PROCEDURE notify_account_changes();
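The trigger assumes the users table already exists. The original schema isn't shown in the article, but judging from the inserts used later (an id and a name), a minimal sketch could look like this:

CREATE TABLE users (
    id   integer PRIMARY KEY,
    name text
);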
Okay, now on to the next problem.
From the PostgreSQL documentation (https://www.postgresql.org/docs/current/sql-notify.html):

The NOTIFY command sends a notification event together with an optional “payload” string to each client application that has previously executed LISTEN channel for the specified channel name in the current database.
In other words, we can have many listeners on a channel by executing LISTEN channel_name. And to send notifications, we use the NOTIFY command or the built-in system function pg_notify('channel_name', 'payload'). The payload is the message that we want to send. In our case, the notification payload will include the table name, the operation type (INSERT, UPDATE, …), and the affected record before and after the change.
Here’s the code for the notify_account_changes() function:
CREATE OR REPLACE FUNCTION notify_account_changes()
RETURNS trigger AS
$$
BEGIN
    -- publish a JSON description of the change on the 'users_changed' channel
    PERFORM pg_notify(
        'users_changed',
        json_build_object(
            'table', TG_TABLE_NAME,
            'operation', TG_OP,
            'new_record', row_to_json(NEW),
            'old_record', row_to_json(OLD)
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
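Before wiring up any Python, we can sanity-check the trigger straight from psql. A quick test sketch:

-- in a psql session connected to the same database
LISTEN users_changed;
INSERT INTO users VALUES (1, 'test');
-- psql should then print an asynchronous notification on the
-- users_changed channel carrying the JSON payload built above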
Now, whenever data changes, the trigger sends us a notification. At this stage, we have completed the database side.
To connect to PostgreSQL, we are going to use the one and only psycopg2 library. We can install it using:

pip install psycopg2

(If the build step gives you trouble, the pre-built psycopg2-binary package works too.)
To connect to the PostgreSQL database and start listening:
import psycopg2

# connect to the master database; autocommit mode ensures notifications
# are delivered immediately, outside of any open transaction
conn = psycopg2.connect(host="localhost", dbname="DBNAME", user="USERNAME", password="PASSWORD")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
cursor.execute("LISTEN users_changed;")
cursor.execute() enables us to execute SQL commands like SELECT or INSERT…
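For example, a quick sanity check on the same cursor (the output shown assumes the article's sample row exists):

cursor.execute("SELECT * FROM users;")
print(cursor.fetchall())  # e.g. [(2, 'al')]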
Now that we are listening for notifications, how can we parse them?
import asyncio
import uvloop

def handle_notify():
    # drain any pending notifications from the connection
    conn.poll()
    for notify in conn.notifies:
        print(notify.payload)
    conn.notifies.clear()

loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
loop.add_reader(conn, handle_notify)
loop.run_forever()
Here we used the asynchronous libraries uvloop and asyncio (uvloop can be installed with pip install uvloop). The line loop.add_reader(conn, handle_notify) invokes the handle_notify() function only when the conn file descriptor (which represents the connection to our database) has an incoming stream of data. After executing INSERT INTO users VALUES (2,'al'); the output is:
{
    "table": "users",
    "operation": "INSERT",
    "new_record": {
        "id": 2,
        "name": "al"
    },
    "old_record": null
}
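As a side note, if you would rather not depend on uvloop, psycopg2's documentation describes an equivalent select()-based loop. A minimal sketch:

import select

# block until the connection's socket becomes readable, then drain notifications
while True:
    if select.select([conn], [], [], 5) == ([], [], []):
        continue  # timeout: nothing arrived in the last 5 seconds
    conn.poll()
    for notify in conn.notifies:
        print(notify.payload)
    conn.notifies.clear()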
Now that we receive all the information we need about database changes, how can we replicate it?
Before we do this, we have to address some problems: a NOTIFY is only delivered to the clients listening at that exact moment, and we may want several replicas to receive every change reliably, even if one of them is temporarily down. To solve these problems, we introduce RabbitMQ.
RabbitMQ is a messaging broker - an intermediary for messaging. It gives your applications a common platform to send and receive messages, and your messages a safe place to live until received.
RabbitMQ allows us to send messages using queues to different parts of our distributed system. It uses persistent queues to save messages and transactions to ensure that they have been executed properly.
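The demo below keeps things simple with temporary queues, but if you want messages to survive a broker restart, RabbitMQ lets you declare durable queues and mark messages persistent. A sketch (the queue name replica_1 is made up for illustration):

# durable queue: survives a broker restart
channel.queue_declare(queue='replica_1', durable=True)
# delivery_mode=2 asks the broker to persist the message to disk
channel.basic_publish(
    exchange='replication',
    routing_key='',
    body=payload,
    properties=pika.BasicProperties(delivery_mode=2),
)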
So how does RabbitMQ work?
Each changes_listener Python script will have its own queue. To send messages to all queues, we will publish them to a fanout exchange from the changes_publisher script. The fanout exchange broadcasts every message to all connected queues.
To use RabbitMQ with Python, we will use the pika library:

pip install pika
In changes_publisher.py we will connect to the RabbitMQ server. Then we will create an exchange named “replication”. For each notification received, we will broadcast it to all RabbitMQ queues.
import pika

# connect to the local RabbitMQ server and declare the fanout exchange
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='replication', exchange_type='fanout')

def handle_notify():
    conn.poll()
    for notify in conn.notifies:
        # broadcast each notification payload to every bound queue
        channel.basic_publish(exchange='replication', routing_key='', body=notify.payload)
    conn.notifies.clear()
And in our changes_listener.py:
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='replication', exchange_type='fanout')

# exclusive, auto-named queue: deleted when this listener disconnects
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='replication', queue=queue_name)

def callback(channel, method, properties, body):
    op = json.loads(body.decode('utf-8'))
    print(op)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Here we connect to RabbitMQ, declare the fanout exchange “replication”, declare a temporary queue, bind the queue to the exchange, and start consuming messages. On each new message received, our callback function will be executed.
Executing the SQL query UPDATE users SET name='alfonso' WHERE id=2; will result in this output from changes_listener.py:
{
    'table': 'users',
    'operation': 'UPDATE',
    'new_record': {
        'id': 2,
        'name': 'alfonso'
    },
    'old_record': {
        'id': 2,
        'name': 'al'
    }
}
Now that we receive the changes in our Python script, all that is left to do is apply them. To do this, we will change our callback function to generate SQL commands from every received JSON message (here, cursor belongs to a psycopg2 connection to the replica database):
def operation_handler(op):
    def handle_insert():
        table, data = op['table'], op['new_record']
        # note: string formatting is kept simple here; a production version
        # should use parameterized queries to avoid SQL injection
        sql = f"""INSERT INTO {table}
        VALUES ('{data['id']}','{data['name']}');
        """
        return sql

    sql = None
    if op['operation'] == 'INSERT':
        sql = handle_insert()
    # we can add other operation handlers here
    return sql

def callback(channel, method, properties, body):
    op = json.loads(body.decode('utf-8'))
    sql = operation_handler(op)
    if sql:  # skip operations we don't handle yet
        cursor.execute(sql)
In the callback, when we receive the message, we convert it to a dictionary and pass it to operation_handler(op). This function will look at the message and return the appropriate SQL using its values.
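Only INSERT is implemented here, but the same pattern extends naturally. A sketch of what the other handlers could look like inside operation_handler() (it assumes the id/name schema used throughout; DELETE support would also require adding OR DELETE to the trigger):

def handle_update():
    table, data = op['table'], op['new_record']
    # same naive quoting as handle_insert(); parameterize in real code
    return f"UPDATE {table} SET name = '{data['name']}' WHERE id = '{data['id']}';"

def handle_delete():
    table, data = op['table'], op['old_record']
    return f"DELETE FROM {table} WHERE id = '{data['id']}';"

# ...and in the dispatch:
# elif op['operation'] == 'UPDATE':
#     sql = handle_update()
# elif op['operation'] == 'DELETE':
#     sql = handle_delete()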
THIS IS IT. WE NOW HAVE ALL THE PIECES WORKING.
If we check the replica database after executing a simple SQL query INSERT INTO users VALUES (2,'al'); on the master, we find the same row replicated in the replica's users table.
PS: PostgreSQL has built-in options for database replication, but our solution gives us full control over the replication process: we can replicate only certain tables or certain users, or modify the records in any way we want.
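For instance, replicating only certain tables is a one-line filter in the listener. A small sketch (the whitelist is hypothetical):

# replicate only whitelisted tables
REPLICATED_TABLES = {'users'}

def callback(channel, method, properties, body):
    op = json.loads(body.decode('utf-8'))
    if op['table'] not in REPLICATED_TABLES:
        return  # ignore changes to tables we don't replicate
    sql = operation_handler(op)
    if sql:
        cursor.execute(sql)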
Step by step, we have finished implementing our database replication system. We have gone from configuring triggers and notifications in the database to broadcasting the changes using RabbitMQ to finally writing the changes back into a PostgreSQL replica.
Implementing this solution was not easy; it required a lot of pieces, but it is all worth it. Offering users zero downtime is a huge business advantage.
Here’s the code for the project https://github.com/Bechir-Brahem/postgres-database-replicator
Thank you for reading <3