How to run asynchronous web requests in parallel with Python 3.5 (without aiohttp)

Tyler Burdsall (@tyler-burdsall)

Software Development Engineer at AWS Elemental

Recently at my workplace, our IT team finally upgraded our distributed Python versions to 3.5.0. While this was a huge upgrade from 2.6, it still came with some growing pains. Unfortunately, Python 3.5.0 doesn’t meet the minimum requirements of some popular libraries, including aiohttp.

With these restrictions, I still needed to write a script that could pull hundreds of .csv files from our APIs and manipulate the data. Python by itself isn’t event-driven and natively asynchronous (like NodeJS), but the same effect can still be achieved. This article details what I learned and shows the benefits of asynchronous operations.

Disclaimer: If you have a higher version of Python available (3.5.2+), I highly recommend using aiohttp instead. It’s an incredibly robust library and a great solution for this kind of problem. There are many tutorials online detailing how best to use the library.

Assumptions

This article makes the following assumptions:

- You already have familiarity with Python and most of its syntax
- You already have familiarity with basic web requests
- You have a loose concept of asynchronous operations

If you’re just looking for the solution, scroll down to the bottom, where the full code is posted. Enjoy!

Setup

Before getting started, ensure that you have requests installed on your machine. The easiest way to install is by typing the following command into your terminal:

$ python -m pip install requests

Alternatively, if you don’t have administrative permissions you can install the library with this command:

$ python -m pip install requests --user

The wrong approach: synchronous requests

To demonstrate the benefits of our parallel approach, let’s first look at approaching the problem in a synchronous manner. I’ll also give an overview of what’s going on in the code. Ultimately, we want to be able to perform a GET request to the URL containing the .csv file and measure the time it takes to read the text inside.

We’ll be downloading multiple .csv files of varying sizes from https://people.sc.fsu.edu/~jburkardt/data/csv/, which provides plenty of data for our example.

As a disclaimer, we’ll be using the Session object from the requests library to perform our GET request.

First, we’ll need a function that executes the web request:

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data

This function takes in a Session object and the name of the .csv file desired, performs the web request, then returns the text inside the response.
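
To see how fetch behaves without touching the network, here is a minimal sketch that passes it a stand-in session. FakeSession and FakeResponse are hypothetical helpers invented for this illustration and are not part of requests; they only mimic the small part of the interface that fetch relies on (get, text, status_code, and use in a with statement).

```python
class FakeResponse:
    """Hypothetical stand-in for a response object (not part of requests)."""

    def __init__(self, text, status_code):
        self.text = text
        self.status_code = status_code

    # `fetch` uses the response in a `with` statement
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False


class FakeSession:
    """Hypothetical stand-in for a session; serves canned files from a dict."""

    def __init__(self, files):
        self.files = files

    def get(self, url):
        name = url.rsplit("/", 1)[-1]
        if name in self.files:
            return FakeResponse(self.files[name], 200)
        return FakeResponse("", 404)


def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        return data


session = FakeSession({"cities.csv": "city,state\n"})
found = fetch(session, "cities.csv")     # returns the canned text
missing = fetch(session, "missing.csv")  # prints a FAILURE line, returns ""
```

Note that a failed request is only reported, not raised: fetch still returns whatever text the response carried, so callers that care should check for it.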

Next, we need a function that loops over a list of our desired files and measures the time it takes to perform each request:

import requests
from timeit import default_timer

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

This function creates our Session object and then loops through each .csv file in the csvs_to_fetch list. Once the fetch operation is completed, the measured time is calculated and displayed in an easy-to-read format.
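
The cumulative timing can be illustrated without any network access. In this minimal sketch, slow_task is a hypothetical stand-in for fetch that simply sleeps, and the delays are made-up values. Because each call blocks until it returns, every completion time is roughly the running sum of the delays before it.

```python
import time
from timeit import default_timer


def slow_task(delay):
    # Hypothetical stand-in for `fetch`: blocks for `delay` seconds
    time.sleep(delay)
    return delay


def run_sequential(delays):
    completed_at = []
    start = default_timer()
    for delay in delays:
        slow_task(delay)  # nothing else runs until this returns
        completed_at.append(default_timer() - start)
    return completed_at


delays = [0.10, 0.05, 0.15]  # made-up durations, in seconds
print("{0:<10} {1:>12}".format("Delay", "Completed at"))
for delay, finished in zip(delays, run_sequential(delays)):
    print("{0:<10.2f} {1:>11.2f}s".format(delay, finished))
```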

Finally, our main function will be simple (for now) and call our function:

def main():
    # Simple for now
    get_data_synchronous()

main()

Once we put it all together, here is what the code looks like for our synchronous example:

import requests
from timeit import default_timer

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Return .csv data for future consumption
        return data

def get_data_synchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]

    with requests.Session() as session:
        print("{0:<30} {1:>20}".format("File", "Completed at"))
        
        # Set any session parameters here before calling `fetch`
        # For instance, if you needed to set Headers or Authentication
        # this can be done before starting the loop
        
        total_start_time = default_timer()
        for csv in csvs_to_fetch:
            fetch(session, csv)
            elapsed = default_timer() - total_start_time
            time_completed_at = "{:5.2f}s".format(elapsed)
            print("{0:<30} {1:>20}".format(csv, time_completed_at))

def main():
    # Simple for now
    get_data_synchronous()

main()

Let’s take a look at the results when we run this script:

Synchronous example. Notice how each operation doesn’t start until the last one is completed

Thankfully, we can vastly improve this performance with Python 3’s built-in asyncio library!

The right approach: performing multiple requests at once asynchronously

In order to get this to work, we’ll have to rework some of our existing functions, beginning with fetch:

import requests
from timeit import default_timer

# We'll need access to this variable later
START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))
        # Now we will print how long it took to complete the operation from the 
        # `fetch` function itself
        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

Next, we need to make our get_data function asynchronous:

import asyncio
from timeit import default_timer
from concurrent.futures import ThreadPoolExecutor

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    
    # Note: max_workers is set to 10 simply for this example,
    # you'll have to tweak with this number for your own projects
    # as you see fit
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`

            # Initialize the event loop        
            loop = asyncio.get_event_loop()
            
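            # Reset the module-level START_TIME read by `fetch`.
            # Without `global`, this assignment would only create
            # a local variable and `fetch` would never see it
            global START_TIME
            START_TIME = default_timer()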
            
            # Use list comprehension to create a list of
            # tasks to complete. The executor will run the `fetch`
            # function for each csv in the csvs_to_fetch list
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv) # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            
            # Wait for every task to finish; `gather` returns the
            # results in the same order as `tasks`
            for response in await asyncio.gather(*tasks):
                pass

This code now creates a pool of up to 10 worker threads and schedules one call to the fetch function for each .csv file that needs to be downloaded, so several requests can be in flight at the same time.
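
The same pattern can be tried without any network access. In this minimal sketch, slow_task is a hypothetical stand-in for fetch that simply sleeps, the delays are made-up values, and the event loop is created explicitly and passed in so the example does not depend on how get_event_loop behaves in a particular Python version. The total time should be close to the longest delay rather than the sum, and gather hands the results back in the order the tasks were created, even though they finish out of order.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer


def slow_task(delay):
    # Hypothetical stand-in for `fetch`: blocks for `delay` seconds
    time.sleep(delay)
    return delay


async def run_parallel(loop, delays, max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        start = default_timer()
        # Each call is handed to a worker thread as soon as one is free
        tasks = [loop.run_in_executor(executor, slow_task, d) for d in delays]
        # Results come back in the order of `tasks`,
        # regardless of which thread finished first
        results = await asyncio.gather(*tasks)
        return list(results), default_timer() - start


def main():
    delays = [0.3, 0.1, 0.2, 0.1]  # made-up durations, in seconds
    loop = asyncio.new_event_loop()
    try:
        results, elapsed = loop.run_until_complete(
            run_parallel(loop, delays, max_workers=4)
        )
    finally:
        loop.close()
    print("results: {0}".format(results))
    print("elapsed: {0:.2f}s (sum of delays: {1:.2f}s)".format(elapsed, sum(delays)))
    return results, elapsed


results, elapsed = main()
```

Lowering max_workers below the number of tasks makes the extra tasks wait for a free thread, which is why the right value depends on the workload.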

Finally, our main function needs a small tweak to properly initialize our async function:

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()

Now, let’s run the new code and see the results:

Asynchronous example. Notice how the files are not being obtained in order.

With this small change, all 12 of these .csv files downloaded in 3.43s instead of 10.84s. That is a nearly 70% decrease in download time!
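
For reference, the percentage can be worked out directly from the two totals quoted above. This is just the arithmetic, using only those two numbers:

```python
sync_seconds = 10.84   # total reported for the synchronous run
async_seconds = 3.43   # total reported for the asynchronous run

saved = sync_seconds - async_seconds
percent_decrease = saved / sync_seconds * 100
speedup = sync_seconds / async_seconds

print("{0:.1f}% less time, {1:.2f}x faster".format(percent_decrease, speedup))
# prints: 68.4% less time, 3.16x faster
```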

The Asynchronous Code

import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer

START_TIME = default_timer()

def fetch(session, csv):
    base_url = "https://people.sc.fsu.edu/~jburkardt/data/csv/"
    url = base_url + csv
    with session.get(url) as response:
        data = response.text
        if response.status_code != 200:
            print("FAILURE::{0}".format(url))

        elapsed = default_timer() - START_TIME
        time_completed_at = "{:5.2f}s".format(elapsed)
        print("{0:<30} {1:>20}".format(csv, time_completed_at))

        return data

async def get_data_asynchronous():
    csvs_to_fetch = [
        "ford_escort.csv",
        "cities.csv",
        "hw_25000.csv",
        "mlb_teams_2012.csv",
        "nile.csv",
        "homes.csv",
        "hooke.csv",
        "lead_shot.csv",
        "news_decline.csv",
        "snakes_count_10000.csv",
        "trees.csv",
        "zillow.csv"
    ]
    print("{0:<30} {1:>20}".format("File", "Completed at"))
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            # `global` is needed so `fetch` sees the updated value
            global START_TIME
            START_TIME = default_timer()
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, csv) # Allows us to pass in multiple arguments to `fetch`
                )
                for csv in csvs_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                pass

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()

I hope you enjoyed this article and can use these skills for any projects that require an older Python version (or maybe fewer dependencies). Although Python may not have a straightforward approach to the async / await pattern, it isn’t difficult at all to achieve fantastic results.

Enjoy!
