Asyncio: How to Say Goodbye Without Losing Your Data

Written by taras | Published 2023/03/27
Tech Story Tags: python | asyncio | programming | backend | python-programming | python3 | asynchronous-tasks | coding

TL;DR: This article explains how to gracefully shut down an asyncio application without losing data. The author illustrates what's wrong with asyncio.shield() and shows how to handle interruption signals so that your asyncio application shuts down gracefully.

Hi folks! Today, I want to illustrate how confusing it might be to gracefully shut down an asyncio app without losing anything important.

When shutting down an asyncio application, it's crucial to make sure all running tasks have finished execution before closing down the application. A hard exit can cause data loss and other unforeseen issues. Therefore, it's recommended to use a graceful shutdown mechanism that allows running tasks to complete before shutting down the application.

To accomplish this, the asyncio module provides a shield() function that protects an awaitable from being cancelled, so it looks like it should let a task complete its execution even if the application is shutting down. However, in this article, I will illustrate that it is not as simple as it may seem.

1. Without Any Protection

Let's start with a simple program that runs multiple tasks in the background and waits until they're done. Then, I will try to interrupt it in the middle.

# example1.py

import asyncio


async def worker(n: int) -> None:
    print(f"[{n}] Started!")
    try:
        # this is a task that shouldn't be canceled in the middle
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print(f"[{n}] Canceled (this is bad)!")
    else:
        print(f"[{n}] Successfully done!")


async def main() -> None:
    # create 6 unprotected tasks
    tasks = []
    for n in range(6):
        tasks.append(asyncio.create_task(worker(n)))

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("App was interrupted")
    else:
        print("App was finished gracefully")

Let's run example1.py and interrupt it afterwards:

> python3 example1.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^C[0] Canceled (this is bad)!
[1] Canceled (this is bad)!
[2] Canceled (this is bad)!
[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[5] Canceled (this is bad)!
App was interrupted

As you can see, when I interrupted the script (the ^C sign appears when I press Ctrl+C), all tasks were cancelled immediately instead of being allowed to complete. This is expected, since we didn't even bother to protect the tasks.

2. With “shield”

Let's update main() by using asyncio.shield() to prevent 3 of 6 tasks from being cancelled, as described in the documentation.

# example2.py

import asyncio


async def worker(n: int) -> None:
    ...


async def main() -> None:
    # create 6 tasks, shield only first 3
    tasks = []
    for n in range(6):
        task = asyncio.create_task(worker(n))
        if n < 3:
            # protect task from being canceled
            # (spoiler: it will be canceled anyway)
            task = asyncio.shield(task)

        tasks.append(task)

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    ...

Let's run example2.py and interrupt it the same way:

> python3 example2.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^C[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[5] Canceled (this is bad)!
[2] Canceled (this is bad)!
[0] Canceled (this is bad)!
[1] Canceled (this is bad)!
App was interrupted

Do you see the difference from the previous example (example1.py)? There isn't any. Nothing has changed. Why is it so?

This is because shield() protects the awaitable it wraps only when the cancellation comes from outside, that is, when the parent coroutine (the one awaiting shield()) gets cancelled. It does not protect the shielded task from being cancelled directly via its own cancel() method.
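To make the distinction concrete, here is a minimal sketch of both cases. The names job(), parent() and demo(), the log list and the short sleep durations are made up for this illustration; only asyncio.shield(), asyncio.create_task() and asyncio.gather() come from the standard library.

```python
import asyncio


async def job(name: str, log: list) -> None:
    # hypothetical worker that records how it ended
    try:
        await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        log.append(f"{name}: cancelled")
        raise
    else:
        log.append(f"{name}: done")


async def parent(inner: asyncio.Task) -> None:
    # cancelling parent() interrupts this await, but not the inner task
    await asyncio.shield(inner)


async def demo() -> list:
    log: list = []

    # case 1: cancel the parent that awaits shield(); the inner task survives
    inner_a = asyncio.create_task(job("a", log))
    outer = asyncio.create_task(parent(inner_a))
    await asyncio.sleep(0)  # let both tasks start
    outer.cancel()
    await asyncio.gather(outer, return_exceptions=True)
    await inner_a

    # case 2: cancel the inner task directly; shield() cannot help here
    inner_b = asyncio.create_task(job("b", log))
    shielded = asyncio.shield(inner_b)
    await asyncio.sleep(0)  # let the task start
    inner_b.cancel()
    await asyncio.gather(shielded, return_exceptions=True)

    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the first case only the awaiting parent is cancelled and the job finishes normally. In the second case the job itself is cancelled, which is exactly what happens to every task during a conventional shutdown.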

Let me explain. If you shut down an app, you'll likely do it the conventional way by following these steps:

  1. tasks = asyncio.all_tasks() - retrieve all tasks
  2. [t.cancel() for t in tasks] - cancel all tasks
  3. loop.run_until_complete(gather(*tasks)) - wait for the cancelled tasks to complete

The task that shield() wraps is still a regular task, so it is included in the result of all_tasks() and receives a cancellation request, just like everything else.
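A quick way to check this is to list the running tasks after shielding one of them. The sketch below is only an illustration: the task name "inner" and the helper names worker() and demo() are assumptions of this example, not part of the article's code.

```python
import asyncio


async def worker() -> None:
    await asyncio.sleep(0.1)


async def demo() -> tuple:
    inner = asyncio.create_task(worker(), name="inner")
    shielded = asyncio.shield(inner)  # a wrapper future around `inner`

    await asyncio.sleep(0)  # let the worker start

    # all_tasks() returns the unfinished Task objects of the running loop:
    # the inner task is among them, the shield() wrapper is not a Task
    names = sorted(t.get_name() for t in asyncio.all_tasks())
    wrapper_is_task = isinstance(shielded, asyncio.Task)

    await shielded
    return "inner" in names, wrapper_is_task


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the inner task shows up in all_tasks(), any shutdown routine that cancels everything returned by that call will cancel it too.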

Now, let's take a look at a piece of code from asyncio (Python 3.11.2) that is called before closing the loop:

def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

As we can see, nothing fancy, just the same three steps.

3. Handle Signals

Now it's time to actually protect tasks from unexpected cancellation. The way we do this is by implementing the following steps:

  1. Make a set of tasks that we want to protect.
  2. Handle basic interruption signals (SIGHUP, SIGTERM, SIGINT) to implement our own shutdown logic.
  3. Within the shutdown function, cancel only unprotected tasks.

# example3.py

import asyncio
import signal

# tasks that shouldn't be canceled
_DO_NOT_CANCEL_TASKS: set[asyncio.Task] = set()


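def protect(task: asyncio.Task) -> None:
    _DO_NOT_CANCEL_TASKS.add(task)
    # forget the task once it is done, so the set doesn't grow forever
    task.add_done_callback(_DO_NOT_CANCEL_TASKS.discard)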


def shutdown(sig: signal.Signals) -> None:
    print(f"Received exit signal {sig.name}")

    all_tasks = asyncio.all_tasks()
    tasks_to_cancel = all_tasks - _DO_NOT_CANCEL_TASKS

    for task in tasks_to_cancel:
        task.cancel()

    print(f"Cancelled {len(tasks_to_cancel)} out of {len(all_tasks)} tasks")


def setup_signal_handler() -> None:
    loop = asyncio.get_running_loop()

    # note: loop.add_signal_handler() is available on Unix only
    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown, sig)


async def worker(n: int) -> None:
    ...


async def main() -> None:
    # setup graceful shutdown
    setup_signal_handler()

    # protect main task from being canceled,
    # otherwise it will cancel all other tasks
    protect(asyncio.current_task())

    # create 6 tasks, protect only first 3
    tasks = []
    for n in range(6):
        task = asyncio.create_task(worker(n))
        if n < 3:
            protect(task)

        tasks.append(task)

    # wait for all tasks to finish
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    ...

Let's run example3.py and interrupt it once more:

> python3 example3.py
[0] Started!
[1] Started!
[2] Started!
[3] Started!
[4] Started!
[5] Started!
^CReceived exit signal SIGINT
Cancelled 3 out of 7 tasks
[5] Canceled (this is bad)!
[3] Canceled (this is bad)!
[4] Canceled (this is bad)!
[0] Successfully done!
[1] Successfully done!
[2] Successfully done!
App was finished gracefully

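Here we go! The three protected tasks ran to completion instead of being interrupted in the middle. The output reports 7 tasks because all_tasks() also counts the main task, which is protected too, so only the 3 unprotected workers were cancelled.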

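However, make sure that the protected tasks are not long-running. Process supervisors (for example, Docker or Kubernetes) typically send SIGKILL if the process has not exited within a grace period after SIGTERM, and SIGKILL cannot be handled, so a protected task that takes too long will still be terminated forcefully.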
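One way to reduce that risk is to bound the wait yourself. The following is a minimal sketch, not part of the original examples: wait_with_grace_period() and its grace_period parameter are hypothetical names, and the timings are chosen only to keep the demo short. It waits for the protected tasks with asyncio.wait() and a timeout, then cancels whatever is still running.

```python
import asyncio


async def wait_with_grace_period(protected: set, grace_period: float) -> set:
    """Wait for protected tasks, then cancel the ones that overran."""
    if not protected:
        # asyncio.wait() raises ValueError on an empty set
        return set()

    _done, pending = await asyncio.wait(protected, timeout=grace_period)

    for task in pending:
        task.cancel()

    # give the cancelled tasks a chance to handle CancelledError
    await asyncio.gather(*pending, return_exceptions=True)
    return pending


async def demo() -> list:
    fast = asyncio.create_task(asyncio.sleep(0.05), name="fast")
    slow = asyncio.create_task(asyncio.sleep(5), name="slow")

    overran = await wait_with_grace_period({fast, slow}, grace_period=0.2)

    return [
        fast.done() and not fast.cancelled(),  # finished within the period
        slow.cancelled(),                      # cancelled after the period
        [t.get_name() for t in overran],
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real application the grace period should be shorter than whatever your process supervisor allows before it sends SIGKILL, so that the overrunning tasks are cancelled by your code rather than killed from outside.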

Conclusion

To sum up, when it comes to shutting down an asyncio application, it's important to protect against potential unforeseen problems. While asyncio provides a shield() function to ensure task completion, this alone is not enough to ensure a graceful shutdown. Instead, custom shutdown logic that protects tasks from unexpected cancellation is necessary. By handling interruption signals, you can guarantee that your asyncio app will be shut down gracefully.

P.S. All the code mentioned in this article can be obtained from this repository.


Written by taras | Passionate software developer with 10+ years of experience @ ANNA Money
Published by HackerNoon on 2023/03/27