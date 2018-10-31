How to run asynchronous web requests in parallel with Python 3.5 (without aiohttp)


@ tyler-burdsall Tyler Burdsall Software Development Engineer at AWS Elemental

Recently at my workplace, our IT team finally upgraded our distributed Python version to 3.5.0. While this is a huge upgrade from 2.6, it still came with some growing pains. Unfortunately, Python 3.5.0 doesn’t meet the minimum requirements of some popular libraries, including aiohttp.

With these restrictions I still needed to write a script that could pull hundreds of .csv files from our APIs and manipulate the data. Python isn’t event-driven and natively asynchronous the way NodeJS is, but the same effect can still be achieved. This article details what I learned and shows the benefits of asynchronous operations.

Disclaimer: If you have a higher version of Python available (3.5.2+), I highly recommend using aiohttp instead. It’s an incredibly robust library and a great solution for this kind of problem. There are many tutorials online detailing how best to use the library.

Assumptions

This article makes the following assumptions:

You already have familiarity with Python and most of its syntax

You already have familiarity with basic web requests

You have a loose concept of asynchronous operations

If you’re just looking for the solution, scroll down to the bottom and the full code is posted. Enjoy!

Setup

Before getting started, ensure that you have requests installed on your machine. The easiest way to install is by typing the following command into your terminal:

$ python -m pip install requests

Alternatively, if you don’t have administrative permissions you can install the library with this command:

$ python -m pip install requests --user

The wrong approach: synchronous requests

To demonstrate the benefits of our parallel approach, let’s first look at approaching the problem in a synchronous manner. I’ll also give an overview of what’s going on in the code. Ultimately, we want to be able to perform a GET request to the URL containing the .csv file and measure the time it takes to read the text inside.

We’ll be downloading multiple .csv files of varying sizes from https://people.sc.fsu.edu/~jburkardt/data/csv/, which provides plenty of data for our example.

As a disclaimer, we’ll be using the Session object from the requests library to perform our GET request.

First, we’ll need a function that executes the web request:

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data


This function takes in a Session object and the name of the desired .csv file, performs the web request, then returns the text inside the response.
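One possible variation, offered as a sketch rather than the approach used in the rest of this article: return None when the status code isn’t 200, so the caller can tell a failed download apart from a successful one. The names fetch_or_none, FakeSession, and FakeResponse are hypothetical; the two fake classes exist only so the example runs without network access, and a real requests.Session() would be passed in their place.

```python
BASE_URL = "https://people.sc.fsu.edu/~jburkardt/data/csv/"


def fetch_or_none(session, csv):
    """Return the response text, or None if the status code is not 200."""
    url = BASE_URL + csv
    with session.get(url) as response:
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
            return None
        return response.text


# Hypothetical stand-ins for requests.Session / requests.Response,
# used here only so the sketch runs offline.
class FakeResponse:
    def __init__(self, status_code, text):
        self.status_code = status_code
        self.text = text

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False


class FakeSession:
    def __init__(self, files):
        self.files = files  # maps full URL -> file contents

    def get(self, url):
        if url in self.files:
            return FakeResponse(200, self.files[url])
        return FakeResponse(404, "")


session = FakeSession({BASE_URL + "cities.csv": "LatD,LatM\n41,5\n"})
found = fetch_or_none(session, "cities.csv")
missing = fetch_or_none(session, "no_such_file.csv")
```

With this shape, the calling loop can skip or retry any file whose result is None instead of processing an error page as if it were .csv data.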

Next, we need a function that loops over a list of our desired files and measures the time it takes to perform each request:

import requests
from timeit import default_timer

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))

        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop

        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))


This function creates our Session object and then loops through each .csv file in the csvs_to_fetch list. Once each fetch operation is completed, the elapsed time is calculated and displayed in an easy-to-read format.
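To see the timing pattern in isolation, here is a small network-free sketch in which time.sleep stands in for the web request. The function fake_fetch, the file names, and the delays are all made up for illustration; only the timing and formatting logic mirrors the function above.

```python
import time
from timeit import default_timer


def fake_fetch(name, delay):
    """Stand-in for a blocking web request: sleeps instead of downloading."""
    time.sleep(delay)
    return name


# Hypothetical file names and delays (in seconds), chosen for illustration
simulated = [("a.csv", 0.10), ("b.csv", 0.05), ("c.csv", 0.15)]

start = default_timer()
completed = []
for name, delay in simulated:
    # Each call blocks until it finishes, so the next one cannot start early
    fake_fetch(name, delay)
    elapsed = default_timer() - start
    completed.append((name, elapsed))
    print("{0:<30} {1:>20}".format(name, "{:5.2f}s".format(elapsed)))

total_elapsed = default_timer() - start
```

Because every call waits for the one before it, the total time is roughly the sum of the individual delays.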

Finally, our main function will be simple (for now) and call our function:

def main():
    # Simple for now
    get_data_synchronous()

main()


Once we put it all together, here is what the code looks like for our synchronous example:

import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))

        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop

        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()


Let’s take a look at the results when we run this script:

Synchronous example. Notice how each operation doesn’t start until the previous one has completed.

Thankfully, we can vastly improve this performance with Python 3’s built-in asyncio library!

The right approach: performing multiple requests at once asynchronously

In order to get this to work, we’ll have to rework some of our existing functions, beginning with fetch:

import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))

        # Now we will print how long it took to complete the operation from the
        # `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data


Next, we need to make our get_data function asynchronous:

import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor

async def get_data_asynchronous():
    # Without this declaration, the assignment below would create a local
    # variable and `fetch` would keep reading the module-level value
    global START_TIME

    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))

    # Note: max_workers is set to 10 simply for this example,
    # you'll have to tweak this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`

            # Initialize the event loop
            loop = asyncio.get_event_loop()

            # Set the START_TIME for the `fetch` function
            START_TIME = default_timer()

            # Use list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]

            # Initializes the tasks to run and awaits their results
            for response in await asyncio.gather(*tasks):
                pass


This code now hands one fetch call per .csv file to a pool of up to 10 worker threads, so several downloads can be in flight at the same time.
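The same pattern can be tried without any network access. The sketch below is an illustration under stated assumptions, not the article’s code: time.sleep stands in for the web request, and blocking_task, run_all, and the job names are hypothetical. The event loop is created with asyncio.new_event_loop() and passed in explicitly so the snippet also behaves on newer Python versions.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer


def blocking_task(name, delay):
    """Stand-in for `fetch`: blocks the calling thread, then returns."""
    time.sleep(delay)
    return name


async def run_all(loop, jobs):
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Each call is submitted to the pool immediately; nothing waits yet
        futures = [
            loop.run_in_executor(executor, blocking_task, name, delay)
            for name, delay in jobs
        ]
        # Wait for every worker thread to finish and collect the results
        return await asyncio.gather(*futures)


# Four simulated downloads of 0.2 seconds each (made-up values)
jobs = [("a.csv", 0.2), ("b.csv", 0.2), ("c.csv", 0.2), ("d.csv", 0.2)]

loop = asyncio.new_event_loop()
try:
    start = default_timer()
    results = loop.run_until_complete(run_all(loop, jobs))
    elapsed = default_timer() - start
finally:
    loop.close()

print("{0} tasks finished in {1:.2f}s".format(len(results), elapsed))
```

Run one after another, these four tasks would need about 0.8 seconds; with the thread pool the total stays close to the duration of a single task, because the sleeps overlap.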

Finally, our main function needs a small tweak to properly initialize our async function:

def main():
    # Get the event loop and run the coroutine until it finishes
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()
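For readers who are not tied to Python 3.5.0: asyncio.run was added in Python 3.7 and takes care of creating and closing the event loop. The sketch below assumes nothing about your project; placeholder_job is a hypothetical stand-in for get_data_asynchronous, and the hasattr check simply falls back to the manual loop handling shown above on older versions.

```python
import asyncio


async def placeholder_job():
    """Hypothetical stand-in for get_data_asynchronous()."""
    await asyncio.sleep(0.01)
    return "done"


def main():
    if hasattr(asyncio, "run"):
        # Python 3.7+: creates a new event loop, runs the coroutine,
        # then closes the loop
        return asyncio.run(placeholder_job())
    # Older versions: manage the loop by hand
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(placeholder_job())


result = main()
```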


Now, let’s run the new code and see the results:

Asynchronous example. Notice how the files are not being obtained in order.
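Although the downloads finish out of order, asyncio.gather returns its results in the order the tasks were passed in, so the data can still be matched back to the file names. A minimal sketch with simulated delays; slow_echo, gather_all, and the file names are hypothetical, and time.sleep again stands in for the request:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

completion_order = []
lock = threading.Lock()


def slow_echo(name, delay):
    """Sleep for `delay` seconds, record when we finished, return the name."""
    time.sleep(delay)
    with lock:
        completion_order.append(name)
    return name


async def gather_all(loop, jobs):
    # One worker per job so that every job starts right away
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        futures = [
            loop.run_in_executor(executor, slow_echo, name, delay)
            for name, delay in jobs
        ]
        return await asyncio.gather(*futures)


# The first job is the slowest, so it finishes last
jobs = [("first.csv", 0.45), ("second.csv", 0.05), ("third.csv", 0.25)]

loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(gather_all(loop, jobs))
finally:
    loop.close()

print("completed:", completion_order)
print("returned: ", results)
```

In the article’s code this means the loop over `await asyncio.gather(*tasks)` could store each response alongside the matching entry in csvs_to_fetch instead of discarding it with `pass`.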

With this small change, all 12 of these .csv files were downloaded in 3.43s instead of 10.84s. That is a decrease of roughly 68% in download time!
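The arithmetic behind that comparison, using only the two timings reported above:

```python
sync_seconds = 10.84   # synchronous run, from the results above
async_seconds = 3.43   # asynchronous run, from the results above

time_saved = sync_seconds - async_seconds
percent_decrease = time_saved / sync_seconds * 100
speedup = sync_seconds / async_seconds

print("Time saved:       {:.2f}s".format(time_saved))
print("Percent decrease: {:.1f}%".format(percent_decrease))
print("Speedup:          {:.2f}x".format(speedup))
```

That works out to 7.41s saved, a decrease of about 68.4%, or a speedup of about 3.16x. These numbers come from a single run, so expect them to vary with network conditions.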

The Asynchronous Code

import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))

        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

async def get_data_asynchronous():
    # Needed so the assignment below resets the module-level START_TIME
    # that `fetch` reads, rather than creating a local variable
    global START_TIME

    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))

    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            START_TIME = default_timer()
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv)  # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()


I hope you enjoyed this article and can use these skills for any projects that require an older Python version (or maybe fewer dependencies). Although Python may not have a straightforward approach to an async / await pattern, it isn’t difficult at all to achieve fantastic results.

Enjoy!
