This article covers how to use system processes, both with and without thread pools, as Python concurrency methods.
In my previous article, I gave an overview of thread pools in Python. Thread pools are a good technique for many problems, but they have limitations imposed by Python’s Global Interpreter Lock (GIL), among other things. The techniques in this article bypass those limits.
Our task is the same as in the previous article: determine if a given IP address supports telnet or ssh. For these techniques we’ll reuse the code for check_a_port as-is. I’ve reproduced it below:
from typing import Tuple, List, Generator
import socket

PortStatus = Tuple[str, int, bool]  # Our result type

def check_a_port(ip_or_name: str, port: int) -> PortStatus:
    try:
        sock = socket.create_connection((ip_or_name, port), timeout=10)
        sock.close()
        return ip_or_name, port, True
    except (TimeoutError, OSError):
        return ip_or_name, port, False
Follow Your Processes
Python provides a surprisingly easy-to-use built-in package called multiprocessing. The multiprocessing package provides a couple of Python classes which I will use in this article. The first is the Process class, which instantiates a system (UNIX, Linux, Windows, etc.) process running your Python code. Processes do not share address space with each other, so you can’t just wait for things to complete like you can with threads. Fortunately, the multiprocessing package provides the Queue class to deal with this quite easily.
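To make the mechanics concrete before diving into the scanner, here is a minimal sketch of mine (not from the article’s code) showing a Process sending a value back to its parent through a Queue:

from multiprocessing import Process, Queue

def child(q: Queue) -> None:
    q.put("hello from the child process")  # pickled and sent to the parent

if __name__ == "__main__":
    q = Queue()
    p = Process(target=child, args=(q,))
    p.start()
    print(q.get())  # blocks until the child puts its message
    p.join()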
Process Logic – Queue it up!
The general approach I’m going to follow is to create a work queue of IP:port pairs to examine, and a result queue for the return values from check_a_port(). The basic logic is pretty simple:
- Write all the IP:port pairs to the work Queue.
- Each worker Process does the following steps in a loop:
  - Read a request from the work Queue.
  - Call check_a_port() on the IP:port pair from the request.
  - Write the return value to the result Queue.
- The original program thread reads the result Queue, yielding each result to its caller as it occurs.
The code for this is a bit longer than the thread case, but not bad at all. Python’s multiprocessing library is taking care of a huge number of details behind the scenes to make this much easier than in many languages.
Let’s start out with proc_worker, defined as a plain function.
from multiprocessing import Process, Queue

def proc_worker(work: Queue, result: Queue) -> None:
    while True:
        request = work.get(block=True)
        if request is None:
            return
        result.put(check_a_port(*request))
This is pretty simple: we read requests from the work queue; if a request is None, we return; otherwise we pass it to check_a_port and put the return value on the result queue. Now all we have to do is run this in a lot of processes, give them work to do via the work queue, and yield the results up to our caller. Next step: run them in processes, perhaps 32 of them. Here’s what that code looks like:
def check_ports(ip_ports: List[Tuple[str, int]]) -> Generator[PortStatus, None, None]:
    work = Queue()
    result = Queue()
    workers = [Process(target=proc_worker, args=(work, result))
               for _ in range(32)]
    for process in workers:
        process.start()
    for item in ip_ports:
        work.put(item)
    remaining = len(ip_ports)
    while remaining:
        yield result.get()
        remaining -= 1
    for process in workers:
        process.terminate()
        process.join()
This turned out to be pretty simple, but still it deserves a little explanation of what’s going on and why it’s there.
- The list comprehension creates a list of 32 worker processes, each running proc_worker and each passed the work queue and the result queue; we then start each of them.
- The for item in ip_ports loop puts the work from ip_ports into the work queue. You can write any pickleable Python object onto a multiprocessing Queue. This is pretty cool!
- The while remaining loop reads the results and yields them to our caller as they come in.
- Lastly, we kill each of our infinitely looping worker processes and wait for them to terminate.
Needless to say, Python and Linux are doing a good bit of work under the covers to make this happen so transparently and with so little effort on my part.
A Potential Flaw
Although this is pretty simple, it has a potential flaw for large input sets. If the worker processes write so many results to the result queue that it fills up before all the work can be put in the work queue, the program can deadlock. Not a happy result. To fix that, one might write a function like this:
def writer(ip_ports: List[Tuple[str, int]], work: Queue):
for item in ip_ports:
work.put(item)
Now we can run the writer function in its own process, like this:
queuer = Process(target=writer, args=(ip_ports, work))
queuer.start()
And of course, don’t forget to add a queuer.join() call after the get/yield loop is complete. This function doesn’t loop forever, so it doesn’t need to be explicitly terminated.
Disadvantages of Processes
Although doing your concurrency with processes easily gets past Python’s GIL, it does have some disadvantages:
- Linux processes are somewhat more expensive than threads, and Python multiprocessing processes are more expensive than a simple Linux fork or a Python thread.
- The total number of processes on a UNIX system is limited, traditionally to a 16-bit PID space of roughly 32K–64K, though modern Linux lets you raise this limit.
- Copying the work and results to queues is more expensive than passing object references to threads. If the objects are small, the cost probably doesn’t matter; if they’re massive, it likely will (see the sketch below).
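The copying cost exists because everything that crosses a multiprocessing Queue is pickled on one side and unpickled on the other. As a rough way to gauge that cost, you can measure the pickled size of a payload. This is an illustrative sketch of mine, not code from the scanner:

import pickle

small = ("10.0.0.1", 22)                 # a typical work item for this article
big = {"data": list(range(1_000_000))}   # a hypothetical massive payload

print(len(pickle.dumps(small)))  # a few dozen bytes - cheap to copy
print(len(pickle.dumps(big)))    # several megabytes - copied on every put/get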
Threads and Pools and Processes, Oh My!
Bringing thread pools to the process table gets us one step closer to low overhead, high scalability concurrency in Python. The general architecture is going to look a bit like the Process model above, and a bit like the thread pool model from the previous article. Architecturally, each process needs a thread worker which reads requests from the work queue, and then gives the work to the workers in the thread pool.
Because we didn’t change the check_a_port code from the original model, we’ll also need someone to harvest the results from the completed thread tasks and post them on the result queue. Let’s call the first one thread_worker() and the second one result_poster().
import threading
import time
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from typing import Set

MAX_THREADS = 100  # threads per worker process; tune to your workload

def thread_worker(work: Queue, result: Queue) -> None:
    thread_pool = ThreadPoolExecutor(MAX_THREADS)
    task_list: Set[Future] = set()
    threading.Thread(target=result_poster, args=(task_list, result)).start()
    while True:
        request = work.get(block=True)
        task = thread_pool.submit(check_a_port, *request)
        task_list.add(task)

def result_poster(task_list: Set[Future], result: Queue) -> None:
    while True:
        if task_list:
            task = next(as_completed(task_list))
            result.put(task.result())
            task_list.remove(task)
        else:
            time.sleep(.25)
Now that we’ve seen the code, let’s see if it makes any sense. Looking at thread_worker, it goes through these steps:
- Create a thread pool.
- Create a set of tasks to represent the work given to the thread pool.
- Start our result_poster() function in its own thread.
- Loop forever doing the following steps:
  - Pull a request from the work queue.
  - Submit the work to our thread pool (check_a_port).
  - Add the resulting task to our set of active tasks.
result_poster() loops forever through the following steps to fulfill its role in this architecture:
- If there’s anything in the task list:
  - Pull a completed task from the task list.
  - Get the result of the completed task and put it on the result queue.
  - Remove the task from the task list.
- Otherwise, sleep for a bit.
Some Things Hardly Change At All!
That leaves only the new implementation of check_ports(), which is almost identical to the previous one for Process-only concurrency. The only difference is that the Process objects use thread_worker as their target instead of proc_worker. So, for thread pools in processes, it looks like this instead:
def check_ports(ip_ports: List[Tuple[str, int]]) -> Generator[PortStatus, None, None]:
    work = Queue()
    result = Queue()
    workers = [Process(target=thread_worker, args=(work, result))
               for _ in range(32)]
    for process in workers:
        process.start()
    for item in ip_ports:
        work.put(item)
    remaining = len(ip_ports)
    while remaining:
        yield result.get()
        remaining -= 1
    for process in workers:
        process.terminate()
        process.join()
Of course, it behaves exactly as described for the previous version of check_ports(). This has the advantage that you don’t have to create a huge number of processes to get past Python’s GIL. In fact, you often only need one worker process for each core in your target system. In most cases, you can get the rest of your concurrency using threads. So, this turns out to be a pretty good approach to have in your bag of tricks.
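For example, here’s a hedged sketch of sizing the pool to the machine instead of hard-coding 32; the exact count is a judgment call of mine, not something from the original code:

import os

# One worker process per core is a reasonable rule of thumb here;
# os.cpu_count() can return None, so fall back to a small default.
num_procs = os.cpu_count() or 4
workers = [Process(target=thread_worker, args=(work, result))
           for _ in range(num_procs)]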
Barriers are Good Things!
There is another possible source of unfairness in this arrangement of thread pools in processes, one that can affect the average level of concurrency and the final running time. It is possible for some of our processes to get going before others are ready. If that happens, they could suck up a disproportionate number of work items to put in their thread queues. If the thread pools are large and the number of items isn’t, one process could grab all the work, underloading the other processes. There is a little-known synchronization tool which solves this perfectly: a barrier.
The idea of a barrier is like the starting line for a race. After everyone arrives at the starting line, the official fires the starting pistol, and everyone takes off. Barriers work the same way. You create a barrier with the number of participants, using something like barrier = Barrier(n), and each participant declares it has reached the barrier by issuing a barrier.wait(). It is then suspended awaiting the other participants. Once all the participants have issued a barrier.wait(), they are all released to run at once. This can be used to add a bit more evenness or fairness to the process.
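Here’s a minimal, standalone sketch of a multiprocessing Barrier in action; the names (worker, NUM_PROCS) and the simulated startup delays are mine, not from the scanner code:

from multiprocessing import Barrier, Process
import os
import time

NUM_PROCS = 4  # small for the demo; the article uses 32 workers

def worker(barrier, n):
    time.sleep(0.1 * n)  # simulate uneven startup times
    barrier.wait()       # everyone pauses here until all have arrived
    print(f"worker {n} (pid {os.getpid()}) released")

if __name__ == "__main__":
    barrier = Barrier(NUM_PROCS)
    procs = [Process(target=worker, args=(barrier, n)) for n in range(NUM_PROCS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()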
Disadvantages of Processes and Thread Pools
You knew I was going to write this section about the limitations of thread pools in processes. Since I don’t want to disappoint, here is the main disadvantage of this approach:
- Threads are still relatively expensive, and as the number of threads goes up, you eventually begin to spend a lot of time contending for the GIL. You can mitigate this to a degree by increasing the number of processes beyond the one-per-core rule of thumb.
Looking Forward to the Final Article in the Series
The final article will explain how to do concurrency in Python with no threads at all for network-bound work like this. This will significantly lower the overhead for network-bound tasks.
Can you explain the significance of the * (star) in the following code example you gave?
while True:
    request = work.get(block=True)
    task = thread_pool.submit(check_a_port, *request)
    task_list.add(task)
I can comprehend the explanation and what’s basically happening here, but I don’t understand the *request. Is this a Python 3 thing?
Is the * (star) necessary?
The check_a_port() function takes two parameters. request contains those two parameters in a single tuple, in the right order. Placing the * before request expands it into the same number of arguments as there are elements in request (2). If it weren’t there, Python would complain that check_a_port() needed two parameters but only one had been given. Alternatively, I could have written thread_pool.submit(check_a_port, request[0], request[1]), which is equivalent. It’s not a Python 3 thing. Sorry for the slow response :-(.
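To see the unpacking outside the scanner code, here is a tiny standalone example; the names are made up for illustration:

def add(a: int, b: int) -> int:
    return a + b

pair = (2, 3)
print(add(*pair))   # 5 -- *pair expands the tuple into add(2, 3)
# print(add(pair))  # TypeError: add() missing 1 required positional argument: 'b'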