Python added formal asynchronicity in the base language a while ago. It’s fun to play with and , the basic constructs that execute almost in parallel. But as you start to integrate more with a regular codebase, you may find that things can get tricky. Especially if you’re forced to interact with synchronous code. asyncio tasks coroutines The complication arises when invoking functions. Doing so requires an defined code block or coroutine. A non-issue except that if your caller has to be async, then you can’t call either unless its caller is async. Which then forces its caller into an async block as well, and so on. This is “async creep”. awaitable async it It can escalate fast, finding its way into all corners of your code. Not a big deal if the codebase is asynchronous, but can impede development when mixing it up. Following are the two main mechanisms I use to work around this. Both assume we’re building an application that benefits from asynchronous execution. Waiting for blocks of async code Whether building an asynchronous application or enhancing a linear one, it’s important to determine the sections that will gain the most from async execution. This is usually not hard to answer, but no one else can do it for you. The general guideline is to start with things that wait on I/O, like file or socket access, HTTP requests, etc. Once you know which pieces to optimize, start identifying the ones that can run on top of each other. The more you can group together, the better. A great example is code that needs information from several REST APIs that don’t depend on each other. You can use and make all the calls in parallel instead of waiting for each one to finish before getting the next one. aiohttp Now it’s a matter of loading up those blocks of code into the main event loop. There’s a few ways of doing that, I like putting them into async functions and using to put them in the loop and to wait for completion: asyncio.ensure_future() loop.run_until_complete() import asyncioimport aiohttp async def fetch(url):response = await aiohttp.request('GET', url)return await response.text() loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(asyncio.ensure_future(fetch("http://www.google.com")),asyncio.ensure_future(fetch("http://www.github.com")),asyncio.ensure_future(fetch("http://www.reddit.com")))) This is a similar example to one I used in a previous article: . Threaded Asynchronous Magic and How to Wield it turns the functions into coroutines, groups them together, while blocks execution until all calls complete. The output of this is a list with the results from each call. asyncio.ensure_future() asyncio.gather() loop.run_until_complete() Following the points discussed so far will produce code that runs synchronous blocks. But some of those blocks will execute several asynchronous functions together. Use a thread Also discussed in my article, it’s not hard to create a separate thread that operates as a worker. It runs its own event loop and you use thread-safe asyncio methods to give it work. The nice part is you can give it synchronous work with or async work with . earlier call_soon() run_coroutine_threadsafe() from threading import Thread ... def start_background_loop(loop):asyncio.set_event_loop(loop)loop.run_forever() # Create a new loopnew_loop = asyncio.new_event_loop() # Assign the loop to another threadt = Thread(target=start_background_loop, args=(new_loop,))t.start() # Give it some async workfuture = asyncio.run_coroutine_threadsafe(fetch("http://www.google.com"),new_loop) # Wait for the resultprint(future.result()) # Do it again but with a callbackasyncio.run_coroutine_threadsafe(fetch("http://www.github.com"),new_loop).add_done_callback(lambda future: print(future.result())) We get a back from which we can wait on using the method, or add a callback with . The callback function will receive the future as an argument. Future run_coroutine_threadsafe result(timeout) add_done_callback(function) Supporting both async and sync calls in the same API methods Let’s look at something more complicated. What if you have a library or module where most functions can run in parallel, but you only want to do so if the caller is async? We can take advantage of the threaded model here because the scheduler methods are synchronous. Meaning that your user doesn’t need to declare itself as async and have to deal with the async creep in their code. Asynchronous blocks remain contained to your module. It also allows for api interfaces that can choose whether to use asyncio or not. In fact, we can even go one step further and auto-detect when to be async using some inspect magic. Without threads you have no control over your event loop. Users could do their own async fiddling that interferes with how your methods execute. The thread will at least guarantee asynchronous execution inside an event loop that you operate. One that you start and stop when needed. This leads to more predictable, repeatable results. Let’s look at an example that builds on the earlier ones. Here we make a wrapper method that calls the appropriate sync or async function based on its caller. import inspectimport requests ... def is_async_caller():"""Figure out who's calling.""" # Get the calling frame caller = inspect.currentframe().f\_back.f\_back # Pull the function name from FrameInfo func\_name = inspect.getframeinfo(caller)\[2\] # Get the function object f = caller.f\_locals.get( func\_name, caller.f\_globals.get(func\_name) ) # If there's any indication that the function object is a # coroutine, return True. inspect.iscoroutinefunction() should # be all we need, the rest are here to illustrate. if any(\[inspect.iscoroutinefunction(f), inspect.isgeneratorfunction(f), inspect.iscoroutine(f), inspect.isawaitable(f), inspect.isasyncgenfunction(f) , inspect.isasyncgen(f)\]): return True else: return False def fetch(url):"""GET the URL, do it asynchronously if the caller is async""" # Figure out which function is calling us if is\_async\_caller(): print("Calling ASYNC method") # Run the async version of this method and # print the result with a callback asyncio.run\_coroutine\_threadsafe( \_async\_fetch(url), new\_loop ).add\_done\_callback(lambda f: print(f.result())) else: print("Calling BLOCKING method") # Run the synchronous version and print the result print(\_sync\_fetch(url)) def _sync_fetch(url):"""Blocking GET""" return requests.get(url).content async def _async_fetch(url):"""Async GET""" resp = await aiohttp.request('GET', url) return await resp.text() def call_sync_fetch():"""Blocking fetch call""" fetch("[http://www.github.com](http://www.github.com)") async def call_async_fetch():"""Asynchronous fetch call (no different from sync callexcept this function is defined async)""" fetch("[http://www.github.com](http://www.github.com)") # Perform a blocking GETcall_sync_fetch() # Perform an async GETloop = asyncio.get_event_loop()loop.run_until_complete(call_async_fetch()) We’re using in to get the function object that called us and determine whether its a coroutine or not. While this is fancy and illustrates the possibilities, it may not be very performant. We could easily replace the mechanism with an argument in the wrapper and have the user decide. inspect is_async_caller() async_execute fetch The and functions are there to show how a user could go about calling our wrapper. As you can see, there’s no need to await the fetch call because it’s done automatically by running in a separate thread. call_sync_fetch call_async_fetch This could prove useful to any python packages that are wanting to add support for asynchronous execution while still supporting legacy code. I’m sure there are pros and cons, feel free to start the discussion in the comments below. If you liked this article and want to keep up with what I’m working on, please recommend it, visit for more topics and . tryexecptpass.org follow me on Twitter
Share Your Thoughts