
Understanding Service Discovery in Distributed Systems

by Păpăluță Vasile, June 22nd, 2023

Too Long; Didn't Read

Service Discovery is a special service in a distributed system that keeps and makes available information about other services in the system. In a distributed system, the whole application is represented by services: web servers running on different machines. A mechanism for storing, updating, and deleting services’ information is required. A simple Service Discovery can be implemented in Python as a web app.

Working for a couple of years in the data science field while pursuing a degree in Software Engineering is like dividing yourself into two halves. One half needs to study hard math principles, all types of neural network architectures, data storage, and business analysis, while the other makes you learn how information is encoded and used by computers, how algorithms work, and how to build all sorts of applications. These two fields complement each other: the second helps me become a better data scientist and better understand the world in which my models live.


In the last year of my Bachelor’s, I had a course that I absolutely loved: Distributed Systems. I liked it so much that I decided to go ahead of the curriculum and started to learn from one of the greatest books I have read, “Designing Data-Intensive Applications” by Martin Kleppmann. Then, in my bachelor thesis, I combined Distributed Systems and Natural Language Processing to create a chatbot.


Now, after developing a couple of distributed systems and gaining more experience with them, I decided to start writing a series of articles explaining and implementing services, patterns, and principles from this field, starting with one of the most important services in any distributed system.

The problem statement.

In distributed systems, the whole application is represented by services: web servers running on different machines, each responsible for different features of the application. Take Spotify’s functionality as an example. Spotify not only allows users to play music on different devices but also lets artists upload their songs, generates music recommendations, provides song lyrics, lets users follow artists, handles financial management, and more. All these features are implemented by different services or service replicas, resulting in hundreds or thousands of services that must communicate with each other. However, in this kind of architecture the machines on which the services run come and go, so the locations of the services are always changing. A mechanism for storing, updating, and deleting services’ information is required, and Service Discovery is usually used for this purpose.

Figure 1: A distributed system without a Service Discovery.


What is Service Discovery?

Service Discovery is usually designed as a service registry: it keeps all the information about the services and acts as the point where services announce that they are still alive. Usually, it is one or more web services that provide an API for registering, updating, and deleting service information, plus some endpoints for receiving heartbeat requests. The API provides mostly CRUD functionality. If you are not familiar with CRUD operations, CRUD stands for:


  • Create — The creation of entities in data storage.
  • Read — Reading the entity values from the data storage.
  • Update — Updating entity values in the data storage.
  • Delete — Deleting entities from the data storage.


When a service in a distributed system starts, it sends a Create request to the Service Discovery to register, so the Service Discovery knows about its existence. Then the service makes a second request, a Read, to get the information about the services with which it should communicate. The Update operation is used when a service is redeployed to another location or when load is redirected to another service. Finally, the Delete operation removes a service, either automatically when its server drops or manually.

Figure 2: Registering to Service Discovery.


Now let’s explore more about the heartbeat requests.

Heartbeat requests.

A natural question may have come to mind: how does Service Discovery know that a service is still running after it has registered? This is where heartbeat requests come in. After registering and getting the information about the services it needs, the service starts sending a request to Service Discovery once every t seconds or minutes, announcing that it is still alive. If the service sends no heartbeat for 2t or 3t, then, depending on the implementation, the Service Discovery either deletes the service or triggers a health check on it. Usually, heartbeat requests don’t carry any payload, but, again depending on the implementation, they may carry some data.
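The timeout rule described above can be written as a small predicate. This is only a sketch under assumptions: the function name `is_service_expired` and the parameter `missed_allowed` are hypothetical, and the multiplier of 3 is just one of the values mentioned in the text.

```python
import time
from typing import Optional


def is_service_expired(last_heartbeat: float,
                       interval: float,
                       missed_allowed: int = 3,
                       now: Optional[float] = None) -> bool:
    """Return True when no heartbeat arrived for more than
    `missed_allowed` heartbeat intervals (the "t x 3" rule)."""
    if now is None:
        now = time.time()
    return now - last_heartbeat > interval * missed_allowed


# A service that reports every 10 seconds and was last heard 25 seconds ago
# is still inside the 3 x 10 = 30 second window.
print(is_service_expired(last_heartbeat=100.0, interval=10.0, now=125.0))

# The same service counts as expired 31 seconds after its last heartbeat.
print(is_service_expired(last_heartbeat=100.0, interval=10.0, now=131.0))
```

Passing `now` explicitly keeps the rule easy to test; in a running service it would default to the current time.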

The Python implementation.

Next, I’m going to show how a simple Service Discovery can be implemented in Python as a Flask web app. Following the CRUD API, each endpoint should accept a specific HTTP method:


  • Create: POST;
  • Read: GET;
  • Update: PUT;
  • Delete: DELETE.


Depending on the amount of data stored, the Read endpoint can be split in two: read_all and read_some (or read_one). This choice depends on the developer and the architecture. For this example, we are going to implement two Read operations:


  • read all;
  • read some;


Also, instead of connecting a database, we are going to implement a ServiceRegistry class which will store the services’ information. The information for each service will be a simple dictionary with the following fields:


  • name — the name of the service.
  • host — the host of the service.
  • port — the service port.


The service information will look like this.

{
	"name" : "playlist-service-1",
	"host" : "234.48.194.123",
	"port" : 6000
}

Listing 1: The service credentials schema.
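Because the registry stores whatever dictionary it receives, it can help to check an incoming body against this schema before saving it. The sketch below is an illustration only: `validate_service_info` is a hypothetical helper that is not part of the article's code, and the host value is a placeholder.

```python
def validate_service_info(info: dict) -> list:
    """Return a list of problems found in a service description.

    An empty list means the dictionary matches the name/host/port schema.
    """
    problems = []
    for field, expected_type in (("name", str), ("host", str), ("port", int)):
        if field not in info:
            problems.append(f"missing field: {field}")
        # bool is a subclass of int in Python, so it is rejected explicitly.
        elif not isinstance(info[field], expected_type) or isinstance(info[field], bool):
            problems.append(f"wrong type for field: {field}")

    # TCP ports are limited to the range 1..65535.
    port = info.get("port")
    if isinstance(port, int) and not isinstance(port, bool) and not 1 <= port <= 65535:
        problems.append("port out of range")
    return problems


print(validate_service_info({"name": "playlist-service-1", "host": "10.0.0.5", "port": 6000}))
print(validate_service_info({"name": "playlist-service-1", "port": "6000"}))
```

A create or update endpoint could reject the request with a 400 status code when the returned list is not empty.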


For service CRUD operations, the ServiceRegistry has five functions (remember that we implement two forms of read requests) and one main field, services: a simple dictionary that stores the services’ information. It is set up in the constructor as follows:

import threading
import time


class ServiceRegistry:
    def __init__(self):
        # The registered services, keyed by service name.
        self.services = {}
        self.heartbeats = {}
        self.heartbeats_lock = threading.Lock()
        self.time_threashold = 30

The heartbeats, heartbeats_lock, and time_threashold are used for heartbeat requests management and will be covered later.


The create operation takes the request body and if the service described by the request body doesn’t exist then it is added, else an error message is returned as follows:

def create(self, request_body : dict):
    '''
        This function adds a service in service registries.
    :param request_body: dict
        The request body.
    :returns: dict, int
        The response returned.
        The status code.
    '''
    # Getting the name of the service.
    name = request_body["name"]

    # Checking the presence of the service.
    if name in self.services:
        return {
            "message" : "Service already registered!"
        }, 208
    else:
        # Adding the service to the service and heartbeat registries.
        self.services[name] = request_body
        with self.heartbeats_lock:
            self.heartbeats[name] = time.time()
        return request_body, 200


As the listing above shows, the status code returned when the service is already registered is 208, named Already Reported. This code comes from the WebDAV extension of HTTP; in a plain REST API, 409 Conflict is a more common way to signal a duplicate registration.

The read-all operation doesn’t require any parameters, so it just returns the dictionary with the services’ information and the status code 200. The read-some function takes the list of requested services: if all of them are present, they are returned as a dictionary; otherwise, the list of missing service names is returned with the 404 error status code.

def read_all(self):
    '''
        This function returns the dictionary of the all registered services.
    :return: dict, int
        The service registry.
        The status code.
    '''
    return self.services, 200

def read_some(self, services_list : list):
    '''
        This function returns the dictionary with the information of the requested services.
    :param services_list: list
        The list containing the names of services requested.
    :return: dict, int
        The dictionary with the service information or the error message.
        The status code.
    '''
    # Getting the missing services in the service registry from the requested ones.
    services_dif = set(services_list).difference(self.services.keys())

    # If there are missing services then the list of missing services is returned.
    if len(services_dif) > 0:
        return {
            "missing_services" : list(services_dif)
        }, 404
    else:
        # If all services are present the information of those services is returned.
        return {
            service_name : self.services[service_name]
            for service_name in services_list
        }, 200


The update function reads the request body and updates the information of the service with it. If the service exists, its updated information is returned with the 200 status code; otherwise, an error message with the 404 status code is returned.

def update(self, request_body : dict):
    '''
        This function updates the information about a service.
    :param request_body: dict
        The body of the request.
    :return: dict, int
        The new values of the service.
        The status code.
    '''
    # Getting the name of the service.
    name = request_body["name"]

    # If the service is present then its information is updated.
    if name in self.services:
        self.services[name].update(request_body)
        return self.services[name], 200
    else:
        # If the service is missing the error message is returned.
        return {
            "message" : "No such service!"
        }, 404


Finally, deleting a service is done by the following function of the ServiceRegistry:

def delete(self, request_body : dict):
    '''
        This function deletes a service from service registry.
    :param request_body: dict
        The body of the request.
    :return: dict, int
        The information of the deleted service.
        The status code.
    '''
    # Getting the name of the service from request body.
    name = request_body["name"]
    if name in self.services:
        # Getting the service information and deleting it from service registry.
        service_info = self.services[name]
        del self.services[name]
        
        
        # Deleting the service from heartbeats.
        with self.heartbeats_lock:
            del self.heartbeats[name]
        return service_info, 200
    else:
        # Returning the error message if the requested service isn't present in registry.
        return {
            "message" : "No such service!"
        }, 404


All these functions are called in the Service Discovery endpoints listed below:

# Importing all needed modules.
from flask import Flask, request
from service_registry import ServiceRegistry
import threading

# Creating the Flask application.
app = Flask(__name__)

# Creation of the Service registry.
service_registry = ServiceRegistry()

@app.route("/service", methods=["POST"])
def create():
    '''
        This function processes the registration requests.
    '''
    request_body = request.json
    response, status_code = service_registry.create(request_body)
    return response, status_code

@app.route("/service", methods=["GET"])
def read():
    '''
        This function processes the requests of getting information about services.
    '''
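    # silent=True returns None instead of raising an error when the request has no JSON body.
    request_body = request.get_json(silent=True)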
    if request_body is not None:
        response, status_code = service_registry.read_some(request_body["services"])
    else:
        response, status_code = service_registry.read_all()
    return response, status_code

@app.route("/service", methods=["PUT"])
def update():
    '''
        This function processes the update requests.
    '''
    request_body = request.json
    response, status_code = service_registry.update(request_body)
    return response, status_code

@app.route("/service", methods=["DELETE"])
def delete():
    '''
        This function processes the delete requests.
    '''
    request_body = request.json
    response, status_code = service_registry.delete(request_body)
    return response, status_code

# Running the main service.
app.run()
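To see how another service might talk to these endpoints, the sketch below builds the four requests with the standard library. It rests on assumptions: the address is Flask's development-server default, `build_request` is a hypothetical helper, and the host and port values are placeholders. The requests are only constructed here, not sent.

```python
import json
import urllib.request
from typing import Optional

# Assumed address of the Service Discovery; Flask's development server
# listens on 127.0.0.1:5000 by default.
DISCOVERY_URL = "http://127.0.0.1:5000"


def build_request(method: str, path: str,
                  body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a JSON request for the Service Discovery."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(DISCOVERY_URL + path, data=data,
                                  headers=headers, method=method)


service_info = {"name": "playlist-service-1", "host": "10.0.0.5", "port": 6000}

# One request per CRUD operation, all against the same /service path.
register = build_request("POST", "/service", service_info)
lookup = build_request("GET", "/service", {"services": ["playlist-service-1"]})
move = build_request("PUT", "/service", {"name": "playlist-service-1", "port": 6001})
unregister = build_request("DELETE", "/service", {"name": "playlist-service-1"})

# Sending is a single call once the Service Discovery is running, for example:
# with urllib.request.urlopen(register) as response:
#     print(response.status, json.load(response))
```

Note that the lookup sends a JSON body with a GET request because that is what the read endpoint above expects; some HTTP clients and proxies drop GET bodies, so passing the names as query parameters would be a possible alternative design.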

Heartbeat endpoint implementation.

As said before, after registering and getting other services’ information, a service starts to send heartbeat requests to the Service Discovery; this is what lets the Service Discovery find services that aren’t working anymore. After each received request, the Service Discovery updates the last heartbeat timestamp of the service. The Service Discovery also runs a separate thread that, once every t seconds, checks the time passed since each service’s last heartbeat request. If the passed time is greater than the allowed threshold, the service is declared dead, and, depending on the implementation, different actions are taken.


First, the function for adding a heartbeat in the ServiceRegistry is shown below:

def add_heartbeat(self, service_name : str):
    '''
        This function updates the heartbeat timestamp for a service
    :param service_name: str
        The name of service sending the heartbeat request.
    :return: dict, int
        The response message.
        The status code.
    '''
    with self.heartbeats_lock:
        # Checking if the service is registered.
        if service_name not in self.heartbeats:
            # Returning the error message and 404 status code.
            return {
                "message" : "Not registered service!"
            }, 404

        # Updating the last heartbeat timestamp.
        self.heartbeats[service_name] = time.time()

    # Returning the success message and the 200 status code.
    return {
        "message" : "Heartbeat received!"
    }, 200


This function is called when the heartbeat endpoint (listed below) is called:

@app.route("/heartbeat/<service>", methods=["POST"])
def heartbeat(service):
    '''
        This function processes the heartbeat requests.
    '''
    response, status_code = service_registry.add_heartbeat(service)
    return response, status_code
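On the other side of this endpoint, each registered service needs a small loop that sends the heartbeat at a fixed interval. The following is a sketch under assumptions rather than part of the original code: `send_heartbeat` and `start_heartbeats` are hypothetical names, and the address is Flask's development-server default.

```python
import threading
import urllib.request

# Assumed address of the Service Discovery (Flask's development server default).
DISCOVERY_URL = "http://127.0.0.1:5000"


def send_heartbeat(service_name: str) -> None:
    """POST an empty heartbeat request for the given service name."""
    request = urllib.request.Request(
        f"{DISCOVERY_URL}/heartbeat/{service_name}", method="POST")
    urllib.request.urlopen(request, timeout=5).close()


def start_heartbeats(send, interval: float) -> threading.Event:
    """Call `send()` every `interval` seconds on a background thread.

    Returns an Event; setting it stops the loop.
    """
    stop = threading.Event()

    def loop():
        # Event.wait returns False on timeout, so the body runs once per
        # interval until stop.set() is called.
        while not stop.wait(interval):
            try:
                send()
            except Exception as error:
                # A failed heartbeat should not crash the service itself.
                print(f"Heartbeat failed: {error}")

    threading.Thread(target=loop, daemon=True).start()
    return stop


# Hypothetical usage inside a service, once it has registered:
# stop = start_heartbeats(lambda: send_heartbeat("playlist-service-1"), interval=10)
# ...
# stop.set()  # on shutdown
```

Taking `send` as a parameter keeps the timing loop independent of the HTTP call, so it can be exercised without a running Service Discovery.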


When a service sends a heartbeat request, it should put its name in the URL path, so that its last heartbeat timestamp is updated. Finally, below is the code of the function that checks the heartbeats; it runs in a separate thread and just prints the name of each service that it thinks is not active.

def check_heartbeats(self):
    '''
        This function checks the last heartbeats of the registered services every 10 seconds
        and prints the one that didn't send a heartbeat request more than 30 seconds.
    '''
    while True:
        with self.heartbeats_lock:
            # Iterating through services and checking how long ago the service sent the last
            # heartbeat request.
            for service in self.heartbeats:
                if time.time() - self.heartbeats[service] > self.time_threashold:
                    print(f"Service - {service} seems to be dead!")
        time.sleep(10)


This function runs on a separate thread that is started in the main module.

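# Starting up the process of checking heartbeats. A daemon thread does not block
# the interpreter from exiting when the main service stops.
threading.Thread(target=service_registry.check_heartbeats, daemon=True).start()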

# Running the main service.
app.run()
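The checker in this article only prints the names of silent services. If the chosen policy is to delete them instead, one possible approach is sketched below. `evict_dead_services` is a hypothetical function that is not in the original code, and it assumes the same lock guards both dictionaries while it runs.

```python
import threading
import time
from typing import Optional


def evict_dead_services(services: dict, heartbeats: dict, lock: threading.Lock,
                        threshold: float, now: Optional[float] = None) -> list:
    """Remove services whose last heartbeat is older than `threshold` seconds.

    Returns the names of the removed services.
    """
    if now is None:
        now = time.time()
    with lock:
        # Collect the names first: deleting from a dict while iterating over
        # it raises "RuntimeError: dictionary changed size during iteration".
        dead = [name for name, last in heartbeats.items() if now - last > threshold]
        for name in dead:
            del heartbeats[name]
            services.pop(name, None)
    return dead


# Example with fixed timestamps: "a" was last heard 40 seconds ago, "b" 15.
services = {"a": {"name": "a"}, "b": {"name": "b"}}
heartbeats = {"a": 100.0, "b": 125.0}
print(evict_dead_services(services, heartbeats, threading.Lock(), threshold=30, now=140.0))
```

Inside the registry, the loop in check_heartbeats could call such a function with self.services, self.heartbeats, self.heartbeats_lock, and self.time_threashold instead of printing.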

Conclusion.

Distributed systems is a vast field in Computer Science and a technology worth knowing and understanding. Service Discovery is a special service in such a system that keeps and makes available information about other services in the system. It also keeps track of the availability of those services. And it is the service to start with when developing a distributed system, because each service begins its activity by registering with it.


The full commented code of the service can be found here: GitHub link


Written by Păpăluță Vasile

LinkedIn: https://www.linkedin.com/in/vasile-păpăluță/

Instagram: https://www.instagram.com/science_kot/

GitHub: https://github.com/ScienceKot


Also published here.