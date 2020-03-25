Multiprocessing for heavy API requests with Python and the PokéAPI

While working on a recent project, I realized that heavy processes for python like scrapping could be made easier though python's multiprocessing library. The documentation and community engaging in multiprocessing is fairly sparse, so I wanted to share some of my learnings through an example project of scrapping the PokéAPI . Below I wrote a bit of code that pulls all of the available pokémon while minding the API's 100 calls per 60 second limits. You'll see that the iteration is fairly slow as there are 964 pokémon the API returns.

Before Multiprocessing

get_number_pokemon , get_pokemon , and get_all_pokemon . The first, get_number_pokemon , simply returns all the url's from the api for the next process, get_all_pokemon , to iterate over the urls and pull the information together by using get_pokemon to pull each URL's response data. I simily create three calls, and. The first,, simply returns all the url's from the api for the next process,, to iterate over the urls and pull the information together by usingto pull each URL's response data.

import requests as req import timeit import time import pandas as pd from IPython.display import Image, HTML import random from tqdm import tqdm from ratelimit import limits, sleep_and_retry ## Rate limit to help with overcalling ## pokemon api is 100 calls per 60 seconds max @sleep_and_retry @limits(calls=100, period=60) def call_api (url) : response = req.get(url) if response.status_code == 404 : return 'Not Found' if response.status_code != 200 : print( 'here' , status_code, url) raise Exception( 'API response: {}' .format(response.status_code)) return response API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}' def get_number_pokemon () : res = req.get(API_POKEMON.format(pokemon= '' )) number_pokemon = res.json()[ 'count' ] res_url = call_api(API_POKEMON.format(pokemon= '?offset=0&limit={limit}' .format(limit=str(number_pokemon)))) pokemon_links_values = [link[ 'url' ] for link in res_url.json()[ 'results' ]] return pokemon_links_values def get_pokemon (link= '' ) : info = None resolved = False try : while not resolved: res = None tooManyCalls = False try : res = call_api(link) if res == 'Not Found' : resolved = True break except Exception as e: print(e) if e == 'too many calls' : tooManyCalls = True if tooManyCalls: time.sleep( 60 ) elif res.status_code < 300 : pokemon_info = res.json() info = { 'Image' : pokemon_info[ 'sprites' ][ 'front_default' ], 'id' : pokemon_info[ 'id' ], 'name' : pokemon_info[ 'name' ], 'height' : pokemon_info[ 'height' ], 'base_experience' : pokemon_info[ 'base_experience' ], 'weight' : pokemon_info[ 'weight' ], 'species' : pokemon_info[ 'species' ][ 'name' ] } resolved = True elif res.status_code == 429 : time.sleep( 60 ) else : sleep_val = random.randint( 1 , 10 ) time.sleep(sleep_val) except Exception as e: print(e) return info finally : return info def get_all_pokemon (links_pokemon=None) : list_pokemon = [] for link in tqdm(links_pokemon): pokemon = get_pokemon(link) if pokemon != None : list_pokemon.append(pokemon) time.sleep( 0.3 ) pd.set_option( 'display.max_colwidth' , None ) df_pokemon = pd.DataFrame(list_pokemon) return df_pokemon def image_formatter (im) : return f'<img src=" {im} ">' def main_pokemon_run () : links_pokemon = get_number_pokemon() df_pokemon = get_all_pokemon(links_pokemon=links_pokemon) df_pokemon.sort_values([ 'id' ],inplace= True ) return df_pokemon, HTML(df_pokemon.iloc[ 0 : 4 ].to_html(formatters={ 'Image' : image_formatter}, escape= False ))

As you can see from the response below, the API call took roughly 9 minutes and 30 seconds at a rate of 1.69 iterations per second. We can improve this drastically by adding multiprocessing.

With Multiprocessing

get_all_pokemon function into a multiprocessing pool function. We use the cpu_count() built in multiprocessing function to define the number of workers needed. Since we we want to get this done as quickly as possible using the full cpu_count - 1 will ensure the program runs the most optimally without taking up all over our CPUs . The max(cpu_count() -1, 1) ensures that this function will never be 0. Next, I use multiprocessing's built in Manager tool to share a global memory of our returned Pokémon. Now with multiprocessing we can separate thefunction into a multiprocessing pool function. We use thebuilt in multiprocessing function to define the number of workers needed. Since we we want to get this done as quickly as possible using the fullwill ensure the program runs the most optimally without taking up all over our. Theensures that this function will never be 0. Next, I use multiprocessing's built intool to share a global memory of our returned Pokémon.

While you can use the returned value of the multiprocess map, using a manager is nice way to use multiprocessing for other more complicated tasks. Check the documentation for the available multiprocessing.

