paint-brush
Understanding Partitioned Services in Distributed Systems by@papalutavasile
346 reads
346 reads

Understanding Partitioned Services in Distributed Systems

by Păpăluță VasileJuly 9th, 2023
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

In the previous article we explored how to create a Distributed Data Store or more precisely a replicated one. It made the data store more available, at least allowing for more reads simultaneously. But this approach isn’t perfect for any case, especially when we have to store and query large amounts of data. For this, we need a partitioned (or sharded ) data store.
featured image - Understanding Partitioned Services in Distributed Systems
Păpăluță Vasile HackerNoon profile picture


In the previous article, we discussed how to create a Distributed Data Store or more precisely a replicated one. It made the data store more available, at least allowing for more reads simultaneously. However, this approach isn’t perfect for any case, especially when we have to store and query large amounts of data. For this, we need a partitioned (or sharded) data store.


Content Overview

  • Problem Formulation
  • Partitioned Data Stores
  • Partitioning by Key Range
  • Partitioning by Hash of Key
  • Rebalancing and Consistency Hashing
  • Two Partitions per Node
  • Python Implementation
  • Conclusion


Problem Formulation

We have a copy of data per node in the replication data store. Suppose we have 3 nodes in our distributed data store. Also for simplicity, we note the whole amount of data stored as D. Then our Distributed Data Store takes 3D of memory. This becomes impractical when we have a lot of data and heavy queries (especially write ones). This is illustrated in Figure 1.


Partitioned Data Stores

By Partitioning some data you split it into groups that we usually name partitioned or partitions. To each node is assigned one or more partitions, making it serve only these partitions. For example, if we assign to each node a single partition, in our three-node example, then we need only D of memory, but if we assign 2 partitions to each node, then we store in total 2/3 D.


This process is illustrated below:

Figure 1: The amount of data stored depending on the number of partitions per service.


But now arise a question. How do we decide to which partition a piece of data belongs? There are some strategies.


Partitioning by Key Range

Suppose all records in the data store have as an id an integer number. Then we can set partitions as ranges, for example, 0-999, 1000-1999, 2000-2999, and so on. Then using the id we can find the range and redirect the request to the node(s) serving these ranges. To node that partition can also be used when ids are strings, and the first n letters of the id can define the partition.


Figure 2: Partitioning by Key Range.


However, this approach can lead to hot spots - when one or a couple of partitions are more accessed than others. This can put down a node or two. To avoid this case we can use Partitioning by Hash of Key.


Partitioning by Hash of Key

To reduce the risk of hot spots, distributed data stores often use a hash function to determine the partition for a given key or index. Hash functions are used to make skewed data and make it uniformly distributed. If we have for example a 32-bit hash function, then the string taken will be converted into a seemingly random number between 0 and 2^23 - 1. In such a way even very similar strings have evenly distributed hashes.


Usually just having the hash of the index or key isn’t enough. Also, we do the following operation:

hash mod p


Where p is the number of partitions. It’s important to mention that the number of nodes and partitions can be different. In this way, we find the partition to forward the request.

However, there is a problem that also should be solved - rebalancing.

Rebalancing and Consistency Hashing

As usual, in distributed systems can happen anything. For example one of the nodes from the distributed data store can fall, and its partitions will be lost. Or a new machine is added to the cluster, how then do we know which partition(s) to assign to it?


Consistency Hashing is a way to solve the problem of rebalancing. Initially, we need to place all our services in an imaginary circle at an equal distance from each other as shown in the image below.


Then we assign to each service an angle - in our example:

  • Service A: 120;
  • Service B: 240;
  • Service C: 360;


Then when we get a request we compute the hash mod 360 and find the node with the lowest difference between its assigned degree and the value that we get. This structure is called the Hash Ring and when a new node is added it just assigns itself a partition (or an angle) and starts to process requests. When a node falls then his angle value just isn’t used anymore for computing the partition.


Two partitions per node

In the example that we are going to implement below, we are going to have 2 partitions per node. In this case, even if one node falls the data that it was storing can be found in the rest of the nodes.


We are going to do that by searching the two nearest angles from the service from the hash mod 360.


This is illustrated below:


Figure 3: Hash ring.


