In this post, I’ll show how to work with multiple queues, scheduled tasks, and retries when something goes wrong.
If you don’t know how to use Celery, read this post first: https://fernandofreitasalves.com/executing-time-consuming-tasks-asynchronously-with-django-and-celery/
Let’s say your task depends on an external API or connects to another web service and, for whatever reason, it raises a ConnectionError. It’s plausible that after a few seconds the API, web service, or whatever you are using will be back on track and working again. In these cases, you may want to catch the exception and retry your task.
from celery import shared_task


@shared_task(bind=True, max_retries=3)  # you can determine the max_retries here
def access_awful_system(self, my_obj_id):
    from core.models import Object
    from requests import ConnectionError

    o = Object.objects.get(pk=my_obj_id)

    # If ConnectionError try again in 180 seconds
    try:
        o.access_awful_system()
    except ConnectionError as exc:
        self.retry(exc=exc, countdown=180)  # the task goes back to the queue
The self.retry call inside the function is what’s interesting here. It’s possible thanks to bind=True on the shared_task decorator, which turns our function access_awful_system into a method of the Task class. That is also why the function has to take self as its first argument.
Another nice way to retry a function is using exponential backoff:
self.retry(exc=exc, countdown=2 ** self.request.retries)
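To make the backoff schedule concrete, here is a minimal standalone sketch of the delays that expression produces. It does not call Celery; the helper name backoff_countdown and the cap parameter are assumptions added for illustration (the cap keeps the delay from growing without bound), and it assumes the retry counter starts at 0 on the first run.

```python
def backoff_countdown(retries, base=2, cap=600):
    """Seconds to wait before the next attempt: base ** retries, capped at `cap`."""
    return min(cap, base ** retries)


MAX_RETRIES = 3  # same value as max_retries in the task above

# The retry counter is 0 on the first failure, 1 on the second, and so on.
schedule = [backoff_countdown(n) for n in range(MAX_RETRIES)]
print(schedule)  # [1, 2, 4]
```

Inside the task, the equivalent call would be self.retry(exc=exc, countdown=backoff_countdown(self.request.retries)).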
Now, imagine that your application has to call an asynchronous task, but needs to wait one hour before running it.
In this case, we just need to call the task using the ETA (estimated time of arrival) property, which means your task will be executed any time after the ETA. It will not necessarily run at exactly that time, because that depends on whether a worker is available then. (If you want to schedule tasks exactly as you do in crontab, you may want to take a look at CeleryBeat.)
from django.utils import timezone
from datetime import timedelta

now = timezone.now()

# later is one hour from now
later = now + timedelta(hours=1)
# args must be a tuple, so note the trailing comma
access_awful_system.apply_async((object_id,), eta=later)
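Outside Django, the same ETA can be built with the standard library alone. The sketch below is only an illustration: eta_after is a hypothetical helper, and it insists on a timezone-aware datetime so the value is unambiguous.

```python
from datetime import datetime, timedelta, timezone


def eta_after(delay, now=None):
    """Return a timezone-aware datetime that is `delay` after `now` (UTC by default)."""
    if now is None:
        now = datetime.now(timezone.utc)
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now + delay


# A fixed "now" keeps the example deterministic.
fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
run_at = eta_after(timedelta(hours=1), now=fixed_now)
print(run_at.isoformat())  # 2024-01-01T13:00:00+00:00
```

The resulting value is what you would pass as eta. When the delay is relative to the moment of the call, apply_async also accepts countdown in seconds (for example countdown=3600), which avoids computing the datetime yourself.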
When you execute celery, it creates a queue on your broker (in the last blog post it was RabbitMQ). If you have a few asynchronous tasks and you use just the celery default queue, all tasks will be going to the same queue.
Suppose that we have another task called too_long_task and one more called quick_task and imagine that we have one single queue and four workers.
In that scenario, imagine the producer sends ten messages to the queue to be executed by too_long_task and, right after that, ten more messages for quick_task. What is going to happen? All your workers may be busy executing the too_long_task messages that arrived first, leaving no worker free for quick_task until they finish.
The solution for this is routing each task using named queues.
# CELERY ROUTES
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}
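Conceptually, this setting is a lookup table from task name to queue, with everything else falling back to the default queue. The sketch below illustrates that idea only; it is not how Celery implements routing, and queue_for and core.tasks.other_task are hypothetical names. The fallback name "celery" is Celery’s default queue name.

```python
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}

DEFAULT_QUEUE = 'celery'  # queue used when a task has no route


def queue_for(task_name, routes=CELERY_ROUTES):
    """Return the queue a task name maps to, or the default queue if unrouted."""
    return routes.get(task_name, {}).get('queue', DEFAULT_QUEUE)


print(queue_for('core.tasks.quick_task'))   # quick_queue
print(queue_for('core.tasks.other_task'))   # celery
```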
Now we can split the workers, determining which queue they will be consuming.
# For too long queue
celery --app=proj_name worker -Q too_long_queue -c 2

# For quick queue
celery --app=proj_name worker -Q quick_queue -c 2
I’m using 2 workers for each queue, but it depends on your system.
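To see why the split helps, here is a small simulation of the scenario above: ten long tasks followed by ten quick ones. It is a rough model, not Celery itself; the durations (60 seconds and 1 second) and the helper start_times are assumptions, and it ignores prefetching and broker latency.

```python
import heapq


def start_times(durations, workers):
    """Start time of each task when `workers` identical workers consume one FIFO queue."""
    free_at = [0.0] * workers  # min-heap: when each worker next becomes free
    heapq.heapify(free_at)
    starts = []
    for duration in durations:
        start = heapq.heappop(free_at)  # earliest available worker takes the task
        starts.append(start)
        heapq.heappush(free_at, start + duration)
    return starts


LONG, QUICK = 60.0, 1.0  # assumed task durations in seconds

# One shared queue, four workers: quick tasks wait behind the long ones.
shared = start_times([LONG] * 10 + [QUICK] * 10, workers=4)
print(shared[10])  # 120.0

# Dedicated quick queue with two workers: the first quick task starts at once.
split = start_times([QUICK] * 10, workers=2)
print(split[0])  # 0.0
```

Under these assumptions, the first quick task waits two minutes on the shared queue and not at all on its own queue, at the cost of the long tasks having only two workers.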
As in the last post, you may want to run it on Supervisord.
There are a lot of interesting things to do with your workers here.
Another common issue is having to call two asynchronous tasks one after the other. It can happen in a lot of scenarios, e.g. if the second task uses the result of the first task as a parameter.
You can use chain to do that:
from celery import chain
from tasks import first_task, second_task

# second_task receives the return value of first_task as its first argument
chain(first_task.s(meu_objeto_id) | second_task.s()).apply_async()
The chain is a task too, so you can use parameters on apply_async, for instance, using an ETA:
chain(salvar_dados.s(meu_objeto_id) | trabalhar_dados.s()).apply_async(eta=depois)
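The data flow of a chain can be pictured with plain functions: each step receives the return value of the previous one as its first argument. The sketch below runs everything synchronously in one process, so it only illustrates the flow, not Celery’s execution; run_chain and both task bodies are hypothetical.

```python
from functools import reduce


def run_chain(first_args, *steps):
    """Call the first step with `first_args`, then feed each result to the next step."""
    first, *rest = steps
    result = first(*first_args)
    return reduce(lambda acc, step: step(acc), rest, result)


def first_task(object_id):
    # Stand-in for a task that saves data and returns something useful.
    return {"id": object_id, "status": "saved"}


def second_task(previous_result):
    # Receives whatever first_task returned.
    return f"processed object {previous_result['id']}"


output = run_chain((42,), first_task, second_task)
print(output)  # processed object 42
```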
If you only use tasks to execute something and never need their return value, you can ignore the results and improve your performance.
If you’re just saving something on your models, you may want to set this in your settings.py:
CELERY_IGNORE_RESULT = True
Originally published at Fernando Alves.