Understanding Caching in Distributed Systems

Written by papalutavasile | Published 2023/06/20

TL;DR: Caching is a way to make a distributed system more efficient. It involves a trade-off between three properties of distributed systems: Consistency, Availability and Partition Tolerance. For the cache, we are going to implement a highly available and basically consistent system. To do that, we need a technique named Consistent Hashing.


Distributed systems combine hundreds of different services, each handling various requests and processing substantial amounts of data. Some requests are demanding in terms of resources and time, yet their results do not need to be recomputed on every page reload. Consider a user's playlist of songs or videos. With each request, the system executes a few queries and then returns the results to the user. Without a cache, every one of these requests takes the same amount of time. With a caching service, only the initial request pays the longer processing time, while subsequent ones are served faster.

Content Overview

  • The problem statement
  • CAP Theorem
  • First things, first
  • Ensuring Availability
  • The Python Implementation
  • Conclusion

The problem statement

Each request to a distributed system costs some computation and time. For simple requests, this is pretty straightforward. However, things get more complicated with requests that are repeated often, touch many services at once, and still need to return fast responses.

Take loading a profile page on social media as an example. The chances that the profile was updated since the last view are low, and even when it was, the update takes some time to propagate through the system. Still, the user wants a quick response. When a user requests this page, the result is saved in the cache for some time. If the next request for this page arrives within that time, the user gets the cached page instead of the page from the original service, and so gets a much faster response. This process is illustrated in Figure 1.
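The flow described above can be sketched in a few lines of Python. This is a minimal in-memory illustration, not the Memcached-based service built later in the article; the names TTLCache and load_profile_page are hypothetical and exist only for this example.

```python
import time


class TTLCache:
    """A tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock      # injectable, so expiry can be tested without sleeping
        self.entries = {}       # key -> (expires_at, value)

    def get_or_compute(self, key, compute):
        """Return the cached value for key, calling compute(key) on a miss."""
        now = self.clock()
        entry = self.entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]     # fresh hit: the slow path is skipped
        value = compute(key)    # miss or expired entry: do the real work
        self.entries[key] = (now + self.ttl_seconds, value)
        return value


def load_profile_page(user_id):
    """Hypothetical stand-in for the slow call that assembles a profile page."""
    load_profile_page.calls += 1
    return f"<profile of {user_id}>"


load_profile_page.calls = 0

cache = TTLCache(ttl_seconds=30)
first = cache.get_or_compute("alice", load_profile_page)    # computed by the slow path
second = cache.get_or_compute("alice", load_profile_page)   # served from the cache
print(first == second, load_profile_page.calls)
```

Only the first call reaches load_profile_page; the second one is answered from the dictionary until the 30 seconds run out.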

CAP Theorem

Before getting to how to implement a cache, I want to explain some theory behind distributed systems. I will try to be as quick as possible, and not too boring 😺. The CAP Theorem was proposed in 2000 by Eric Brewer. It describes a fundamental trade-off between three properties of distributed systems:

  • Consistency: simply put, the property that every server returns the correct, up-to-date response to each request.

  • Availability: the property of a system that ensures a response for each incoming request.

  • Partition Tolerance: the ability of a system to continue working without stopping because of delayed or missing messages.

The CAP theorem says that no distributed system can guarantee all three of them at once. Bad news, yeah? Don't start panicking, I have only just started 😺. The idea is that a distributed system cannot exist without Partition Tolerance. Why? Because without it, the system would go down pretty fast. So we have only two choices: a CP or an AP application.

In reality, we must make a decision between having a highly available system with a basic level of consistency or a highly consistent system with a basic level of availability. Different types of systems require different combinations of these two properties. In the case of caching, we aim to implement a highly available and essentially consistent system. In one of the upcoming articles, I will demonstrate how to build a CP system.

First things, first

First, we need to take care of Partition Tolerance. In a distributed system anything can happen, from dead nodes to missing packets. In the case of missing packets, we can simply resend the message or move on to another service replica. So, we need more replicas of the same service to store cached values. But there is a problem: we don’t want to store the same information on all the replicas. That would be highly inefficient. Here we can make use of sharded services.

With sharded services, instead of having one service, we split it into a number of shards, each of them handling a part of the requests. In this case, if a shard fails, the rest of the nodes continue to work, and some of the remaining shards can even take over the responsibilities of their fallen brother while it is restarted. But to achieve that, we need a technique named Consistent Hashing.

Usually, when we want to forward a request to one of the shards, we find the responsible one first. To do that we take a field of the request, or the whole request, and compute its hash. The hashing function will return an integer number, which is evenly distributed. This is illustrated in the image below:

By taking the hash mod n we get the index of the responsible service, where n is the number of shards. But suppose we get the index of a service that isn’t working; then we cannot forward the request. To handle this case we can use consistent hashing. In the version used here, instead of computing hash mod n we compute hash mod 360, which gives a value between 0 and 359. Every service is assigned a degree on a circle, and we find the assigned degree nearest to the computed value. The service assigned to that degree is the one responsible for the request. This way, even if a service is down, we can always find a new responsible service by taking the nearest degree among those still running. To make this possible we are going to use a structure named a Hash Ring, illustrated below. In such a way we can ensure a weak form of consistency. In another tutorial, we will look at how to ensure full consistency.
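To make the difference between the two lookups concrete, here is a small sketch under a few assumptions: the node names are hypothetical, hashlib.md5 is used only to get an integer that is the same in every Python process, and the nearest-degree rule follows the description above.

```python
import hashlib


def stable_hash(key):
    """Hash a string to an integer that is identical across processes."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


def pick_by_modulo(key, shard_count):
    """Plain sharding: hash mod n gives the shard index."""
    return stable_hash(key) % shard_count


def ring_distance(a, b):
    """Shortest distance between two degrees on a 360-degree circle."""
    d = abs(a - b) % 360
    return min(d, 360 - d)


def pick_on_ring(key, ring, down=()):
    """Return the live node whose degree is nearest to hash mod 360.

    ring maps degree -> node name; down lists node names to skip.
    """
    position = stable_hash(key) % 360
    live = {deg: node for deg, node in ring.items() if node not in down}
    if not live:
        raise RuntimeError("no live nodes on the ring")
    nearest = min(live, key=lambda deg: ring_distance(position, deg))
    return live[nearest]


RING = {0: "shard-a", 120: "shard-b", 240: "shard-c"}  # hypothetical node names

key = "55852755-068a-48ce-b7b1-89e60320cc65"
print("index by hash mod n:", pick_by_modulo(key, len(RING)))

owner = pick_on_ring(key, RING)
fallback = pick_on_ring(key, RING, down={owner})  # same key while its owner is down
print(owner, "->", fallback)
```

With hash mod n there is nothing to fall back to when the chosen index is down, while the ring lookup simply moves to the next nearest live degree. Many hash ring implementations walk clockwise to the next node instead of taking the nearest one; the sketch keeps the nearest-degree rule used in this article.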

Ensuring Availability

So how do we ensure availability in this setting? Just by copying the structure above two or more times. But first, we need to add a kind of Gateway to our sharded cache. The Gateway will be responsible for computing the hash of the request, finding the responsible node, and forwarding the request. This results in the following tree structure, one way or another.

Now, we double these service clusters and get the structure below:

When the client wants to save something in the cache, it sends 2 requests, one per cluster. As such, we have 1 copy of the data per cluster. When the client wants to read a value from the cache, it sends the request to one of the clusters; if it gets an error (because the service is unavailable, for example), it can ask the second cluster and get the required value. In such a way the cache ensures high availability. Now let’s see how we implement such services.
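The client-side logic described above might look roughly like the sketch below. FakeCluster is a hypothetical in-memory stand-in for one gateway with its shards, so the example runs without any network; a real client would send HTTP requests to the two gateways instead.

```python
class ClusterUnavailable(Exception):
    """Raised by a cluster stub when it cannot serve a request."""


class FakeCluster:
    """Hypothetical in-memory stand-in for one gateway plus its shards."""

    def __init__(self, name):
        self.name = name
        self.data = {}
        self.up = True

    def save(self, key, value):
        if not self.up:
            raise ClusterUnavailable(self.name)
        self.data[key] = value

    def load(self, key):
        if not self.up:
            raise ClusterUnavailable(self.name)
        return self.data.get(key)


def save_everywhere(clusters, key, value):
    """Send the write to every cluster and return how many accepted it."""
    stored = 0
    for cluster in clusters:
        try:
            cluster.save(key, value)
            stored += 1
        except ClusterUnavailable:
            continue  # the other cluster still gets its copy
    return stored


def load_with_failover(clusters, key):
    """Try the clusters in order and return the first value found, else None."""
    for cluster in clusters:
        try:
            value = cluster.load(key)
        except ClusterUnavailable:
            continue  # this cluster is down, ask the next one
        if value is not None:
            return value
    return None


clusters = [FakeCluster("cluster-1"), FakeCluster("cluster-2")]
save_everywhere(clusters, "user-42", ["playlist-a", "playlist-b"])
clusters[0].up = False  # simulate an outage of the first cluster
print(load_with_failover(clusters, "user-42"))
```

As a design choice that goes slightly beyond the text, the reader also moves on to the next cluster when the first one answers with a miss, not only when it raises an error.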

The Python implementation

To implement the structure shown in Figure 3 we are going to use two types of services. As leaf nodes, we are going to use Memcached. It is an open-source, high-performance, distributed memory object caching system. It allows one to store a value for a specified period of time. Memcached services can be defined in a docker-compose file, so we are going to define them there.

NOTE: Please make sure that you have Docker Desktop installed and that it’s running.

version: "3"
services:
  memcached-1:
    image: memcached
    ports:
      - 11211:11211

  memcached-2:
    image: memcached
    ports:
      - 11212:11211

  memcached-3:
    image: memcached
    ports:
      - 11213:11211

  memcached-4:
    image: memcached
    ports:
      - 11214:11211

  memcached-5:
    image: memcached
    ports:
      - 11215:11211

  memcached-6:
    image: memcached
    ports:
      - 11216:11211

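To prepare them, run the following command in the terminal (these services use prebuilt images, so there is nothing to build and the command returns almost immediately):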

docker compose build

Then, run the next command to run the services:

docker compose up

These commands will run all services. You can let them run or not while developing the cache gateways. Just don’t forget to start them up with the second command when you want to test the whole app.

Memcached services communicate over the TCP protocol, but fortunately, we don’t need to implement the communication ourselves. For this purpose, we can use the pymemcache library.

To install it use the following command:

pip install pymemcache

Now, we can get to implementing the cache gateway service. First, we import the Flask modules and pymemcache’s client.

# Importing all needed modules.
from flask import Flask, request
from pymemcache.client import base

Next, we need to define the memcached clients:

# Defining the Memcached clients.
memcache_client1 = base.Client(("localhost", 11211))
memcache_client2 = base.Client(("localhost", 11212))
memcache_client3 = base.Client(("localhost", 11213))

The Client class takes as an argument the tuple with the host and the port of the service to connect to. We are going to use them as clients for interaction with the Memcached Service.

Now we implement the Hash Ring as a dictionary mapping degree to the Memcached service. Also, we need to implement the function that would find us the responsible service for a request:

# Defining the Hash Ring.
HASH_RING = {
    0 : memcache_client1,
    120 : memcache_client2,
    240 : memcache_client3
}

import hashlib

def find_memcache_service(request_body):
    '''
        This function returns, based on the request body, the service responsible for this request.
    :param request_body: dict
        The content of the request.
    :return: int
        The degree (key in HASH_RING) of the Memcached service responsible for the request.
    '''
    chosen_service_index = 0
    min_difference = 360
    
    # Hashing the user id with hashlib instead of the built-in hash(): hash() of a string
    # changes between Python processes, which would remap users after every restart.
    user_id = str(request_body["user_id"]).encode("utf-8")
    hash_payload = int(hashlib.md5(user_id).hexdigest(), 16)
    hash_mod_360 = hash_payload % 360
    
    # Finding the degree with the lowest distance from hash mod 360.
    for index in HASH_RING:
        # Calculating the distance on the circle, so that 350 is 10 degrees away from 0.
        difference = abs(hash_mod_360 - index)
        difference = min(difference, 360 - difference)
        if difference < min_difference:
            # Updating the nearest degree.
            min_difference = difference
            chosen_service_index = index
    return chosen_service_index

As shown in the listing above, when a request comes in, the service hashes the user id from the request and computes the hash mod 360. Then the service finds, in the Hash Ring, the Memcached service with the lowest difference between that value and its assigned degree. This service is the one to which the data is forwarded.

Going further we need to implement the endpoints of the Flask Service, starting with the /save one.

# Creating the Flask application.
app = Flask(__name__)

@app.route("/save", methods=["POST"])
def save():
    '''
        This endpoint processes the save requests.
    '''
    # Extracting the request body.
    request_body = request.json
    
    # Getting the index (degree) of the responsible Memcached service.
    memcache_index = find_memcache_service(request_body)
    
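    # Sending the request to the Memcached service. The raw JSON body is stored, because
    # without a serializer configured pymemcache would store the dict as its Python string form.
    HASH_RING[memcache_index].set(request_body["user_id"],
                                  request.get_data(),
                                  expire=30)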
    return {
        "message" : "Saved!"
    }, 200

While processing the request, we extract the request body and find the responsible Memcached service, then save the data to this service for 30 seconds and return the confirmation message.

While processing the /cache requests, we do the same things: extracting the data, finding the responsible Memcached service, and getting the value. If the value is present in the responsible shard, we return the value and the 200 status code; if not, we return an error message and the 404 status code:

@app.route("/cache", methods=["GET"])
def cache():
    '''
        This endpoint processes the caching requests.
    '''
    # Extracting the request body.
    request_body = request.json
    
    # Getting the index (degree) of the responsible Memcached service.
    memcache_index = find_memcache_service(request_body)
    
    # Getting the cached value from the responsible Memcached service.
    cached_value = HASH_RING[memcache_index].get(request_body["user_id"])
    
    # Returning the requested value or the error message.
    if cached_value:
        return cached_value, 200
    else:
        return {
            "message" : "No such data"
        }, 404

Finally, run the app:

# Running the main service.
app.run()

NOTE: When running the application don’t forget to run the docker compose too.

During the development of the code for this article, I used the following request bodies, the first for /save and the second for /cache:

{
    "user_id" : "55852755-068a-48ce-b7b1-89e60320cc65",
    "playlists" : [
        "0d37b63d-61dc-4bf4-85bf-27e46558f9be",
        "3d56c68d-54c9-47ed-b863-8d67748b5f1c",
        "ca97bbde-e0fe-4c50-8225-0dbfcc12b431"
    ]
}
{
    "user_id" : "55852755-068a-48ce-b7b1-89e60320cc65"
}

Now, to implement the structure from Figure 4, just make a copy of the Flask service in another folder, change the ports of the Memcached clients to the remaining ones from the docker-compose file (11214, 11215 and 11216), and set the service itself to run on another port, as shown below:

app.run(port=6000)
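As an alternative to keeping two copies of the code, the ports could be read from the environment, so the same file can be started once per cluster. The sketch below is only one possible way to do it; the variable names MEMCACHED_PORTS and GATEWAY_PORT are hypothetical and not part of the original service.

```python
import os


def parse_ports(raw):
    """Turn a string like "11214,11215,11216" into a list of ints."""
    return [int(part) for part in raw.split(",") if part.strip()]


def build_ring(ports):
    """Spread the ports evenly around the 360-degree ring: degree -> port."""
    step = 360 // len(ports)
    return {i * step: port for i, port in enumerate(ports)}


# MEMCACHED_PORTS and GATEWAY_PORT are hypothetical variable names.
# The defaults match the first cluster and Flask's default port.
memcached_ports = parse_ports(os.environ.get("MEMCACHED_PORTS", "11211,11212,11213"))
gateway_port = int(os.environ.get("GATEWAY_PORT", "5000"))

ring_ports = build_ring(memcached_ports)
print(gateway_port, ring_ports)
```

Each degree in ring_ports could then be mapped to base.Client(("localhost", port)), and the service started with app.run(port=gateway_port).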

Conclusion

Caching is a very important concept and functionality for any distributed system. It increases the speed of the application and also saves a lot of computational resources if applied correctly. In the next article in the series, I will show how to easily take full advantage of having two or more service clusters.

The full commented code of the service can be found here: GitHub



Written by papalutavasile | A self-taught AI engineer
Published by HackerNoon on 2023/06/20