Distributed Data Store and Transaction Sagas

Written by papalutavasile | Published 2023/07/05
Tech Story Tags: distributed-systems | data | big-data | data-storage | python | python-programming | flask | web

TL;DR: Distributed systems are built to handle large amounts of data. However, to make this data highly available, it is stored on many services simultaneously. In this tutorial, we are going to implement replicated services for storing data, which we will turn into a sharded data store in the future.

A central theme of “Designing Data-Intensive Applications” by Martin Kleppmann is that distributed systems are built to handle large amounts of data. However, to make this data highly available, it is stored on many services simultaneously, either as replicas or as sharded services. In this tutorial, we are going to implement replicated services for storing data, which we will turn into a sharded data store in the future.

Problem formulation.

In distributed stores, especially when we have many replicas of the same store, we need to ensure strong consistency of the data. This means that every node in the cluster must hold the same information about an entity. At first this seems like a simple problem to solve: just make a copy of all data on all nodes. However, the problem arises when the data isn't static, meaning that it changes over time. This problem is illustrated below:

Mike writes the value 1 to the database (illustrated by W(1)), writing to all services in the following order: Replica 1, Replica 2, and Replica 3. At the same time, Susan writes the value 2, but in another order: Replica 3, Replica 2, and Replica 1, as shown in the figure. They overwrite each other's writes in Replica 1 and Replica 3, and when they read their own writes back they get values other than the ones they have written. In this way the data store is no longer consistent.
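The scenario above can be reproduced with a few lines of Python. This is only a minimal sketch: the replica names and the exact interleaving (the two clients alternate, Mike first) are assumptions made for illustration, and each replica is modeled as a single slot.

```python
# Three replicas, each modeled as a single slot (hypothetical names).
replicas = {"replica_1": None, "replica_2": None, "replica_3": None}

# Each client writes its value to every replica, in its own order.
mike_writes = [("replica_1", 1), ("replica_2", 1), ("replica_3", 1)]
susan_writes = [("replica_3", 2), ("replica_2", 2), ("replica_1", 2)]

# Assumed interleaving: the clients alternate, Mike always going first.
for mike_write, susan_write in zip(mike_writes, susan_writes):
    for name, value in (mike_write, susan_write):
        replicas[name] = value

# Replica 1 and Replica 2 end with Susan's value, Replica 3 with Mike's.
print(replicas)  # {'replica_1': 2, 'replica_2': 2, 'replica_3': 1}
```

Nothing failed and every write succeeded, yet the replicas disagree, which is exactly the inconsistency described above.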

Leader and Followers.

The most common solution to the problem described above is leader-based replication. In such data stores, each node is called a replica. Each replica is either the Leader or a Follower. The Leader takes all write requests and distributes them to the Followers; Followers do not accept writes unless they come from the Leader. The Leader can process both writes and reads coming from the external world, while the Followers can process only reads from the external world and writes from the Leader. This mechanism keeps the data consistent across the cluster, provided the system tolerates network partitions. Leader-based replication is illustrated below:

When Mike makes a write, he sends a request to the Leader of the cluster, which performs the write and forwards the change to the Followers. When all writes are made, the Leader returns the confirmation message. Mike can then perform a read (suppose his request was redirected to F1). At this point Susan's write hasn't yet been forwarded to the Followers, so Mike gets back his own value. Susan's write is applied afterwards, without interfering with Mike's read.
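The flow above can be sketched in memory, without any networking. The `Replica` and `Leader` classes below are hypothetical helpers written for this illustration (they are not the Flask services built later); the point is only that every write goes through one node, so all replicas apply the writes in the same order.

```python
class Replica:
    """A node that stores one value and logs every write it applies."""

    def __init__(self, name):
        self.name = name
        self.value = None
        self.log = []

    def apply(self, value):
        self.value = value
        self.log.append(value)


class Leader(Replica):
    """The only node that accepts writes from the external world."""

    def __init__(self, name, followers):
        super().__init__(name)
        self.followers = followers

    def write(self, value):
        # Apply locally first, then forward to every follower.
        self.apply(value)
        for follower in self.followers:
            follower.apply(value)
        return "ok"


f1, f2 = Replica("F1"), Replica("F2")
leader = Leader("L", [f1, f2])

leader.write(1)        # Mike's write
mike_read = f1.value   # Mike reads from F1 before Susan's write arrives
leader.write(2)        # Susan's write

print(mike_read)                  # 1
print(leader.log, f1.log, f2.log) # [1, 2] [1, 2] [1, 2]
```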

Transaction Saga.

“Transaction saga” is one of those spooky terms that we have in distributed systems. It actually means the process of sequentially executing multiple local transactions on multiple services. In our case, when the Leader gets a write request and forwards the change to the other services, it executes a transaction saga. The transaction saga is a pattern that has two main forms:

  1. Orchestrator.
  2. Choreography.

Choreography Transaction sagas:

Following the Microsoft documentation:

Choreography is a way to coordinate sagas where participants exchange events without a centralized point of control. With choreography, each local transaction publishes domain events that trigger local transactions in other services.

This pattern is illustrated below:
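As a rough sketch of choreography, the snippet below uses a tiny in-memory event bus. The `EventBus` class, the event names, and the two handlers are all hypothetical and exist only for this illustration; in a real system the events would travel through a message broker. Note that no component knows the whole flow: each handler reacts to one event and publishes the next one.

```python
from collections import defaultdict


class EventBus:
    """A minimal in-memory publish/subscribe bus (illustration only)."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event_name, handler):
        self.handlers[event_name].append(handler)

    def publish(self, event_name, payload):
        for handler in self.handlers[event_name]:
            handler(payload)


bus = EventBus()
history = []


def create_profile(user):
    # Local transaction of a hypothetical profile service.
    history.append(("profile_created", user["id"]))
    bus.publish("profile_created", user)


def send_welcome(user):
    # Local transaction of a hypothetical notification service.
    history.append(("welcome_sent", user["id"]))


# Each participant subscribes to the event that triggers it.
bus.subscribe("user_created", create_profile)
bus.subscribe("profile_created", send_welcome)

bus.publish("user_created", {"id": "u1"})
print(history)  # [('profile_created', 'u1'), ('welcome_sent', 'u1')]
```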

Orchestrator Transaction sagas:

In contrast to choreography, in this pattern, a centralized controller is needed. It tells the saga participant what local transactions to execute. Also in case of failures, it can ask services to execute compensating transactions to roll back the store. This pattern is illustrated below:

In the example of the Distributed store, we are going to implement the Orchestrator form of the Transaction Saga, where the Orchestrator is the Leader of the replication cluster.
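To make the orchestrator idea more concrete, here is a minimal in-memory sketch with compensating transactions. The `SagaOrchestrator` class and the `write_to` helper are hypothetical names introduced for this illustration, and the stores are plain dictionaries. The service implemented later in this tutorial forwards writes but does not roll them back, so treat the compensation part as an assumption about how it could be extended.

```python
class SagaOrchestrator:
    """Runs local transactions in order and undoes them on failure."""

    def __init__(self, steps):
        # steps is a list of (action, compensation) pairs of callables.
        self.steps = steps

    def run(self):
        completed = []
        for action, compensation in self.steps:
            try:
                action()
            except Exception:
                # Roll back the already completed steps, newest first.
                for undo in reversed(completed):
                    undo()
                return False
            completed.append(compensation)
        return True


stores = {"leader": {}, "follower_1": {}, "follower_2": {}}


def write_to(name, key, value):
    """Build the (action, compensation) pair for one local write."""
    def action():
        stores[name][key] = value

    def compensation():
        stores[name].pop(key, None)

    return action, compensation


def failing_write():
    # Simulates a follower that cannot be reached.
    raise ConnectionError("follower_2 is unreachable")


steps = [
    write_to("leader", "user-1", {"name": "Mike"}),
    write_to("follower_1", "user-1", {"name": "Mike"}),
    (failing_write, lambda: None),
]

succeeded = SagaOrchestrator(steps).run()
print(succeeded)  # False
print(stores)     # {'leader': {}, 'follower_1': {}, 'follower_2': {}}
```

Because the third step fails, the two earlier writes are compensated and every store ends up empty again.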

Python implementation.

All nodes in our distributed data store will be Flask services. Instead of using a database, we are going to use a dictionary for storing data. Each service will implement the CRUD functionality. This functionality will be implemented in the CRUDUser class. In this tutorial, we are going to configure the leader manually; in one of the following articles, I am going to show you how Leader election can be done.

CRUDUser.

The CRUD functionality is implemented in a separate module as the CRUDUser class.

The class constructor takes the following arguments:

  • leader (bool) - the boolean flag that decides whether the service behaves as a leader or as a follower.
  • followers (list, default = None) - the list of dictionaries with the host and port of each follower; None for follower services.

# Importing all needed modules.
import uuid
import requests


class CRUDUser:
    def __init__(self, leader : bool, followers : list = None):
        '''
            The constructor of the CRUDUser.
        :param leader: bool
            The parameter deciding if the service is a leader or not.
        :param followers: list, default = None
            The list of dictionaries with the host and port of each follower.
        '''
        self.users = {}
        self.leader = leader
        if self.leader:
            # Fall back to an empty list so a leader without followers still works.
            self.followers = followers or []

The Create, Read, Update, and Delete functions are similar to the ones from the Service Discovery article. However, in the case of the Leader, it should forward the transaction to the followers, and in the case of the create operation it should also generate a new UUID for the new entity. Each CRUD operation function is shown below:

    def create(self, user_dict : dict):
        '''
            The create function from CRUD. This function adds a user to the data store.
        :param user_dict: dict
            The dictionary containing the information about the user.
        :return: dict, int
            The response and the status code.
        '''
        # If the service is the leader, it generates a new UUID for the user.
        if self.leader:
            index = str(uuid.uuid4())
            user_dict["id"] = index
        if user_dict["id"] not in self.users:
            # The user is not registered yet (on the leader or on a follower),
            # so it is added to the local data store.
            self.users[user_dict["id"]] = user_dict

            # If the service is a leader then forwards the request to the followers.
            if self.leader:
                for follower in self.followers:
                    requests.post(f"http://{follower['host']}:{follower['port']}/user",
                                  json = user_dict,
                                  headers = {"Token" : "Leader"})

            # Returning the response.
            return user_dict, 200
        else:
            return {
                "message" : "Error: User already exists!"
            }, 400

As shown above, in the case of a leader a new UUID is created and the value is added to the store. Then the entity, with the new UUID, is forwarded to the followers. A key aspect is how a follower recognizes the Leader's request: the Leader adds a “Token” header with the value “Leader”, which the follower checks before performing the local transaction.
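The forwarding loop in `create` does not check whether a follower actually received the request. One possible way to at least detect failures is sketched below. The `forward_to_followers` helper and its `send` parameter are hypothetical and not part of the article's code; `send` stands in for a call such as `requests.post(url, json=user_dict, headers=headers, timeout=2)`, and is replaced here by a fake transport so that the sketch runs without any service listening.

```python
def forward_to_followers(followers, send):
    """Call `send` for every follower and report the ones that failed."""
    failed = []
    for follower in followers:
        url = f"http://{follower['host']}:{follower['port']}/user"
        try:
            # The header marks the request as coming from the leader.
            send(url, headers={"Token": "Leader"})
        except Exception as error:
            failed.append((url, str(error)))
    return failed


def fake_send(url, headers):
    # Stand-in transport: pretends the follower on port 8002 is down.
    if url.endswith(":8002/user"):
        raise ConnectionError("connection refused")


followers = [
    {"host": "127.0.0.1", "port": 8001},
    {"host": "127.0.0.1", "port": 8002},
]

failed = forward_to_followers(followers, fake_send)
print(failed)  # [('http://127.0.0.1:8002/user', 'connection refused')]
```

What to do with the failed followers (retry, roll back, or mark them as out of sync) is a design decision that this tutorial leaves open.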

In the case of the Read functionality, the process is a little simpler because reads can be performed from any service. The function just returns the requested value if it exists; if not, a 404 error status code is returned:

    def get_user(self, index : str):
        '''
            This function returns a user by the index from the data store.
        :param index: str
            The index of the requested user.
        :return: dict, int
            The response and the status code.
        '''
        # Return the user's information if it exists.
        if index in self.users:
            return self.users[index], 200
        else:
            # Return the error message if the user doesn't exist.
            return {
                "message" : "Missing user!"
            }, 404

    def get_users(self, user_id_list : list):
        '''
            This function returns a list of users from the data store.
        :param user_id_list: list
            The list with user ids.
        :return: dict, int
            The response and the status code.
        '''
        # Finding the set of missing ids from the list.
        missing_ids = set(user_id_list).difference(self.users)

        # If there are missing ids then an error code is returned.
        if len(missing_ids) > 0:
            return {
                "message" : "Missing ids!",
                "ids" : list(missing_ids)
            }, 404
        else:
            # If all ids are present in the data store, a dictionary with the users' information is returned.
            user_info = {index : self.users[index] for index in user_id_list}
            return user_info, 200

The Update and Delete functionalities follow a similar flow to Create. The transaction is done locally, then forwarded to the followers:

    def update_user(self, index : str, user_dict : dict):
        '''
            This function updates the user's information with the provided information.
        :param index: str
            The index of the user in the data store.
        :param user_dict: dict
            The new users information.
        :return: dict, int
            The response and the status code.
        '''
        # Checking if the user id is registered.
        if index in self.users:
            # Updating the user's information.
            self.users[index].update(user_dict)

            # If the service is the leader, it forwards the information to the followers.
            if self.leader:
                for follower in self.followers:
                    requests.put(f"http://{follower['host']}:{follower['port']}/user/{index}",
                                 json = self.users[index],
                                 headers = {"Token" : "Leader"})

            # Returning the response and the status code.
            return self.users[index], 200
        else:
            return {
                "message" : "Missing user!"
            }, 404

    def delete_user(self, index : str):
        '''
            This function deletes a user from the data store by index.
        :param index: str
            The index of the user in the data store.
        :return: dict, int
            The response and the status code.
        '''
        # Checking if the user id is registered.
        if index in self.users:
            # Deleting the user from the data store.
            user_dict = self.users.pop(index)

            # If the service is the leader, it forwards the request to the followers.
            if self.leader:
                for follower in self.followers:
                    # Plain http, matching the create and update calls (the services do not serve TLS).
                    requests.delete(f"http://{follower['host']}:{follower['port']}/user/{index}",
                                    headers = {"Token" : "Leader"})

            # Returning the response and the status code.
            return user_dict, 200
        else:
            return {
                "message" : "Missing user!"
            }, 404

Next, we are going to implement the service.

First, we need to import the CRUDUser class and the Flask modules, configure the service, and create it:

# Importing all needed modules.
from flask import Flask, request
from crud import CRUDUser


# Defining the service information.
service_info = {
    "host" : "127.0.0.1",
    "port" : 8000,
    "leader" : True
}

# Defining the followers information (only present on the leader).
followers = [
    {
        "host" : "127.0.0.1",
        "port" : 8001
    },
    {
        "host" : "127.0.0.1",
        "port" : 8002
    }
]

# Creating the data store.
crud = CRUDUser(service_info["leader"], followers)

# Creating the flask application.
app = Flask(__name__)

During the write requests (Create, Update, and Delete), we check a special condition before making the write. For the write to happen, at least one of the following conditions must hold:

  • The service should be a leader.
  • The Leader Token should be present in the request Headers.

These conditions are verified in the following listing:

    if not crud.leader and ("Token" not in headers or headers["Token"] != "Leader"):
        ...  # Return error 403 Access denied!
    else:
        ...  # Make the write.
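The same rule can also be written as a small pure function, which makes it easy to check by hand. The `can_write` helper and the `LEADER_TOKEN` constant are hypothetical names used only in this sketch. The comparison is case-sensitive, so the value has to match exactly what the leader sends in the `Token` header (`"Leader"` in the CRUDUser class).

```python
# Must match the header value sent by the leader in CRUDUser.
LEADER_TOKEN = "Leader"


def can_write(is_leader, headers):
    """A write is allowed on the leader, or when the leader token is present."""
    return is_leader or headers.get("Token") == LEADER_TOKEN


print(can_write(True, {}))                     # True: the service is the leader
print(can_write(False, {"Token": "Leader"}))   # True: forwarded by the leader
print(can_write(False, {}))                    # False: external write to a follower
print(can_write(False, {"Token": "leader"}))   # False: wrong letter case
```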

Below are the endpoints of the service.

@app.route("/user", methods = ["POST"])
def add_user():
    '''
        This function handles the create requests.
    '''
    # Extracting the headers and checking if the request can be processed.
    headers = dict(request.headers)
    # The token must match the value sent by the leader ("Leader") exactly.
    if not crud.leader and ("Token" not in headers or headers["Token"] != "Leader"):
        return {
            "message" : "Access denied!"
        }, 403
    else:
        # Trying to create a user in the data store.
        return_dict, status_code = crud.create(request.json)
        return return_dict, status_code


@app.route("/user", methods=["GET"])
def get_users():
    '''
        This function handles the requests that read several users.
    '''
    # Getting the users ids.
    user_list = request.json["users_id"]
    # Getting the response and status code from the data store.
    return_dict, status_code = crud.get_users(user_list)
    return return_dict, status_code


@app.route("/user/<index>", methods=["GET"])
def get_user(index):
    '''
        This function handles the requests that read one user.
    '''
    # Getting the response and the status code from the data store.
    return_dict, status_code = crud.get_user(index)
    return return_dict, status_code


@app.route("/user/<index>", methods=["PUT"])
def update_user(index):
    '''
        This function handles the update requests.
    '''
    # Extracting the headers and checking if the request can be processed.
    headers = dict(request.headers)
    # The token must match the value sent by the leader ("Leader") exactly.
    if not crud.leader and ("Token" not in headers or headers["Token"] != "Leader"):
        return {
            "message" : "Access denied!"
        }, 403
    else:
        # Getting the new credentials from the request.
        new_user_data = request.json
        # Trying to update the user's information and getting the response and status code from the data store.
        return_dict, status_code = crud.update_user(index, new_user_data)
        return return_dict, status_code


@app.route("/user/<index>", methods=["DELETE"])
def delete_user(index):
    '''
        This function handles the delete requests.
    '''
    # Extracting the headers and checking if the request can be processed.
    headers = dict(request.headers)
    # The token must match the value sent by the leader ("Leader") exactly.
    if not crud.leader and ("Token" not in headers or headers["Token"] != "Leader"):
        return {
            "message" : "Access denied!"
        }, 403
    else:
        # Trying to delete the user and getting the response and status code from data store.
        return_dict, status_code = crud.delete_user(index)
        return return_dict, status_code

Finally, we need to run our application:

app.run(
    host = service_info["host"],
    port = service_info["port"]
)

Now, to finish the application, you need to copy these two files into two additional folders, getting the following structure:

But don't forget to change the configuration in service-2 and service-3 (which are going to be the followers):

service_info = {
    "host" : "127.0.0.1",
    "port" : 8000,
    "leader" : True
}

followers = [
    {
        "host" : "127.0.0.1",
        "port" : 8001
    },
    {
        "host" : "127.0.0.1",
        "port" : 8002
    }
]
crud = CRUDUser(service_info["leader"], followers)
# Becomes.
service_info = {
    "host" : "127.0.0.1",
    "port" : 8001,
    "leader" : False
}

crud = CRUDUser(service_info["leader"])
# And
service_info = {
    "host" : "127.0.0.1",
    "port" : 8002,
    "leader" : False
}

crud = CRUDUser(service_info["leader"])
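Editing the configuration by hand in three folders is easy to get wrong. As an alternative, the configuration could be derived from the port alone. The `build_config` function below is a hypothetical helper, not part of the article's code, and it assumes the same three ports as above, with the leader on port 8000.

```python
def build_config(port, leader_port=8000, ports=(8000, 8001, 8002), host="127.0.0.1"):
    """Return (service_info, followers) for the service listening on `port`."""
    is_leader = port == leader_port
    service_info = {"host": host, "port": port, "leader": is_leader}
    # Only the leader needs to know the followers.
    followers = None
    if is_leader:
        followers = [{"host": host, "port": p} for p in ports if p != leader_port]
    return service_info, followers


print(build_config(8000))
print(build_config(8001))
```

The two returned values can then be passed on as `CRUDUser(service_info["leader"], followers)`, exactly as in the listings above.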

Now we can finally run the applications in three separate terminals. If you make a write request to the Leader, it will be propagated to the followers too. Making a similar request directly to a follower will return a 403 error status response. Read requests, however, work on all services, leader and followers alike.

Conclusion.

Leader-Follower replication is a simple approach for creating distributed stores. It ensures data consistency and allows multiple users to access the data simultaneously. However, because of the replication we are storing 3 copies of the data, which can be pretty expensive. In the following article, we are going to talk about partitioned or sharded services, which are going to keep the consistency, but at the same time reduce the amount of data stored by one-third.

The full commented code of the class can be found here: GitHub URL

Written by Păpăluță Vasile

LinkedIn:https://www.linkedin.com/in/vasile-păpăluță/
Instagram:https://www.instagram.com/science_kot/
GitHub:https://github.com/ScienceKot


Written by papalutavasile | A self-taught AI engineer
Published by HackerNoon on 2023/07/05