While working on a recent project, I realized that heavy Python processes like scraping can be made easier through Python's multiprocessing library. The documentation and community around multiprocessing are fairly sparse, so I wanted to share some of my learnings through an example project: scraping the PokéAPI. Below I wrote a bit of code that pulls all of the available pokémon while minding the API's limit of 100 calls per 60 seconds. You'll see that the iteration is fairly slow, as the API returns 964 pokémon.
I simply create three calls: get_number_pokemon, get_pokemon, and get_all_pokemon. The first, get_number_pokemon, simply returns all the URLs from the API so that the next process, get_all_pokemon, can iterate over the URLs and pull the information together by using get_pokemon to pull each URL's response data.
import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
## Rate limit to help with overcalling
## pokemon api is 100 calls per 60 seconds max
@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    """Issue a rate-limited GET request against ``url``.

    The ``limits`` decorator caps this function at 100 calls per 60 seconds,
    matching the API limit noted above; ``sleep_and_retry`` is stacked on top
    of it (presumably to wait and retry once the cap is hit -- see the
    ratelimit documentation).

    Args:
        url: Address to fetch.

    Returns:
        The response object on HTTP 200, or the string ``'Not Found'`` on
        HTTP 404.

    Raises:
        Exception: For any other status code; the message carries the code.
    """
    response = req.get(url)
    if response.status_code == 404:
        return 'Not Found'
    if response.status_code != 200:
        # The status lives on the response object. A bare ``status_code``
        # name is undefined here and would raise NameError before the
        # intended exception could be raised.
        print('here', response.status_code, url)
        raise Exception('API response: {}'.format(response.status_code))
    return response
API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'
def get_number_pokemon():
    """Return the detail URL of every pokémon the API lists.

    Despite the name, the result is a list of URLs rather than a count: the
    count is only read first so that a single listing request can ask for
    every entry at once.

    Returns:
        A list with the ``url`` field of each entry in the listing's
        ``results``.
    """
    # Unthrottled request to the bare listing endpoint, used only for 'count'.
    listing_root = API_POKEMON.format(pokemon='')
    total = req.get(listing_root).json()['count']

    # Second request goes through call_api and asks for the full listing.
    query = '?offset=0&limit={limit}'.format(limit=str(total))
    listing = call_api(API_POKEMON.format(pokemon=query))

    urls = []
    for entry in listing.json()['results']:
        urls.append(entry['url'])
    return urls
def get_pokemon(link=''):
    """Fetch one pokémon from ``link`` and flatten it into a small dict.

    Failed requests are retried a bounded number of times, pausing between
    attempts. The previous version compared the exception object itself to
    the string ``'too many calls'`` (never equal), then dereferenced
    ``res`` while it was still ``None``; the resulting AttributeError was
    swallowed, so the first failure silently ended the lookup and the
    sleep/retry branches never ran.

    Args:
        link: URL of a single pokémon resource.

    Returns:
        A dict with the keys ``Image``, ``id``, ``name``, ``height``,
        ``base_experience``, ``weight`` and ``species``, or ``None`` when
        the resource is not found, the payload cannot be read, or every
        attempt failed.
    """
    max_attempts = 3

    for _ in range(max_attempts):
        try:
            res = call_api(link)
        except Exception as e:
            print(e)
            # call_api reports bad statuses as 'API response: <code>', so
            # the status has to be read from the message text.
            message = str(e)
            if 'too many calls' in message or message.endswith(' 429'):
                # Rate limited: wait out the 60 second window.
                time.sleep(60)
            else:
                # Any other failure: short random back-off before retrying.
                time.sleep(random.randint(1, 10))
            continue

        if res == 'Not Found':
            return None

        try:
            pokemon_info = res.json()
            return {
                'Image': pokemon_info['sprites']['front_default'],
                'id': pokemon_info['id'],
                'name': pokemon_info['name'],
                'height': pokemon_info['height'],
                'base_experience': pokemon_info['base_experience'],
                'weight': pokemon_info['weight'],
                'species': pokemon_info['species']['name'],
            }
        except (KeyError, TypeError, ValueError) as e:
            # Malformed or incomplete payload: report it and give up on
            # this link, as before.
            print(e)
            return None

    return None
def get_all_pokemon(links_pokemon=None):
    """Fetch every link sequentially and collect the results in a DataFrame.

    Args:
        links_pokemon: Iterable of pokémon resource URLs. ``None`` (the
            default) is treated as "no links"; it used to be handed
            straight to ``tqdm``, which cannot iterate it.

    Returns:
        A DataFrame with one row per link for which ``get_pokemon``
        returned a record.
    """
    if links_pokemon is None:
        links_pokemon = []

    list_pokemon = []
    for link in tqdm(links_pokemon):
        pokemon = get_pokemon(link)
        if pokemon is not None:
            list_pokemon.append(pokemon)
        # Small pause between requests on top of call_api's rate limit.
        time.sleep(0.3)

    # Process-wide pandas display option: stop long cell values (the image
    # tags) from being truncated when the frame is rendered.
    pd.set_option('display.max_colwidth', None)
    df_pokemon = pd.DataFrame(list_pokemon)
    return df_pokemon