pool.imap function passes an array to iterate over, we need to add our parameters more specifically to our get_pokemon_multiprocess function before we iterate over it. The functools.partial library allows us to do this by creating a partial function to send to the pool map. I use tqdm in this example to see the progress of the pool. Manager types. Another step needed is to create a partial function. As thefunction passes an array to iterate over, we need to add our parameters more specifically to ourfunction before we iterate over it. Thelibrary allows us to do this by creating a partial function to send to the pool map. I use tqdm in this example to see the progress of the pool.

finally block. This is done to validate that even if there is an error, the process workers are closed and terminated. I also show in the code the same function without it. Now that the pool has ran, we need to ensure that it is both closed and joined, you'll see I do this immediately after and in theblock. This is done to validate that even if there is an error, the process workers are closed and terminated.

The join function is necessary to have the processes after pooling being joined together. This also validates that the manager is completely finished binding.

import requests as req import timeit import time import pandas as pd from IPython.display import Image, HTML import random from tqdm import tqdm from ratelimit import limits, sleep_and_retry from multiprocessing import Pool, Manager, cpu_count from functools import partial API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}' # To see how it ran # def infoDebugger(title): # print(title) # print('module name:', __name__) # if hasattr(os, 'getppid'): # print('parent process:', os.getppid()) # print('process id:', os.getpid()) @sleep_and_retry @limits(calls=100, period=60) def call_api (url) : response = req.get(url) if response.status_code == 404 : return 'Not Found' if response.status_code != 200 : raise Exception( 'API response: {}' .format(response.status_code)) return response # https://docs.python.org/2/library/multiprocessing.html def get_number_pokemon () : res = req.get(API_POKEMON.format(pokemon= '' )) number_pokemon = res.json()[ 'count' ] res_url = call_api(API_POKEMON.format(pokemon= '?offset=0&limit={limit}' .format(limit=str(number_pokemon)))) pokemon_links_values = [link[ 'url' ] for link in res_url.json()[ 'results' ]] return pokemon_links_values def get_pokemon_multiprocess (listManager=None, links_pokemon=None, process= 0 ) : # print('Called Pokemon', process) link = links_pokemon[process] info = None resolved = False # print(link) try : while not resolved: res = None tooManyCalls = False try : res = call_api(link) if res == 'Not Found' : resolved = True break except Exception as e: print(e) if e == 'too many calls' : tooManyCalls = True if tooManyCalls: time.sleep( 60 ) elif res.status_code < 300 : pokemon_info = res.json() info = { 'Image' : pokemon_info[ 'sprites' ][ 'front_default' ], 'id' : pokemon_info[ 'id' ], 'name' : pokemon_info[ 'name' ], 'height' : pokemon_info[ 'height' ], 'base_experience' : pokemon_info[ 'base_experience' ], 'weight' : pokemon_info[ 'weight' ], 'species' : pokemon_info[ 'species' ][ 'name' ] } resolved = True elif res.status_code == 429 : print(res.status_code) time.sleep( 60 ) else : print(res.status_code) sleep_val = random.randint( 1 , 10 ) time.sleep(sleep_val) except Exception as e: print(e) finally : if info != None : listManager.append(info) time.sleep( 0.5 ) return def image_formatter (im) : return f'<img src=" {im} ">' def main_pokemon_run_multiprocessing () : ## cannot be 0, so max(NUMBER,1) solves this workers = max(cpu_count() -1 , 1 ) ## create the pool manager = Manager() ## Need a manager to help get the values async, the values will be updated after join listManager = manager.list() pool = Pool(workers) try : links_pokemon = get_number_pokemon() part_get_clean_pokemon = partial(get_pokemon_multiprocess, listManager, links_pokemon) # could do this the below is visualize the rate success /etc # pool.imap(part_get_clean_pokemon, list(range(0, len(links_pokemon)))) # using tqdm to see progress imap works for _ in tqdm(pool.imap(part_get_clean_pokemon, list(range( 0 , len(links_pokemon)))), total=len(links_pokemon)): pass pool.close() pool.join() finally : pool.close() pool.join() pokemonList = list(listManager) df_pokemon = pd.DataFrame(pokemonList) df_pokemon.sort_values([ 'id' ],inplace= True ) return df_pokemon, HTML(df_pokemon.iloc[ 0 : 4 ].to_html(formatters={ 'Image' : image_formatter}, escape= False ))

You'll see in the below snapshot that the pooling worked successfully and improved the runtime and iteration time drastically from 9 minutes to 1 minute and iterations per second to 14.97 iterations per second.

Hope this was helpful. To see the full code and check out my notebook on github

Let me know if you come up with any other useful uses for this and please share any feedback on my sample if you have any.

