Creating a Distributed Task Queue in Python With Celery + RabbitMQ + Redis

by Valon Januzaj, February 15th, 2023

Too Long; Didn't Read

A distributed task queue allows you to offload work to another process. It gives you the ability to execute tasks in the background while the application continues to resolve other tasks. In this article, we are going to use Celery, RabbitMQ, and Redis to build a distributed Task queue.

In this article, we are going to use Celery, RabbitMQ, and Redis to build a distributed Task queue. But what is a distributed task queue, and why would you build one?


A distributed task queue allows you to offload work to another process, to be handled asynchronously (once you push the work onto the queue, you don’t wait) and in parallel (you can use other cores to process the work).


In short, it gives you the ability to execute tasks in the background while the application continues to serve other requests.

A system using workers with RabbitMQ and Redis

Use Cases of Task Queues

The most basic example is sending emails after a user registers. You don’t know how long sending the email will take: it might be a millisecond, it might be much longer, and sometimes the email is not sent at all, because an external provider handles delivery and you have no guarantee the task will succeed. Now that you have a simple idea of how you can benefit from task queues, identifying such tasks is as simple as checking whether they belong to one of the following categories:


  • Third-party tasks — The web app must serve users quickly without waiting for other actions to complete while the page loads, e.g., sending an email or notification or propagating updates to internal tools (such as gathering data for A/B testing or system logging).
  • Long-running jobs — Jobs that are expensive in resources, where users need to wait while they compute their results, e.g., complex workflow execution (DAG workflows), graph generation, Map-Reduce like tasks, and serving of media content (video, audio).
  • Periodic tasks — Jobs that you will schedule to run at a specific time or after an interval, e.g., monthly report generation or a web scraper that runs twice a day.

Setting up the dependencies for Celery

Celery requires a message transport to send and receive messages. Some candidates you can use as a message broker are:

  • RabbitMQ
  • Redis
  • Amazon SQS

For this tutorial, we are going to use RabbitMQ, but you can use any other supported broker you want (e.g., Redis).


It’s also worth mentioning what we are going to use Redis for, since RabbitMQ is already our message transport. When tasks are sent to the broker and then executed by the Celery worker, we want to save their state and to see which tasks have been executed before. For that, you need some kind of data store, and here we are going to use Redis.


For the result store, we also have many candidates:

  • AMQP, Redis
  • Memcached
  • SQLAlchemy, Django ORM
  • Apache Cassandra, Elasticsearch, Riak, etc.


To set up these services, we are going to use Docker: it’s easy to set up, it gives you an isolated environment, and you can reproduce the same environment anywhere once you have the configuration (a Dockerfile or docker-compose file).

Project setup

Let’s start a new Python project from scratch. First, let’s create a new directory, create all the files necessary for the project, and then initialize the virtual environment.


mkdir celery-python && cd $_
touch __init__.py
touch tasks.py
touch docker-compose.yaml
touch requirements.txt

# create & activate the virtualenv

python -m venv env
source env/bin/activate


Now let’s install the project requirements. For this project, we are just going to need celery and Redis.


pip install celery redis 


Now it’s time to configure docker-compose to run RabbitMQ and Redis. In docker-compose.yaml, paste the following YAML configuration.
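A minimal configuration along these lines will do; the port mappings and the RabbitMQ credentials here are illustrative defaults, so adjust them to your needs:

```yaml
version: "3.8"

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP protocol port
      - "15672:15672"  # management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
```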


Here we simply start up two services: we define the image key to point to an image on Docker Hub, map the ports as host:container, and add environment variables. To see which environment variables you can use with an image, go to the corresponding image page on Docker Hub and read its documentation. For example, you can check how to use the RabbitMQ image here.


Now, let’s initialize the Celery app to use RabbitMQ as the message transport and Redis as the result store. In tasks.py, let’s go ahead and paste the following code:


I tried to keep the code as minimal as possible so you can focus on the purpose of this tutorial. As you can see, we have defined the URLs for RabbitMQ and Redis, and then we simply initialize the Celery app using those configurations. The first parameter, tasks, is the name of the current module.


Then we have decorated the function say_hello with @app.task, which marks the function as a task; it can later be called using .delay(), which we will see in a bit.


Normally we would have a module celery_app.py that only initializes the Celery application instance, and a separate module tasks.py in which we would define the tasks we want Celery to run.


Build and run services with docker

Now we only need to run the services (RabbitMQ and Redis) with docker. To run the images inside a container we simply run:


docker-compose up -d 


This will take a while if you don’t have these images pulled locally. Then, to verify that the containers are up and running, run:


docker ps


You should see the two services running, along with additional information for each one; if not, check the logs for possible errors. Now let’s start the Celery worker, and then try to run some tasks from the Python interactive shell.


# Starting the Celery worker

celery -A tasks worker -l info --pool=solo


This will run the Celery worker, and the logs should tell you that it has successfully connected to the broker. Now let’s run a task:


# Running celery tasks

python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> say_hello.delay("Valon")
<AsyncResult: 55ad96a9-f7ea-44f4-9a47-e15b90d6d8a2>


We can see that we called the function using .delay() and passed the name argument. This method is actually a star-argument shortcut to another method called apply_async(), which also accepts extra options such as countdown. We get an AsyncResult back, which is a handle to the task that was passed to the broker; the task will then be consumed and finished in the background by the Celery worker.


If you look at your worker now, you will see in its logs that it received the task, and after 5 seconds it will report that the task finished successfully.


Now let’s run the same task, but this time bring the result store into play. In the Python shell, let’s store the result in a variable and then inspect its properties.
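A session along these lines shows what that looks like, assuming the worker and both services are still running (the task id will differ on your machine):

```python
>>> from tasks import say_hello
>>> result = say_hello.delay("Valon")
>>> result.id            # unique task id assigned by Celery
'55ad96a9-f7ea-44f4-9a47-e15b90d6d8a2'
>>> result.status        # 'PENDING' while the worker is still busy
'PENDING'
>>> result.ready()       # has the task finished?
False
>>> # ...about 5 seconds later...
>>> result.status
'SUCCESS'
>>> result.get()         # the task's return value, fetched from Redis
'Hello Valon'
```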


If we didn’t have the backend (Redis) configured in Celery, we couldn’t access these properties or functions, because by default no state would be stored; since we have it, we can see and retrieve information about our tasks. If you want to dig deeper, you can access your Redis database with a tool like TablePlus, or you can set up Flower to monitor Celery through Redis and RabbitMQ.


If you inspect the database, you will see that all the task results are stored in Redis.


Wrapping up

In this article, we have set up a Python application with Celery, RabbitMQ, and Redis from scratch. The purpose was to show you what a task queue is, what we can gain from it, and how to implement one. The example tasks are just for demonstration, but you can use the same configuration as I did here, adding tasks in the tasks module and the configuration in celery_app.py. See the docs here.


I highly encourage you to use Celery in your application: it is quite useful when you have operations that take a long time, when you need to schedule tasks, and so on.


If you read the article and found it useful, don’t forget to like it and share it with your friends & community. If you have any questions, feel free to reach out to me. Connect with me on 👉 LinkedIn, Github


Also published here.