Python added formal asynchronicity to the base language a while ago. It’s fun to play with asyncio tasks and coroutines, the basic constructs that execute almost in parallel. But as you start to integrate them into a regular codebase, you may find that things can get tricky, especially if you’re forced to interact with synchronous code.
The complication arises when invoking awaitable functions. Awaiting one requires an async-defined code block or coroutine. That would be a non-issue, except that if your caller has to be async, then you can’t get a result from it either unless its caller is async. That forces its caller into an async block as well, and so on. This is “async creep”.
It can escalate fast, finding its way into all corners of your code. That’s not a big deal if the whole codebase is asynchronous, but it can impede development when you mix synchronous and asynchronous code.
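To make the creep concrete, here is a minimal sketch using only the standard library. The names get_value, sync_caller, async_caller and outer_caller are made up for illustration. It shows that calling a coroutine function from synchronous code only hands back a coroutine object, and that getting the actual value pushes async up through every caller.

```python
import asyncio


async def get_value():
    """Hypothetical awaitable standing in for real async I/O."""
    await asyncio.sleep(0)
    return 42


def sync_caller():
    # Calling a coroutine function from sync code does not run it;
    # it only creates a coroutine object.
    coro = get_value()
    is_coro = asyncio.iscoroutine(coro)
    coro.close()  # close it so Python doesn't warn that it was never awaited
    return is_coro


async def async_caller():
    # To get the value we have to await, so this function must be async...
    return await get_value()


async def outer_caller():
    # ...and so must its caller, and so on up the stack.
    return await async_caller()


def main():
    # Only at the very top can we step back into synchronous code.
    return asyncio.run(outer_caller())
```

The only synchronous function that ever sees the value is main(), because it owns the event loop. Everything between it and get_value() had to become async.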
Following are the two main mechanisms I use to work around this. Both assume we’re building an application that benefits from asynchronous execution.
Whether building an asynchronous application or enhancing a linear one, it’s important to determine the sections that will gain the most from async execution. This is usually not hard to answer, but no one else can do it for you. The general guideline is to start with things that wait on I/O, like file or socket access, HTTP requests, etc.
Once you know which pieces to optimize, start identifying the ones that can run concurrently. The more you can group together, the better. A great example is code that needs information from several REST APIs that don’t depend on each other. You can use aiohttp and make all the calls in parallel instead of waiting for each one to finish before starting the next one.
Now it’s a matter of loading up those blocks of code into the main event loop. There are a few ways of doing that. I like putting them into async functions, using asyncio.ensure_future() to put them in the loop and loop.run_until_complete() to wait for completion:
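    import asyncio
    import aiohttp

    async def fetch(url):
        # Recent aiohttp releases expect requests to go through a ClientSession
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

    # Create the main loop explicitly so this also works on newer Pythons
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    results = loop.run_until_complete(asyncio.gather(
        asyncio.ensure_future(fetch("http://www.google.com"), loop=loop),
        asyncio.ensure_future(fetch("http://www.github.com"), loop=loop),
        asyncio.ensure_future(fetch("http://www.reddit.com"), loop=loop)
    ))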
This is a similar example to one I used in a previous article: Threaded Asynchronous Magic and How to Wield it.
asyncio.ensure_future() wraps each coroutine in a Task and schedules it on the loop, asyncio.gather() groups the tasks together, and loop.run_until_complete() blocks execution until all calls complete. The output of this is a list with the results from each call, in the same order the calls were passed in.
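If you want to try the pattern without touching the network, here is a small sketch where a made-up fake_request coroutine stands in for the HTTP calls. The names and delays are assumptions for illustration. It shows that the results come back in the order the tasks were passed to gather(), not the order they finished.

```python
import asyncio
import time


async def fake_request(name, delay):
    """Hypothetical stand-in for an HTTP call: sleep, then return a label."""
    await asyncio.sleep(delay)
    return f"{name} done"


def run_all():
    loop = asyncio.new_event_loop()
    try:
        # Schedule both coroutines on the loop as tasks
        tasks = [
            asyncio.ensure_future(fake_request("slow", 0.2), loop=loop),
            asyncio.ensure_future(fake_request("fast", 0.1), loop=loop),
        ]
        started = time.monotonic()
        # Block until every task has finished
        results = loop.run_until_complete(asyncio.gather(*tasks))
        elapsed = time.monotonic() - started
    finally:
        loop.close()
    return results, elapsed
```

Calling run_all() returns the "slow" result first even though "fast" finishes earlier. The elapsed time should land near the longest delay rather than the sum of both, since the two sleeps overlap.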
Following the points discussed so far will produce code that still runs as synchronous blocks, but some of those blocks will execute several asynchronous functions together.
As also discussed in my earlier article, it’s not hard to create a separate thread that operates as a worker. It runs its own event loop and you use thread-safe asyncio methods to give it work. The nice part is you can give it synchronous work with call_soon_threadsafe() or async work with run_coroutine_threadsafe().
    from threading import Thread
    ...

    def start_background_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    # Create a new loop
    new_loop = asyncio.new_event_loop()

    # Assign the loop to another thread
    t = Thread(target=start_background_loop, args=(new_loop,))
    t.start()

    # Give it some async work
    future = asyncio.run_coroutine_threadsafe(
        fetch("http://www.google.com"),
        new_loop
    )

    # Wait for the result
    print(future.result())

    # Do it again but with a callback
    asyncio.run_coroutine_threadsafe(
        fetch("http://www.github.com"),
        new_loop
    ).add_done_callback(lambda future: print(future.result()))
We get a Future back from run_coroutine_threadsafe, which we can wait on using the result(timeout) method, or attach a callback to with add_done_callback(function). The callback function will receive the future as an argument.
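Here is a self-contained sketch of both ways of consuming that Future, using a made-up slow_double coroutine instead of a real request. The helper names, the delay and the one-second timeouts are assumptions for illustration. The Future in question is a concurrent.futures.Future, so the callback runs on the worker thread, which is why the sketch uses a threading.Event to know when it has fired.

```python
import asyncio
import threading


async def slow_double(x, delay=0.05):
    """Hypothetical coroutine standing in for real async work."""
    await asyncio.sleep(delay)
    return x * 2


def demo():
    # Worker thread running its own event loop
    loop = asyncio.new_event_loop()
    worker = threading.Thread(target=loop.run_forever, daemon=True)
    worker.start()

    # Option 1: block the calling thread for at most one second
    future = asyncio.run_coroutine_threadsafe(slow_double(21), loop)
    blocking_result = future.result(timeout=1)

    # Option 2: register a callback; it receives the finished future
    callback_results = []
    done = threading.Event()

    def on_done(fut):
        callback_results.append(fut.result())
        done.set()

    asyncio.run_coroutine_threadsafe(slow_double(5), loop).add_done_callback(on_done)
    done.wait(timeout=1)

    # Ask the loop to stop from this thread, then clean up
    loop.call_soon_threadsafe(loop.stop)
    worker.join(timeout=1)
    loop.close()
    return blocking_result, callback_results
```

If the coroutine takes longer than the timeout, result() raises a timeout error instead of hanging, which is worth handling in real code.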
Let’s look at something more complicated. What if you have a library or module where most functions can run in parallel, but you only want to do so if the caller is async?
We can take advantage of the threaded model here because the scheduler methods are synchronous. This means your users don’t need to declare their own functions as async and deal with async creep in their code. Asynchronous blocks remain contained to your module.
It also allows for APIs that can choose whether to use asyncio or not. In fact, we can even go one step further and auto-detect when to be async using some inspect magic.
Without threads you have no control over your event loop. Users could do their own async fiddling that interferes with how your methods execute. The thread will at least guarantee asynchronous execution inside an event loop that you operate. One that you start and stop when needed. This leads to more predictable, repeatable results.
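As a rough sketch of what “a loop that you start and stop when needed” could look like, here is a hypothetical BackgroundLoop helper written as a context manager. The class, its run() method and the add coroutine are made up for illustration, and a real module would likely need more careful error handling.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper that owns a private event loop on its own thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def __enter__(self):
        # Start the loop when the caller enters the with block
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Stop the loop from the outside, then wait for the thread to finish
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def run(self, coro, timeout=None):
        """Synchronous entry point: schedule coro and wait for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)


async def add(a, b):
    """Hypothetical coroutine standing in for the module's async internals."""
    await asyncio.sleep(0.01)
    return a + b


def use_it():
    # The caller stays fully synchronous; the loop lives and dies with the block
    with BackgroundLoop() as bg:
        return bg.run(add(1, 2), timeout=1)
```

Because the loop is private to the helper, nothing the user does with their own event loop can interfere with it.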
Let’s look at an example that builds on the earlier ones. Here we make a wrapper method that calls the appropriate sync or async function based on its caller.
    import inspect
    import requests
    ...

    def is_async_caller():
        """Figure out who's calling."""
        # Get the calling frame
        caller = inspect.currentframe().f_back.f_back
        # Pull the function name from FrameInfo
        func_name = inspect.getframeinfo(caller)[2]
        # Get the function object
        f = caller.f_locals.get(
            func_name,
            caller.f_globals.get(func_name)
        )
        # If there's any indication that the function object is a
        # coroutine, return True. inspect.iscoroutinefunction() should
        # be all we need, the rest are here to illustrate.
        if any([inspect.iscoroutinefunction(f),
                inspect.isgeneratorfunction(f),
                inspect.iscoroutine(f), inspect.isawaitable(f),
                inspect.isasyncgenfunction(f), inspect.isasyncgen(f)]):
            return True
        else:
            return False

    def fetch(url):
        """GET the URL, do it asynchronously if the caller is async"""
        # Figure out which function is calling us
        if is_async_caller():
            print("Calling ASYNC method")
            # Run the async version of this method and
            # print the result with a callback
            asyncio.run_coroutine_threadsafe(
                _async_fetch(url),
                new_loop
            ).add_done_callback(lambda f: print(f.result()))
        else:
            print("Calling BLOCKING method")
            # Run the synchronous version and print the result
            print(_sync_fetch(url))
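
    def _sync_fetch(url):
        """Blocking GET"""
        return requests.get(url).content

    async def _async_fetch(url):
        """Async GET"""
        # Recent aiohttp releases expect requests to go through a ClientSession
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()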

    def call_sync_fetch():
        """Blocking fetch call"""
        fetch("http://www.github.com")

    async def call_async_fetch():
        """Asynchronous fetch call (no different from sync call
        except this function is defined async)"""
        fetch("http://www.github.com")

    # Perform a blocking GET
    call_sync_fetch()

    # Perform an async GET
    loop = asyncio.get_event_loop()
    loop.run_until_complete(call_async_fetch())
We’re using inspect in is_async_caller() to get the function object that called us and determine whether it’s a coroutine or not. While this is fancy and illustrates the possibilities, it may not be very performant. We could easily replace the mechanism with an async_execute argument in the fetch wrapper and have the user decide.
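A minimal sketch of that explicit-flag alternative might look like the following. To keep it runnable without network access, the made-up _sync_work and _async_work functions stand in for the blocking and async fetch implementations, and the wrapper returns the Future instead of printing from a callback. Both choices are assumptions for illustration, not the approach used above.

```python
import asyncio
import threading

# Private background loop owned by the module (started at import time here)
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def _sync_work(x):
    """Hypothetical blocking implementation."""
    return x + 1


async def _async_work(x):
    """Hypothetical async implementation of the same operation."""
    await asyncio.sleep(0.01)
    return x + 1


def work(x, async_execute=False):
    """Return the result directly, or a concurrent.futures.Future
    when async_execute is True."""
    if async_execute:
        # Hand the coroutine to the background loop; the caller is not blocked
        return asyncio.run_coroutine_threadsafe(_async_work(x), _loop)
    return _sync_work(x)
```

A caller would use work(1) for the blocking path and work(1, async_execute=True).result(timeout=1) or add_done_callback() for the async one. This trades the auto-detection for a decision that is visible at the call site and avoids inspecting stack frames on every call.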
The call_sync_fetch and call_async_fetch functions are there to show how a user could go about calling our wrapper. As you can see, there’s no need to await the fetch call because it’s done automatically by running in a separate thread.
This could prove useful to any Python packages that want to add support for asynchronous execution while still supporting legacy code. I’m sure there are pros and cons, so feel free to start the discussion in the comments below.