Python Implementation

To implement the sharded data store we are going to change a little bit the code from the last article. Shortly are going to add the CRUDUser to the capacity to compute the hash mod 360 of the index of the user and find the nodes responsible for the request.


Changes to the main.py

First, to the credentials of the service, we are going to add a new value - degree - this is the partition the service is responsible for:


# Defining the service credentials.
service_info = {
    "host" : "127.0.0.1",
    "port" : 8000,
    "leader" : True,
    "degree" : 120
}

# Defining the followers credentials.
followers = [
    {
        "host" : "127.0.0.1",
        "port" : 8001,
        "degree" : 240
    },
    {
        "host" : "127.0.0.1",
        "port" : 8002,
        "degree" : 360
    }
]


As you can see this value is added to the dictionary with the followers’ credentials too.


sRefactoring of the CRUDUser Class

Now CRUDUser class should be refactored to be able to use the sharding capacity. We are going to start with adapting the constructor, namely adding the service degree to the CRUDUser:


class CRUDUser:
    def __init__(self, leader : bool, degree : int, followers : dict = None) -> None:
        '''
            The constructor of the CRUD class.
        :param leader: bool
            True fro leader and False for follower.
        :param degree: int
            The degree for which the service is responsible.
        :param followers: dict, default = None
            The dictionary with the followers credentials.
        '''
        self.users = {}
        self.leader = leader
        self.deegree = degree
        if self.leader:
            self.followers = followers


Now in main.py while creating the CRUD object we also give the constructor the service’s degree:


# For leader:
crud = CRUDUser(service_info["leader"], service_info["degree"], followers)
# For follower:
crud = CRUDUser(service_info["leader"], service_info["degree"])



Consistency Hashing

We already implemented consistency hashing in the caching article. This time we are doing in a slightly another way. The class method get_responsible_degrees will return the list with 2 degrees which will represent the partitions to which this data will be assigned. It computes the hash mod 360 of the index of the request and finds 2 nearest angles from 120, 240, and 360:


@classmethod
def get_responsible_degrees(cls, string : str) -> list:
    '''
            This function finds the nearest responsible degrees for a string uuid.
        :param string: str
            The value for with to compute the hash and find the nearest deegrees.
        :return: list
            The list with the nearest degrees for the string uuid.
    '''
    angles_list = [120, 240, 360]
    # Computing the hash of the string.
    hash_str = hash(string)
    # Calculating the hash mod 360
    hash_mod_360 = hash_str % 360
    # Calculating the distance between hash mod 360 and all angles.
    dist_list = [abs(hash_mod_360 - angle) for angle in angles_list]
    # Find the nearest 2 angles with the lowest difference.
    responsible_indexes = sorted(range(len(dist_list)), key=dist_list.__getitem__)
    responsible_angles = [angles_list[responsible_indexes[index]] for index in range(2)]
    return responsible_angles


This function is called in the get_selected_services which using the selected angles finds the services responsible for this request, being the leader itself or the followers. Yes, this hash function is computed only by leaders.


This function is listed below:

def get_selected_services(self, string : str) -> list:
    '''
        This function return the list of services responsible for the request.
    :param string: str
        The value which we use to compute the hash mode 360.
    :return: list
        The list of responsible services.
    '''
    selected_services = []
    # Getting the responsible angles.
    responsible_angles = self.get_responsible_degrees(string)

    # Adding itself if in responsible degrees is presented the deegree of the service.
    if self.deegree in responsible_angles:
        selected_services.append("me")
        responsible_angles.remove(self.deegree)
    # Adding the rest of responsible services.
    for angle in responsible_angles:
        for follower in self.followers:
            if follower["degree"] in responsible_angles:
                selected_services.append(follower)
    return selected_services


Related to the CRUD functions, the only functions that are changing are the write ones, the create, update, and delete. The read functions also can be changed to use rerouting, however, we are going to create a gateway for this purpose in one of the future articles, so forwarding o requests will be used only for the write functions.



Create

While adding a record to the data store shard, if it’s a leader we first are computing the responsible services. Having these responsible services, we begin if checking if the service itself is responsible. If it is in the responsible services, then we add the record to the data store, then we are forwarding the request to another follower. If the leader isn’t one of the responsible ones then it is just forwarding the requests to the followers.


