In this chapter, we will learn about Nameko and its capabilities as a microservice framework.
Nameko is a framework for building lightweight, highly scalable and fault-tolerant services in Python following a micro-service architecture design.
It comes with built-in support for RPC over AMQP, asynchronous events over AMQP, and simple HTTP endpoints.
Nameko enables you to build a service that can respond to RPC messages, dispatch events on certain actions, and listen to events from other services. It could also have HTTP interfaces for clients that can’t speak AMQP.
Let's create a basic Nameko service and experiment with its capabilities.
First, you will need Docker installed. We will use Python 3, so make sure you have it installed as well.
To run Nameko, we need RabbitMQ. It will be responsible for the communication between our Nameko services.
$ pip install nameko
$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
A Nameko service is just a Python class. The class encapsulates the logic in its methods and declares any dependencies as attributes.
Go ahead and create a file called Service.py with the following content:
from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
Let’s go ahead and run our example. If you have RabbitMQ running, simply run:
$ nameko run Service
Nameko implements automatic service discovery: when you call an RPC method over AMQP, you only name the target service, and the request is routed through RabbitMQ to a running instance of that service.
To test our service, run the below command in another terminal:
$ nameko shell
This will create an interactive shell which will connect to that same RabbitMQ instance.
The Nameko shell provides a special object called n, added to the namespace. This object allows you to make RPC calls to a Nameko service. To make an RPC call to our service, run:
>>> n.rpc.service.receive_event(event={'message': 'Hello World!!'})
When an RPC entrypoint is invoked, a Nameko worker is created. A worker is just a stateless instance of the service class, which makes it inherently thread-safe. The maximum number of workers by default for a service is set to 10.
The Nameko documentation describes workers in more detail.
If the maximum number of workers is set to 1, only one Nameko worker executes at a time, i.e. the service behaves like a regular queue.
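To make the "stateless worker" idea concrete, here is a minimal plain-Python sketch. It is only a simplified model of creating a fresh instance per call, not Nameko's actual internals; CounterService and invoke are hypothetical names introduced for illustration.

```python
# Simplified model of "one fresh worker per call"; not Nameko's real machinery.


class CounterService:
    """Hypothetical service that tries to keep state on self between calls."""

    def __init__(self):
        self.calls_seen = 0

    def receive_event(self, event):
        self.calls_seen += 1
        return f"Event Received: {event} (calls seen by this worker: {self.calls_seen})"


def invoke(service_cls, method_name, *args, **kwargs):
    """Create a new instance for every call, run the method, then discard it."""
    worker = service_cls()
    return getattr(worker, method_name)(*args, **kwargs)


first = invoke(CounterService, "receive_event", "a")
second = invoke(CounterService, "receive_event", "b")
print(first)
print(second)
```

Both calls report a count of 1, because nothing stored on self survives from one worker to the next. State that must persist belongs in a dependency or an external store.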
In order to communicate from one Nameko service to another and vice-versa, Nameko provides an RpcProxy construct. Here's how you use it:
from nameko.rpc import rpc, RpcProxy

class SenderService:
    name = "sender_service"
    receiver_service_proxy = RpcProxy("receiver_service")

    @rpc
    def send_event(self, event):
        return self.receiver_service_proxy.receive_event({'message': 'Hello World!!'})

class ReceiverService:
    name = "receiver_service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
There will be scenarios where we need to call a Nameko service from something that isn’t a Nameko service, such as an API service or a cron job. Here's how you do it:
from nameko.standalone.rpc import ClusterRpcProxy

AMQP_URI = "pyamqp://user:password@hostname"
config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})
Nameko is built on top of the eventlet library, which provides concurrency via “greenthreads”.
Greenthreads, unlike OS threads, cooperatively yield to each other instead of being preemptively scheduled by the OS. This behaviour is advantageous when a service is I/O heavy.
A greenthread yields control only when it is waiting on I/O, which gives another greenthread a chance to execute. Because switches happen only at these points, the service can use shared data structures without locks or other synchronisation mechanisms.
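The cooperative behaviour described above can be sketched with plain Python generators. This is a toy model, not eventlet itself: a bare yield stands in for "waiting on I/O", and worker and run_cooperatively are hypothetical names.

```python
from collections import deque

shared_log = []  # shared data structure; no lock is needed in this model


def worker(name, steps):
    """A cooperative task that runs until it reaches a yield (standing in for I/O)."""
    for step in range(steps):
        shared_log.append(f"{name}:{step}")  # cannot be interrupted mid-append
        yield  # pretend to start I/O and hand control to another task


def run_cooperatively(tasks):
    """Round-robin scheduler: resume each task until its next yield."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
            ready.append(task)  # the task yielded, so queue it again
        except StopIteration:
            pass  # the task finished


run_cooperatively([worker("A", 2), worker("B", 2)])
print(shared_log)  # ['A:0', 'B:0', 'A:1', 'B:1']
```

Control changes hands only at the yield points, so the interleaving is predictable and the append to the shared list never needs a lock.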
Let's experiment with Nameko concurrency in practice by modifying the above code:
from time import sleep
from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        sleep(5)
        return f"Event Received: {event}"
We are using sleep from the time module, which is a blocking call. However, when we run our services using nameko run, Nameko automatically monkey-patches blocking calls such as sleep(5) so that they become non-blocking and yield to other greenthreads.
The response time of a single RPC call to our service will be 5 seconds. Now, if we make 10 calls in one go to the same RPC, how long will it take to get the response of all 10 calls?
Let's run the following code in a nameko shell:
import time

def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        # call_async returns a reply object immediately instead of blocking
        response = n.rpc.service.receive_event.call_async({'message': f'Worker {i+1}'})
        responses.append(response)
    for response in responses:
        # result() blocks until the reply for that call has arrived
        print(response.result())
    end_time = time.perf_counter()
    print(f'Total Time: {round(end_time-start_time, 3)}')

time_concurrent_invocations()
This example runs in about five seconds. Each worker is blocked waiting for its sleep call to finish, but that does not stop another worker from starting: implicit yielding in action.
If you change num_concurrent_calls = 20 in the above snippet, the execution will finish in about 10 seconds, because the default limit of 10 workers means the calls are processed in two batches.
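The timings above follow from simple batch arithmetic. The sketch below is a rough estimate that ignores network and scheduling overhead; estimated_total_seconds is a hypothetical helper, with defaults taken from the example (10 workers, 5-second calls).

```python
import math


def estimated_total_seconds(num_calls, max_workers=10, call_seconds=5):
    """Calls run in batches of at most max_workers; each batch takes call_seconds."""
    batches = math.ceil(num_calls / max_workers)
    return batches * call_seconds


print(estimated_total_seconds(10))  # 5
print(estimated_total_seconds(20))  # 10
```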
Suppose we now have to do an asynchronous task, such as sending a notification or uploading a file to the cloud.
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class MessageService:
    name = "message_service"
    dispatch = EventDispatcher()

    def time_consuming_function(self, payload):
        # Publish the event and return immediately; the handler runs elsewhere
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):
        # .get avoids a KeyError when the event carries no payload
        if event.get('payload'):
            self.time_consuming_function(event['payload'])
        print(f"Message Received: {event['message']}")

class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        pass
When receive_message processes an event with a payload, it calls time_consuming_function, which uses the EventDispatcher to process the payload asynchronously by invoking time_consuming_event_handler in a separate greenthread. The caller thread does not wait for the event handler to return a response, so it can finish its execution sooner and accept further requests.
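The fire-and-forget pattern can be modelled with the standard library alone. In this sketch a queue.Queue stands in for the broker and a thread stands in for the handling service. It is an illustration of the idea only, and every name in it is hypothetical.

```python
import queue
import threading

events = queue.Queue()  # stands in for the message broker
handled = []
handler_done = threading.Event()


def dispatch(event_type, payload):
    """Publish the event and return immediately; nobody waits for the handler."""
    events.put((event_type, payload))


def receive_message(event):
    if event.get("payload"):
        dispatch("heavy_payload_event", event["payload"])
    return f"Message Received: {event['message']}"


def handler_loop():
    """Consumer side: pick up one event and process it."""
    event_type, payload = events.get()
    handled.append((event_type, payload))
    handler_done.set()


# The caller gets its reply before any handler has even started.
reply = receive_message({"message": "hi", "payload": "big file"})
handled_before = list(handled)

consumer = threading.Thread(target=handler_loop)
consumer.start()
handler_done.wait(timeout=5)
consumer.join()

print(reply)
print(handled_before, handled)
```

The reply is available while handled is still empty, which is the property that lets the caller accept further requests while the slow work happens elsewhere.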
We have been using only one server and running one instance of RabbitMQ. In a production environment, you will want to arbitrarily increase the number of nodes running the service that is getting too many calls.
To simulate service scaling, let's revisit our service from the concurrency section. Open another terminal and run the service as before, using
$ nameko run Service
This will start another service instance with the potential to run ten more workers. Now, try running that snippet again with num_concurrent_calls = 20. It should now take five seconds again to run. When more than one service instance is running, Nameko will round-robin the RPC requests among the available instances.
In fact, you can configure these services so that they run on completely different machines and scale independently. All you need to do is point them at the same RabbitMQ broker.
Create a config file with the broker URI:
# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/
Run these services on different machines using:
$ nameko run <nameko_service> --config config.yaml
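The effect of round-robin distribution on the earlier timing experiment can be sketched as follows. This is a simplified model that assumes perfectly even round-robin and ignores overhead; the instance names and both helper functions are hypothetical.

```python
import math
from itertools import cycle


def distribute_round_robin(num_calls, instances):
    """Assign calls to instances in turn and count how many each one receives."""
    counts = {name: 0 for name in instances}
    for _, name in zip(range(num_calls), cycle(instances)):
        counts[name] += 1
    return counts


def estimated_total_seconds(counts, max_workers=10, call_seconds=5):
    """The busiest instance determines the total time."""
    busiest_batches = max(math.ceil(c / max_workers) for c in counts.values())
    return busiest_batches * call_seconds


counts = distribute_round_robin(20, ["instance-1", "instance-2"])
print(counts)  # {'instance-1': 10, 'instance-2': 10}
print(estimated_total_seconds(counts))  # 5
```

With two instances, each receives 10 of the 20 calls, so each needs a single batch and the total drops back to about five seconds.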
Nameko is highly robust and fault tolerant: it continues operating properly when one or more nodes in the service cluster fail, as long as at least one healthy node remains functioning.
Try running three instances of the Service and execute the test snippet with num_concurrent_calls = 50. As soon as you execute the test snippet, kill one or two instances of the Service. The missed messages will be re-routed to the healthy node(s), thus avoiding message loss.
This behaviour is due to the fact that messages are ack’d only after worker execution completes successfully. If the connection is lost after delivery but before acknowledgement, RabbitMQ will reclaim and redeliver the message.
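The ack-after-success rule can be illustrated with a small in-memory simulation. It is a toy model of the redelivery idea, not RabbitMQ's or Nameko's implementation; InstanceCrashed, make_handler and consume are hypothetical names.

```python
from collections import deque


class InstanceCrashed(Exception):
    """Raised by a hypothetical instance that dies before acknowledging."""


def make_handler(name, crash_on_call=None):
    """Build a handler that optionally crashes on its Nth call."""
    calls = {"n": 0}

    def handler(message):
        calls["n"] += 1
        if calls["n"] == crash_on_call:
            raise InstanceCrashed(name)
        return f"{name} handled {message}"

    return handler


def consume(messages, handlers):
    """Deliver messages in turn; a message only counts as done after success (ack)."""
    pending = deque(messages)
    healthy = list(handlers)
    processed = []
    while pending and healthy:
        message = pending.popleft()
        try:
            processed.append(healthy[0](message))  # success, so the message is acked
            healthy.append(healthy.pop(0))  # rotate to the next instance
        except InstanceCrashed:
            healthy.pop(0)  # the crashed instance leaves the pool
            pending.appendleft(message)  # no ack, so the message is redelivered
    return processed, list(pending)


handlers = [make_handler("instance-1", crash_on_call=2), make_handler("instance-2")]
processed, undelivered = consume(["m1", "m2", "m3", "m4"], handlers)
print(processed)
print(undelivered)
```

Here instance-1 crashes while handling m3. Because m3 was never acknowledged, it goes back to the front of the queue and instance-2 handles it, so all four messages are processed.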
Nameko sets delivery_mode=PERSISTENT by default for the queues it creates for RPC over AMQP. This tells RabbitMQ to save the messages to disk. However, there is a short time window in which RabbitMQ has accepted a message but hasn't saved it yet, which means the persistence guarantees are not strong. To address this, Nameko uses publisher confirms by default. Confirms have a performance penalty but guarantee that messages aren't lost.
Nameko is designed to help you build systems using micro-services and scale from a single instance of a single service, to a cluster with many instances of many different services.
To learn more about Nameko, check out the Nameko documentation and join the Nameko Discourse.
Photo Credit: Krzysztof Niewolny