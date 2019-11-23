Use Hacker Noon's RSS Feed
"""
Implementation 1: Infinite Loop in Consumers
"""
import queue
import threading
orders = queue.Queue()
def serving_line_or_consumer():
while True:
new_order = orders.get()
# prepare meals from `new_order`, assuming GIL is released while preparing meals
orders.task_done() # Invoke this to indicate the "order" in the Queue is processed
def order_line_or_producer():
# Each staff in the serving line produces 200 orders
for _ in range(200):
orders.put("Order")
# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]
# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]
# "join" the order, block until all orders are cleared
orders.join()
# "join" the threads, ending all threads
[t.join() for t in order_line]
[t.join() for t in serving_line]
serving_line_or_consumer()
def serving_line_or_consumer():
while True: # PROBLEM: Wait for orders
new_order = orders.get()
# prepare meals from `new_order`, assuming GIL is released while preparing meals
orders.task_done() # Invoke this to indicate the "order" in the Queue is processed
"""
Implementation 2: Use a Sentinel Value to Stop Busy Waiting
"""
import queue
import threading
orders = queue.Queue()
def serving_line_or_consumer():
has_order = True
while has_order:
new_order = orders.get()
if new_order is None: # Check for Sentinel Value
has_order = False
# prepare meals from `new_order`, assuming GIL is released while preparing meals
orders.task_done() # Invoke this to indicate the "order" in the Queue is processed
def order_line_or_producer():
# Each staff in the serving line produces 200 orders
for _ in range(200):
orders.put("Order")
# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]
# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]
# ADDED THIS: Inform serving line no more orders
[orders.put(None) for _ in range(len(serving_line))]
# "join" the order, block until all orders are cleared
orders.join()
# "join" the threads, ending all threads
[t.join() for t in order_line]
[t.join() for t in serving_line]
to inform the consumers when there aren't any new orders. We added a line to put
None
into
None
orders
[orders.put(None) for _ in range(len(serving_line))]
def serving_line_or_consumer():
has_order = True
while has_order:
new_order = orders.get()
if new_order is None: # Check for Sentinel Value
has_order = False
# prepare meals from `new_order`, assuming GIL is released while preparing meals
orders.task_done() # Invoke this to indicate the "order" in the Queue is processed
, it will break from the infinite loop. However, the caveat is, we have to explicitly recreate a consumer whenever the system gets new orders after consumers are terminated. This isn't really what desired in our
None
, and
acquire()
. I will explain what
release()
and
acquire
perform in the latter part.
release
"""
Implementation 3: Use Semaphores
"""
import queue
import threading
orders = queue.Queue()
has_order = threading.Semaphore(value=0) # ADDED THIS
def serving_line_or_consumer():
while has_order.acquire(): # ADDED THIS: Acquire a Semaphore, or sleep until the counter of semaphore is larger than zero
new_order = orders.get()
# prepare meals from `new_order`, assuming GIL is released while preparing meals
orders.task_done()
def order_line_or_producer():
# Each staff in the serving line produces 200 orders
for _ in range(200):
orders.put("Order")
has_order.release() # ADDED THIS: Release the Semaphore, increment the internal counter by 1
# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]
# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]
# "join" the order, block until all orders are cleared
orders.join()
# "join" the threads, ending all threads
[t.join() for t in order_line]
[t.join() for t in serving_line]
would be:
acquire()
If the internal counter is zero:
allow the thread to sleep while waiting for it to be larger than zero
once the internal counter is more than zero, decrement it by 1
return True
if the internal counter is larger than zero:
decrement the internal counter by 1
return True
, our consumers will be idle when there isn't any order the internal counter of the semaphore is zero).
acquire()
to make use of semaphore as well. When a producer receives an order, it releases the semaphore. In short,
order_line_or_producer()
doesn't do anything but increment the internal counter of the semaphore.
release()
will then act accordingly. This is a much better solution among all. With semaphore, consumers will be put to idle/sleep while they wait for new orders, and producers inform consumers by incrementing the semaphore.
acquire()
Anecdote: In Private File Saver, I implemented a desktop client to sync local files to AWS S3 bucket in Python. The performance was satisfactory until I had to sync more than 12,000 files. It warranted a need to rewrite the logic to be more efficient. One of the obvious improvements would be implementing producer/consumer solution. (The details of this problem can be found here).