def image_formatter(im):
    """Render an image address as an HTML ``<img>`` tag.

    Args:
        im: Image address; converted with ``str`` before use.

    Returns:
        The string ``<img src="...">`` with the address HTML-escaped.
    """
    # Function-scope import keeps this block self-contained.
    import html

    # The caller in this file renders the table with escape=False, so the
    # attribute value has to be quoted here. Addresses without HTML special
    # characters pass through unchanged.
    return f'<img src="{html.escape(str(im), quote=True)}">'
def main_pokemon_run():
    """Run the sequential scrape and build a preview of the result.

    Returns:
        A tuple ``(df_pokemon, preview)``: the full DataFrame sorted by
        ``id``, and an ``HTML`` object rendering the first four rows with
        the ``Image`` column shown as ``<img>`` tags.
    """
    links_pokemon = get_number_pokemon()
    df_pokemon = get_all_pokemon(links_pokemon=links_pokemon)

    # When no record was collected the frame has no 'id' column and sorting
    # on it would raise KeyError, hiding the real problem.
    if 'id' in df_pokemon.columns:
        df_pokemon.sort_values(['id'], inplace=True)

    preview = df_pokemon.iloc[0:4].to_html(
        formatters={'Image': image_formatter},
        escape=False,
    )
    return df_pokemon, HTML(preview)
As you can see from the response below, the API call took roughly 9 minutes and 30 seconds at a rate of 1.69 iterations per second. We can improve this drastically by adding multiprocessing.
Now with multiprocessing we can turn the get_all_pokemon function into a multiprocessing pool function. We use multiprocessing's built-in cpu_count() function to define the number of workers needed. Since we want to get this done as quickly as possible, using cpu_count() - 1 workers lets the program run close to optimally without taking up all of our CPUs. The max(cpu_count() - 1, 1) ensures that the number of workers will never be 0. Next, I use multiprocessing's built-in Manager tool to share a global memory of our returned pokémon. While you can use the returned value of the multiprocess map, using a manager is a nice way to use multiprocessing for other, more complicated tasks. Check the documentation for the available multiprocessing.Manager types. Another step needed is to create a partial function. As the pool.imap function takes a single iterable to iterate over, we need to bind our other parameters to our get_pokemon_multiprocess function before we iterate over it. The functools.partial function allows us to do this by creating a partial function to send to the pool map. I use tqdm in this example to see the progress of the pool. I also show in the code the same call without it. Once the pool has run, we need to ensure that it is both closed and joined; you'll see I do this immediately after and in the finally block. This ensures that even if there is an error, the worker processes are closed and shut down. The join call is necessary to wait for the worker processes to finish after pooling. This also ensures that the manager has completely finished receiving results.
import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
from multiprocessing import Pool, Manager, cpu_count
from functools import partial
API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'
# To see how it ran
# def infoDebugger(title):
# print(title)
# print('module name:', __name__)
# if hasattr(os, 'getppid'):
# print('parent process:', os.getppid())
# print('process id:', os.getpid())
@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    """Issue a GET request against ``url`` under the 100-per-60s call cap.

    Args:
        url: Address to fetch.

    Returns:
        The response object on HTTP 200, or the string ``'Not Found'`` on
        HTTP 404.

    Raises:
        Exception: For any other status code; the message carries the code.
    """
    reply = req.get(url)
    code = reply.status_code
    if code == 200:
        return reply
    if code == 404:
        # Missing resources are reported with a sentinel string, not an error.
        return 'Not Found'
    raise Exception('API response: {}'.format(code))
# https://docs.python.org/2/library/multiprocessing.html
def get_number_pokemon():
    """Collect the resource URL of every pokémon in the API listing.

    The function name is historical: it reads the listing's ``count`` only
    to size the follow-up request, and what it returns is the list of URLs.

    Returns:
        A list holding the ``url`` of each entry under ``results``.
    """
    # First hit (not routed through call_api) just reads the total count.
    count_reply = req.get(API_POKEMON.format(pokemon=''))
    how_many = count_reply.json()['count']

    # Ask for the whole listing in one page, this time through call_api.
    page_suffix = '?offset=0&limit={limit}'.format(limit=str(how_many))
    full_listing = call_api(API_POKEMON.format(pokemon=page_suffix))

    results = full_listing.json()['results']
    return [item['url'] for item in results]