If this code is run by a follower then it just skips all leader-related code and just adds the record to the data store.

def create(self, user_dict : dict):
    '''
        This function adds the new user to the data store and forwards the request to other services if it's the leader.
    :param user_dict: dict
        The dictionary containing the user's information.
    :return: dict, integer
        The response and the status code of the request.
    '''
    resp_services = []
    # If the service is the leader then we are creating a new uuid for the record and getting the list of
    # responsible services.
    if self.leader:
        index = str(uuid.uuid4())
        user_dict["id"] = index
        resp_services = self.get_selected_services(index)
    if user_dict["id"] not in self.users:
        # Checking the conditions for adding the data to the data store.
        if (self.leader and "me" in resp_services) or not self.leader:
            self.users[user_dict["id"]] = user_dict
            # Removing the "me" from the responsible services list.
            if "me" in resp_services:
                resp_services.remove("me")

        # Forwarding the request to the responsible followers.
        if self.leader:
            for follower in resp_services:
                requests.post(f"http://{follower['host']}:{follower['port']}/user",
                              json = user_dict,
                              headers = {"Token" : "Leader"})

        return user_dict, 200
    else:
        # Returning the error response if the user already exists.
        return {
                   "message" : "Error: User already exists!"
               }, 400

A practically identical code is added for the update and delete functions too:


Update and Delete

def update_user(self, index : str, user_dict : dict):
    '''
        This function updates the user's information in the data store.
    :param index: str
        The index of user to update the information.
    :param user_dict: dict
        The dictionary with the new values for the fields to update.
    :return: dict, int
        The response and the status code of the request.
    '''
    resp_services = []
    if self.leader:
        # Getting the responsible followers.
        resp_services = self.get_selected_services(index)
    if index in self.users:
        # Checking the conditions for updating the data to the data store.
        if (self.leader and "me" in resp_services) or not self.leader:
            self.users[index].update(user_dict)
            # Removing the "me" from the responsible services list.
            if "me" in resp_services:
                resp_services.remove("me")
        # Forwarding the request to the responsible followers.
        if self.leader:
            for follower in resp_services:
                requests.put(f"http://{follower['host']}:{follower['port']}/user/{index}",
                             json = self.users[index],
                             headers = {"Token" : "Leader"})

        return self.users[index], 200
    else:
        # Returning the error response if the user already doesn't exist.
        return {
                   "message" : "Missing user!"
               }, 404

def delete_user(self, index : str):
    '''
        This function removes the user from the data store.
    :param index: str
        The index of the user to delete from the data store.
    :return: dict, int
        The response and the status code of the request.
    '''
    resp_services = []
    if self.leader:
        # Getting the responsible followers.
        resp_services = self.get_selected_services(index)
    if index in self.users:
        # Checking the conditions for updating the data to the data store.
        if (self.leader and "me" in resp_services) or not self.leader:
            user_dict = self.users.pop(index)
            # Removing the "me" from the responsible services list.
            if "me" in resp_services:
                resp_services.remove("me")
        # Forwarding the request to the responsible followers.
        if self.leader:
            for follower in resp_services:
                requests.delete(f"http://{follower['host']}:{follower['port']}/user/{index}",
                                headers = {"Token" : "Leader"})

        return user_dict, 200
    else:
        # Returning the error response if the user already doesn't exist.
        return {
                   "message" : "Missing user!"
               }, 404


At rest, the main.py, and crud.py module remains unchanged. Just ensure that you updated these modules in all services folders and run the application.


To test it send a POST request to the leader. You will see that the requester has forwarded to at least one another service. If you will send a get request with the returned uuid to this service you will get the data about the user from another service. This means that the data was assigned to the partition shared by these two services.


Conclusion

Sharded services are a form of saving the amount of data stored in a distributed system ensuring at the same time the data consistency in case of multiple partitions per node. In this article we explored how to create a harder data store with multiple partitions per node, keeping in such a way instead of 3D, 2/3 D amount of data. In the next article, we are going to explore how to dynamically assign partition leaders and how to re-assign a new one when the previous leader was declared dead


The full commented code of the class can be found here: GitHub URL