Building Microservices With Nameko

by shroom, June 9th, 2021

In this chapter, we will learn about Nameko and its capabilities as a microservice framework.

What is Nameko?

Nameko is a framework for building lightweight, highly scalable and fault-tolerant services in Python following a micro-service architecture design.

It comes with built-in support for:

  1. RPC over AMQP
  2. Asynchronous events (pub-sub) over AMQP

Why Nameko?

Nameko enables you to build a service that can respond to RPC messages, dispatch events on certain actions, and listen to events from other services. It could also have HTTP interfaces for clients that can’t speak AMQP.

Let's create a basic Nameko service and experiment with its capabilities.

Setup Basic Environment

First, you will need Docker installed. We will use Python 3, so make sure you have it installed as well.

To run Nameko, we need RabbitMQ. It will be responsible for the communication between our Nameko services.

Install

$ pip install nameko

Start A RabbitMQ Container

$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3

Hello World!!

A Nameko service is just a Python class. The class encapsulates the logic in its methods and declares any dependencies as attributes.

Go ahead and create a file called Service.py with the following content:

from nameko.rpc import rpc

class Service:
	name = "service"
	
	@rpc
	def receive_event(self, event):
		return f"Event Received: {event}"

Let’s go ahead and run our example. If you have RabbitMQ running, simply run:

$ nameko run Service

Nameko implements automatic service discovery: when you call an RPC method over AMQP, Nameko locates the corresponding service through RabbitMQ on its own.

To test our service, run the below command in another terminal:

$ nameko shell

This will create an interactive shell which will connect to that same RabbitMQ instance.

The Nameko shell provides a special object called n in its namespace. This object allows you to make RPC calls to a Nameko service. To make an RPC call to our service, run:

>>> n.rpc.service.receive_event(event={'message': 'Hello World!!'})

When an RPC entrypoint is invoked, a Nameko worker is created. A worker is just a stateless instance of the service class, which makes it inherently thread-safe. The maximum number of workers by default for a service is set to 10.

Read more about Nameko Workers here.

If the maximum number of workers is set to 1, then only one Nameko worker will execute at a time, i.e. the service will behave like a regular queue.

Communicate Between 2 Nameko Services

In order to communicate from one Nameko service to another and vice versa, Nameko provides an RpcProxy construct. Here's how you use it:

from nameko.rpc import rpc, RpcProxy

class SenderService:
	name = "sender_service"
	receiver_service_proxy = RpcProxy("receiver_service")

	@rpc
	def send_event(self, event):
		return self.receiver_service_proxy.receive_event(event)

class ReceiverService:
	name = "receiver_service"
	
	@rpc
	def receive_event(self, event):
		return f"Event Received: {event}"

Communicate Between A Nameko & A Non-Nameko Service

There will be scenarios where we need to call a Nameko service from something that isn’t a Nameko service, such as an API service or a cron job. Here's how you do it:

from nameko.standalone.rpc import ClusterRpcProxy

AMQP_URI = "pyamqp://user:password@hostname"

config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})
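
Real credentials often contain characters such as @ or / that would break the URI above. The following is a minimal sketch of a helper that builds the config dict with percent-encoded credentials. The helper name build_nameko_config, its parameters, and the example password are made up for illustration, and the default port is simply the one exposed by the RabbitMQ container started earlier.

```python
from urllib.parse import quote


def build_nameko_config(user, password, hostname, port=5672):
    """Return a config dict with an AMQP_URI key (hypothetical helper)."""
    # Percent-encode the credentials so characters like '@' or '/'
    # cannot be mistaken for URI delimiters.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return {"AMQP_URI": f"pyamqp://{credentials}@{hostname}:{port}"}


config = build_nameko_config("user", "p@ss/word", "localhost")
print(config["AMQP_URI"])  # pyamqp://user:p%40ss%2Fword@localhost:5672
```

The resulting dict has the same shape as the config passed to ClusterRpcProxy above.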

Concurrency

Nameko is built on top of the eventlet library, which provides concurrency via “greenthreads”.

Greenthreads, unlike OS threads, cooperatively yield to each other instead of being preemptively scheduled by the OS. This behaviour is advantageous when a service is I/O heavy.

A greenthread yields control only when it is waiting on I/O, giving another greenthread a chance to execute. This allows the service to use shared data structures without locks or other synchronisation mechanisms.
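
To make the idea of cooperative yielding concrete, here is a toy model built from plain Python generators. It is not eventlet and not how Nameko is implemented; the worker and run_round_robin names are invented for this sketch, and the explicit yield stands in for the point where a greenthread would wait on I/O.

```python
from collections import deque

counter = {"value": 0}  # shared state, deliberately not protected by a lock


def worker(name, steps):
    """A toy task that updates shared state, then yields at an 'I/O' point."""
    for _ in range(steps):
        current = counter["value"]
        counter["value"] = current + 1  # read-modify-write is never interrupted
        yield name  # hand control back, like a greenthread waiting on I/O


def run_round_robin(tasks):
    """Run generator tasks one step at a time until all are exhausted."""
    pending = deque(tasks)
    while pending:
        task = pending.popleft()
        try:
            next(task)
            pending.append(task)  # not finished yet, schedule it again
        except StopIteration:
            pass  # task is done, drop it


run_round_robin([worker("a", 1000), worker("b", 1000), worker("c", 1000)])
print(counter["value"])  # 3000
```

Because a task can only be switched out at its yield, the increment is never interleaved with another task, so no update is lost even without a lock.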

Let's experiment with Nameko concurrency in practice by modifying the above code:

from time import sleep
from nameko.rpc import rpc

class Service:
	name = "service"
	
	@rpc
	def receive_event(self, event):
		sleep(5)
		return f"Event Received: {event}"

We are using sleep from the time module, which is a blocking call. However, when we run our services using nameko run, Nameko automatically monkey-patches blocking calls such as sleep(5) so that they yield to other greenthreads instead of blocking the whole process.

The response time of a single RPC call to our service will be 5 seconds. Now, if we make 10 calls in one go to the same RPC, how long will it take to get the response of all 10 calls?

Let's run the following code in a nameko shell:

import time

def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        # call_async sends the request without blocking and returns a reply object
        response = n.rpc.service.receive_event.call_async({'message': f'Worker {i+1}'})
        responses.append(response)

    for response in responses:
        # result() blocks until this particular reply has arrived
        print(response.result())

    end_time = time.perf_counter()

    print(f'Total Time: {round(end_time-start_time, 3)}')

time_concurrent_invocations()

This example runs in roughly five seconds. Each worker is blocked while it waits for its sleep call to finish, but that does not stop other workers from starting: this is implicit yielding in action.

If you change num_concurrent_calls to 20 in the above snippet, the execution will finish in about 10 seconds. With the default limit of 10 workers, the second batch of 10 calls has to wait for the first batch to finish.
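
The timings above follow from simple batch arithmetic. The sketch below is a back-of-the-envelope estimate, not a Nameko API: the function name estimated_total_seconds is made up, and it ignores network and broker overhead.

```python
import math


def estimated_total_seconds(num_calls, max_workers=10, seconds_per_call=5):
    """Estimate wall-clock time when calls run in batches of max_workers."""
    batches = math.ceil(num_calls / max_workers)
    return batches * seconds_per_call


print(estimated_total_seconds(10))  # 5
print(estimated_total_seconds(20))  # 10
print(estimated_total_seconds(25))  # 15
```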

Async Pub-Sub

Let's suppose we now have to do an asynchronous task, like sending a notification or uploading a file to the cloud.

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class MessageService:
    
    name = "message_service"

    dispatch = EventDispatcher()

    
    def time_consuming_function(self, payload):
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):
        
        if event.get('payload'):
            self.time_consuming_function(event['payload'])

        print(f"Message Received: {event['message']}")



class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        pass

When receive_message processes an event with a payload, it calls time_consuming_function, which uses the EventDispatcher to publish a heavy_payload_event. The payload is then processed asynchronously by time_consuming_event_handler in a separate greenthread. The caller thread does not wait for the event handler to return a response, so it can finish its execution faster and accept further requests.
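
The fire-and-forget behaviour can be illustrated without a broker. The following is a standard-library toy model, not Nameko code: the queue stands in for RabbitMQ, the thread stands in for the subscribing service, and the dispatch and subscriber functions as well as the example payload are assumptions made for this sketch.

```python
import queue
import threading

events = queue.Queue()  # stands in for the broker
handled = []            # events processed by the subscriber


def dispatch(event_type, payload):
    """Publish an event and return immediately, without waiting for handlers."""
    events.put((event_type, payload))


def subscriber():
    """Consume events until a None sentinel arrives."""
    while True:
        item = events.get()
        if item is None:
            break
        handled.append(item)


thread = threading.Thread(target=subscriber)
thread.start()

dispatch("heavy_payload_event", {"file": "report.pdf"})
print("caller is free to continue")

events.put(None)  # tell the subscriber to stop
thread.join()
print(handled)
```

The caller never receives a return value from the handler, which is the main difference from the RPC calls shown earlier.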

Scalable

We have been using only one server and running one instance of RabbitMQ. In a production environment, you will want to arbitrarily increase the number of nodes running the service that is getting too many calls.

To simulate service scaling, let's revisit our service from the concurrency section. Open another terminal and run the service as before, using nameko run Service. This will start another service instance with the potential to run ten more workers. Now, try running that snippet again with num_concurrent_calls = 20. It should take about five seconds again. When more than one service instance is running, Nameko will round-robin the RPC requests among the available instances.
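
A small model shows why 20 calls fit into a single five-second batch once two instances are running. This is only a sketch of strict round-robin assignment; the distribute function and the instance names are hypothetical, and the real distribution is handled by the broker.

```python
from collections import Counter
from itertools import cycle


def distribute(num_calls, instances):
    """Assign calls to instances in strict round-robin order."""
    assignment = Counter()
    next_instance = cycle(instances)
    for _ in range(num_calls):
        assignment[next(next_instance)] += 1
    return assignment


load = distribute(20, ["instance-1", "instance-2"])
print(dict(load))  # {'instance-1': 10, 'instance-2': 10}
```

Each instance receives 10 calls, which matches its default limit of 10 workers, so no call has to wait for a free worker.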

In fact, you can configure these services so that they run on completely different machines and scale independently. All you need to do is point these services at the same RabbitMQ broker.

Create a config file with the broker URI:

# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/

Run these services on different machines using:

$ nameko run <nameko_service> --config config.yaml

Fault Tolerant

Nameko is highly robust and fault tolerant: it continues operating properly when one or more nodes in the service cluster fail, as long as at least one healthy node remains.

Try running 3 instances of the Service and execute the test snippet with num_concurrent_calls = 50. As soon as you execute the test snippet, kill one or two instances of the Service. The missed messages will be re-routed to the healthy node(s), thus avoiding message loss.

This behaviour is due to the fact that messages are ack’d only after worker execution completes successfully. If the connection is lost after delivery but before acknowledgement, RabbitMQ will reclaim and redeliver the message.
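
The ack-after-completion rule can be sketched with an in-memory stand-in for the broker. ToyBroker and its methods are invented for this illustration and do not correspond to any RabbitMQ or Nameko API; the sketch only models a message being reclaimed when a consumer disappears before acknowledging it.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker that redelivers unacked messages."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}  # consumer name -> message currently in flight

    def deliver(self, consumer):
        message = self.ready.popleft()
        self.unacked[consumer] = message
        return message

    def ack(self, consumer):
        # Called only after the worker has finished successfully.
        del self.unacked[consumer]

    def connection_lost(self, consumer):
        # Reclaim the in-flight message and put it back at the front.
        message = self.unacked.pop(consumer, None)
        if message is not None:
            self.ready.appendleft(message)


broker = ToyBroker(["m1", "m2"])
processed = []

broker.deliver("node-a")          # node-a receives m1 ...
broker.connection_lost("node-a")  # ... and dies before acking it

while broker.ready:               # node-b drains the queue
    processed.append(broker.deliver("node-b"))
    broker.ack("node-b")

print(processed)  # ['m1', 'm2']
```

The message that node-a never acknowledged ends up being processed by node-b, so nothing is lost.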

What Happens If The RabbitMQ Server Dies And There Are Messages Left In The Queue?

Nameko sets delivery_mode=PERSISTENT by default for the messages it publishes for RPC over AMQP. This tells RabbitMQ to save the messages to disk. However, there is a short time window in which RabbitMQ has accepted a message but hasn't saved it yet, so persistence alone is not a strong guarantee. To close this gap, Nameko uses publisher confirms by default. Confirms have a performance penalty but guarantee that messages aren't lost.

Conclusion

Nameko is designed to help you build systems using micro-services and scale from a single instance of a single service, to a cluster with many instances of many different services.

To learn more about Nameko, check out the Nameko Documentation and join the Nameko Discourse.

Photo Credit: Krzysztof Niewolny