

Python concurrency I - Queues

Development
Tue 05 March 2019


Concurrency can be difficult to understand. There are two basic approaches:

  • Process-based concurrency - spawning multiple processes
  • Thread-based concurrency - spawning multiple threads within a single process

Let's examine thread-based concurrency for simplicity, as process-based concurrency carries a few more limitations and considerations.

If you are not familiar with them, threads:

  • Live within a host process, executing instructions, and stop if the host process stops.
  • Cannot be stopped from the outside; they must willingly stop their work and be joined (awaited).
  • Must take care when sharing objects (use locks or atomic operations for mutation), as threads can overlap each other's reads and writes (called a "race condition") - see the sketch after this list.
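
To make that last point concrete, here is a minimal sketch of a race condition on a shared counter and the threading.Lock fix (the counter and function names are illustrative, not from the examples below):

from threading import Thread, Lock

counter = 0
lock = Lock()

def unsafe_increment():
    global counter
    for _ in range(100_000):
        counter += 1  # read-modify-write: two threads can interleave here

def safe_increment():
    global counter
    for _ in range(100_000):
        with lock:  # only one thread may mutate at a time
            counter += 1

threads = [Thread(target=safe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 200000 with the lock; often less with unsafe_increment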

Python provides a nice wrapper for thread operations in the threading module (threading.Thread); starting a new thread is as simple as:

from threading import Thread
Thread().start()

However, there are some problems:

  • It does nothing useful.
  • Once a thread is doing real work, it cannot be stopped from the outside; it must stop itself.

To achieve anything, you'll need to subclass Thread and perform your work inside the run(self) method of the class, as follows:

class WorkerThread(Thread):
    def run(self):
        while True:  # loop forever so the thread keeps working
            print("Hello world (from thread)")

WorkerThread().start()

Running this, we see it just keeps printing until we terminate the script (process); additionally, you'll notice there is no way to pass data into or out of the thread, and no way to stop it gracefully. Let's add a concept to help us gracefully stop worker threads as needed: an event (threading.Event in the standard library), a simple flag that threads can poll or wait on.

There are several operations that can be performed on it:

  • is_set() - Return the internal flag without waiting (a poll)
  • set() - Set the internal flag to true (wakes anyone waiting)
  • clear() - Reset the internal flag to false
  • wait(timeout) - Block until the flag is true or the optional timeout expires; returns the flag's value, so it is False on timeout
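
A quick demonstration of these calls, with one thread waiting on the flag and the main thread setting it (a minimal sketch; the names are illustrative):

from threading import Event, Thread
from time import sleep

flag = Event()

def waiter():
    # Block for up to 5 seconds; wait() returns False if the timeout expires
    if flag.wait(timeout=5):
        print("flag was set")

t = Thread(target=waiter)
t.start()
sleep(1)
flag.set()  # wakes the waiting thread immediately
t.join()
print(flag.is_set())  # True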

Now we have a way to inform one or more threads that some state has changed, which in our case we can use as a stop signal for worker threads. While events support being waited upon, we actually want to perform other things in the meantime, so we will poll the event every time we go around our run loop; if the event's flag is not set, we carry on as usual. It would look like this:

def run(self):
    while not self.stoprequest.is_set():
        pass  # Perform the actual work

We need another structure to safely pass work in for consumption, or results out for publishing. Enter the queue: it has some functions similar to the event's, but is a thread-safe FIFO container (you can use queue.SimpleQueue for a simple unbounded queue, or a regular queue.Queue for an optional size limit and more advanced features).
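
The FIFO behaviour is easy to see in isolation (a minimal sketch):

from queue import SimpleQueue

q = SimpleQueue()
q.put("first")
q.put("second")
print(q.get())  # "first" - the oldest item comes out first
print(q.get())  # "second"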

If you're using a queue to accept incoming work, you can actually wait upon the queue; however, I would suggest using a timeout so that the thread can gracefully finish execution when requested. A small timeout also avoids thrashing the thread by polling constantly.
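
Inside a worker's run() method, that might look like the following sketch (work_queue and process() are illustrative names, not part of the examples in this post):

from queue import Empty

def run(self):
    while not self.stoprequest.is_set():
        try:
            # A short timeout keeps the loop responsive to stop requests
            # without spinning the CPU when the queue is empty
            item = self.work_queue.get(timeout=0.1)
        except Empty:
            continue
        self.process(item)  # hypothetical per-item work method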

Here's how to poll a queue without blocking; as you can see, an exception is raised if it is empty, and it must be caught as below.

from queue import Empty

try:
    item = q.get_nowait()
except Empty:
    pass  # No item was available

If using a regular Queue with a fixed limit, be aware that put_nowait() raises an exception when the queue is full (a plain put() blocks instead, and SimpleQueue instances are unbounded, so they do not have this problem).

from queue import Full

try:
    q.put_nowait("Hello World")
except Full:
    pass  # The queue is at capacity

Finally, let's combine two well-behaved worker threads that observe stop requests and publish their output to a shared queue. Since the queue is FIFO (first in, first out), the consumer will get the items in the order they were published - and this is safe by design (passing items between threads is specifically what the queue was made for).

# -*- coding: utf-8 -*-
from queue import SimpleQueue
from threading import Thread, Event
from time import sleep
from random import randint


class WorkerThread(Thread):
    def __init__(self, *, result_queue, identifier):
        super().__init__()
        self.result_queue = result_queue  # shared output queue
        self.identifier = identifier      # label for this worker's items
        self.stoprequest = Event()        # set when the thread should stop
        self.counter = 0

    def run(self):
        # Keep working until a stop has been requested
        while not self.stoprequest.is_set():
            self.service_runloop()

    def service_runloop(self):
        # Simulate a variable amount of work, then publish a result
        sleep(randint(1, 5))
        self.result_queue.put(f'[{self.identifier}] Item {self.counter}')
        self.counter += 1

    def join(self, timeout=None):
        # Ask the thread to stop, then wait for run() to return
        self.stoprequest.set()
        super().join(timeout)


if __name__ == '__main__':
    q = SimpleQueue()
    worker1 = WorkerThread(result_queue=q, identifier='Mickie')
    worker2 = WorkerThread(result_queue=q, identifier='Minnie')
    worker1.start()
    worker2.start()
    while True:
        item = q.get()  # blocks until a worker publishes something
        print(item)

As you can see, it's just each thread's identifier and a counter being published after a random delay; after a short interval you should see consecutive items from one thread while the other is sleeping. Whilst trivial in nature, this type of pattern could perform any number of tasks... even accepting work from one queue to process, then either discarding or publishing the computed result to another queue. The only limit to what you can do is how you design your code to take advantage of parallel execution.
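
If you want the demo to stop cleanly instead of running forever, one option (a sketch reusing the classes above) is to consume a fixed number of items and then join the workers; the overridden join() sets the stop event before waiting:

for _ in range(10):
    print(q.get())  # consume a fixed number of results

worker1.join()  # sets the stop event, then waits for run() to exit
worker2.join()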