def get_pokemon_multiprocess(listManager=None, links_pokemon=None, process=0):
    """Pool worker: fetch one pokémon and append it to the shared list.

    Failed requests are retried a bounded number of times. The previous
    version compared the exception object to the string
    ``'too many calls'`` (never equal) and then dereferenced ``res`` while
    it was ``None``, so the first failure ended the task without any retry.

    Args:
        listManager: Shared list (e.g. a ``Manager().list()`` proxy) that
            receives the record. Nothing is stored when it is ``None``.
        links_pokemon: Sequence of pokémon resource URLs.
        process: Index into ``links_pokemon`` selecting the URL to fetch.

    Returns:
        None. A dict with the keys ``Image``, ``id``, ``name``, ``height``,
        ``base_experience``, ``weight`` and ``species`` is appended to
        ``listManager`` on success.
    """
    link = links_pokemon[process]
    info = None
    max_attempts = 3

    try:
        for _ in range(max_attempts):
            try:
                res = call_api(link)
            except Exception as e:
                print(e)
                # call_api reports bad statuses as 'API response: <code>',
                # so the status has to be read from the message text.
                message = str(e)
                if 'too many calls' in message or message.endswith(' 429'):
                    # Rate limited: wait out the 60 second window.
                    time.sleep(60)
                else:
                    # Any other failure: short random back-off, then retry.
                    time.sleep(random.randint(1, 10))
                continue

            if res == 'Not Found':
                break

            pokemon_info = res.json()
            info = {
                'Image': pokemon_info['sprites']['front_default'],
                'id': pokemon_info['id'],
                'name': pokemon_info['name'],
                'height': pokemon_info['height'],
                'base_experience': pokemon_info['base_experience'],
                'weight': pokemon_info['weight'],
                'species': pokemon_info['species']['name'],
            }
            break
    except Exception as e:
        # Worker boundary: one bad payload must not abort the whole pool
        # run, so report it and carry on.
        print(e)
    finally:
        if info is not None and listManager is not None:
            listManager.append(info)
        # Per-task pause on top of call_api's rate limit.
        time.sleep(0.5)
    return
def image_formatter(im):
    """Wrap an image address in an HTML ``<img>`` tag.

    Args:
        im: Image address; converted with ``str`` before use.

    Returns:
        The string ``<img src="...">`` with the address HTML-escaped.
    """
    # Function-scope import keeps this block self-contained.
    import html

    # The table is rendered with escape=False by the caller in this file,
    # so the attribute value is quoted here. Addresses without HTML special
    # characters are left exactly as they were.
    safe_src = html.escape(str(im), quote=True)
    return f'<img src="{safe_src}">'
def main_pokemon_run_multiprocessing():
    """Run the scrape across a process pool and build a preview.

    Each pool task fetches one URL through ``get_pokemon_multiprocess`` and
    appends its record to a manager-backed shared list.

    Returns:
        A tuple ``(df_pokemon, preview)``: the full DataFrame sorted by
        ``id``, and an ``HTML`` object rendering the first four rows with
        the ``Image`` column shown as ``<img>`` tags.
    """
    # A pool needs at least one worker; leave one core free when possible.
    workers = max(cpu_count() - 1, 1)

    # The context manager shuts the manager's server process down on exit;
    # previously it was never shut down explicitly.
    with Manager() as manager:
        # Shared list the workers append to.
        listManager = manager.list()
        pool = Pool(workers)
        try:
            links_pokemon = get_number_pokemon()
            # imap passes one item per task, so bind the shared list and
            # the links up front and iterate over plain indices.
            part_get_clean_pokemon = partial(get_pokemon_multiprocess, listManager, links_pokemon)
            tasks = pool.imap(part_get_clean_pokemon, range(len(links_pokemon)))
            # tqdm only visualises progress; the results arrive through
            # listManager, not through the imap return values.
            for _ in tqdm(tasks, total=len(links_pokemon)):
                pass
            pool.close()
        except BaseException:
            # On any failure (including Ctrl-C) stop the workers right away
            # instead of waiting for every queued task to finish.
            pool.terminate()
            raise
        finally:
            pool.join()
        # Copy out of the proxy while the manager is still alive.
        pokemonList = list(listManager)

    df_pokemon = pd.DataFrame(pokemonList)
    # With no records there is no 'id' column and sorting would raise
    # KeyError.
    if 'id' in df_pokemon.columns:
        df_pokemon.sort_values(['id'], inplace=True)
    preview = df_pokemon.iloc[0:4].to_html(
        formatters={'Image': image_formatter},
        escape=False,
    )
    return df_pokemon, HTML(preview)
You'll see in the snapshot below that the pooling worked successfully and drastically improved both the runtime, from roughly 9 minutes to 1 minute, and the throughput, from 1.69 to 14.97 iterations per second.
Hope this was helpful. To see the full code, check out my notebook on GitHub.
Let me know if you come up with any other useful uses for this and please share any feedback on my sample if you have any.
Greyson Nevins-Archer