Concurrency in Python: Asyncio

In the third article in this series, we cover solving our recurring problem with Asyncio by itself, and reserve Asyncio and Processes for a future fourth article. Asyncio is by far the lowest-overhead form of concurrency available in Python, but it has limitations on the kinds of problems it’s helpful with.

Python Asyncio and Event Loops

Asynchronous I/O (or asyncio) in Python is a method of performing non-blocking operations with an event loop. When a coroutine awaits something that would require the program to wait, control passes back to the event loop, which then finds other work to perform. A variety of asyncio constructs may cause control to be transferred to the event loop. These include waiting for an asyncio.Queue() message, waiting for another asyncio task to finish, and waiting for a network message to arrive or be sent, provided you use the right asyncio network API.
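To make the hand-off to the event loop concrete, here is a minimal sketch that is not part of the port scanner. The names worker, demo and log are made up for illustration. Two coroutines share one event loop, and each await on asyncio.sleep() gives the loop a chance to run the other one.

```python
import asyncio
from typing import List


async def worker(name: str, delay: float, log: List[str]) -> None:
    """Record when this coroutine starts and when it finishes."""
    log.append(f"{name} start")
    # Awaiting the sleep suspends this coroutine and returns control
    # to the event loop, which is then free to run other work.
    await asyncio.sleep(delay)
    log.append(f"{name} done")


async def demo() -> List[str]:
    log: List[str] = []
    # gather() runs both coroutines concurrently on the same event loop.
    await asyncio.gather(worker("slow", 0.2, log), worker("fast", 0.05, log))
    return log


events = asyncio.run(demo())
print(events)
```

Both workers start before either finishes, and the shorter sleep completes first, even though only one thread is involved.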

However, at this point in time, the standard library offers no asyncio-compatible file I/O, which can be problematic. So, if your program is largely dependent on file I/O, or on calls to other blocking functions, then asyncio may not be for you. Once you enter the asyncio world, you have to push blocking non-asyncio calls into threads. If that’s where you’re spending most of your time, then the obvious question about asyncio becomes "why bother?" But if that’s not the case, this example may prove helpful.
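As a rough illustration of pushing a blocking call into a thread, the sketch below uses loop.run_in_executor() with the loop's default thread pool. The blocking_read function is a hypothetical stand-in for file I/O or any other blocking call; it is not part of the scanner.

```python
import asyncio
import time
from typing import List


def blocking_read(delay: float) -> str:
    """Hypothetical stand-in for a blocking call such as file I/O."""
    time.sleep(delay)
    return "data"


async def main() -> List[str]:
    loop = asyncio.get_running_loop()
    # Passing None selects the event loop's default ThreadPoolExecutor.
    # Each call runs in a worker thread, so the event loop is not blocked.
    futures = [loop.run_in_executor(None, blocking_read, 0.1) for _ in range(3)]
    return await asyncio.gather(*futures)


results = asyncio.run(main())
print(results)
```

Note that this brings threads back into the picture, which is exactly the overhead asyncio was supposed to avoid.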

Unfortunately, the asyncio APIs are a bit different from the corresponding non-asyncio APIs. So, for this one, we need to rewrite our check_a_port() function.

Implementing check_a_port with asyncio

import asyncio
import socket
from typing import Tuple, Optional
async def async_check_a_port(ip: str, service: str, timeout: int = 5
                             ) -> Tuple[str, str, Optional[str]]:
    """
    Check an IP:port to see if it is open/available
    :param ip:str: IP address (or DNS name)
    :param service:str: service name
    :param timeout:int: how long to wait for before giving up.
    :return:Tuple[str, str, Optional[str]]: ip, service, failure reason or None
    """
    port = socket.getservbyname(service, "tcp")
    loop = asyncio.get_event_loop()
    # create_connection() returns a coroutine; nothing happens until it is awaited
    conn = loop.create_connection(asyncio.Protocol, host=ip, port=port)
    try:
        # The result is a (transport, protocol) pair once the connection is up
        connection = await asyncio.wait_for(conn, timeout=timeout)
        connection[0].close()
        return ip, service, None
    except asyncio.TimeoutError as oops:
        # NOTE: before Python 3.11, asyncio.TimeoutError is not TimeoutError (!)
        return ip, service, type(oops).__name__
    except socket.error as oops:
        return ip, service, f"{oops}: errno={oops.errno}"

Although this code is similar to the code of check_a_port() from the earlier articles, let’s walk through it and see how it works.

The first thing to note is that this function has a funny new keyword at the front: async. This means that simply calling it doesn’t run it. The call only returns a coroutine object, and you have to await its completion with the await keyword, which you’ll see in a bit.
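A tiny sketch of what that means in practice. The answer function is a made-up example, not part of the scanner.

```python
import asyncio
import inspect


async def answer() -> int:
    """A trivial coroutine function."""
    return 42


# Calling an async function does not run its body; it only builds
# a coroutine object describing the work to be done.
coro = answer()
is_coro = inspect.iscoroutine(coro)

# The body runs only when something awaits the coroutine. At the top
# level, asyncio.run() creates an event loop and drives it to completion.
value = asyncio.run(coro)
print(is_coro, value)
```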

As before, we compute the port from /etc/services using getservbyname(). But since this is all based on event loops, the connection is created in a pretty different way. The event loop object has a member function called create_connection, whose first parameter is a new kind of argument: a protocol factory. Here we pass asyncio.Protocol, the base class for connection-oriented stream protocols (e.g., TCP/IP). Calling create_connection does not connect anything yet. It creates a description of what work should be done once someone decides to await it.

Next the code waits for the connection to be established, with asyncio.wait_for() enforcing the timeout. Until this point, the work of creating the socket has effectively been sitting on the back burner. Awaiting it suspends async_check_a_port and hands control to the event loop (loop), which runs whatever else it has to do while the connection attempt completes or fails. Once the connection is established, async_check_a_port resumes executing. If the attempt fails, it raises either asyncio.TimeoutError or socket.error, depending on why it failed. It is confusing and very much worth noting that before Python 3.11 the built-in TimeoutError is a different class than asyncio.TimeoutError. From 3.11 on, the two names refer to the same class.
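The timeout handling can be tried without any network at all. In this sketch, slow_operation and try_with_timeout are hypothetical names, and a long asyncio.sleep() stands in for a connection attempt that never completes.

```python
import asyncio
from typing import Optional


async def slow_operation() -> str:
    """Stand-in for a connection attempt that takes far too long."""
    await asyncio.sleep(10)
    return "finished"


async def try_with_timeout(timeout: float) -> Optional[str]:
    try:
        # wait_for() cancels the inner coroutine if the timeout expires.
        return await asyncio.wait_for(slow_operation(), timeout=timeout)
    except asyncio.TimeoutError as oops:
        # Same reporting style as async_check_a_port: the exception's name.
        return type(oops).__name__


outcome = asyncio.run(try_with_timeout(0.05))
print(outcome)
# Whether the two timeout classes are the same depends on the Python version.
print(asyncio.TimeoutError is TimeoutError)
```

Catching asyncio.TimeoutError, as the article's code does, works on both older and newer Python versions.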

Going back to the success case, the code closes the socket and returns the success indication. This logic is essentially identical to the original check_a_port code – but under asyncio, the runtime behavior is quite different.

In our previous example, we used the built-in Python thread pool code to do threading. Unfortunately, there is no corresponding asyncio task pool, so the obvious thing to do is to create an analogous class.

Emulating Thread Pools for asyncio tasks

import asyncio
from typing import Dict, List, Set, AsyncIterator, Awaitable
class AsyncPoolExecutor:
    """
    Class to emulate ThreadPoolExecutor for asyncio
    We ensure that no more than 'max_executions' tasks are run at once.
    """
    def __init__(self, max_executions: int):
        """
        Initialize with the requested number of work items...
        :param max_executions: int: maximum number of simultaneous tasks
        """
        self.max_executions = max_executions
        self.work: Set[asyncio.Future] = set() # Work in progress
        self.unscheduled: List[Awaitable] = [] # not running

    def add(self, task: Awaitable):
        """
        Add an Awaitable to our list of work to be done
        :param task:Awaitable: Task to accomplish.
        :return: None
        """
        self.unscheduled.append(task)

    async def as_completed(self) -> AsyncIterator[asyncio.Future]:
        """
        Async Generator returning each task as it completes.
        Similar to futures.as_completed()
        :return: Yield tasks as they complete
        """
        while True:
            while self.unscheduled and len(self.work) < self.max_executions:
                # Wrap each awaitable in a Task: asyncio.wait() no longer
                # accepts bare coroutines as of Python 3.11.
                self.work.add(asyncio.ensure_future(self.unscheduled.pop(0)))
            if not self.work:
                return
            done, pending = await asyncio.wait(self.work,
                                       return_when=asyncio.FIRST_COMPLETED)
            self.work = pending  # pending is a set...
            for item in done:
                yield item

Although this is an interesting class, I’m going to only give a brief overview of it, since it emulates a thread pool for asyncio tasks, and its use will be illustrated in the code that follows. To construct an AsyncPoolExecutor object, you need to pass the maximum number of simultaneous tasks. For our example, each task holds an open connection, so you would want to keep that number below the maximum number of open file descriptors available to the process.

add() and as_completed() play the roles of ThreadPoolExecutor.submit() and concurrent.futures.as_completed() in the thread pool version. The add member function simply adds work to the list of tasks not yet running. as_completed() is an asynchronous generator, and is more interesting and worth a bit more explanation.

as_completed() loops until all the work is done. Each time through its while True loop, it performs these tasks:

  1. Add unscheduled work to the set of running work (up to our limit)
  2. Return if there is no work left to do
  3. Wait until at least one running task completes
  4. Yield each completed task back to whoever is waiting on the results.
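To see the limit being enforced, here is a stripped-down sketch of the same loop outside the class. The names job and run_limited, and the running and peak counters, are illustrative assumptions and not part of AsyncPoolExecutor. Each coroutine is wrapped with asyncio.ensure_future() before it is handed to asyncio.wait(), since recent Python versions require tasks or futures there.

```python
import asyncio
from typing import Awaitable, List, Set

running = 0  # jobs currently in flight
peak = 0     # highest number of jobs ever in flight at once


async def job(n: int) -> int:
    """Hypothetical unit of work that tracks how many jobs overlap."""
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return n


async def run_limited(count: int, limit: int) -> List[int]:
    unscheduled: List[Awaitable[int]] = [job(n) for n in range(count)]
    work: Set[asyncio.Future] = set()
    results: List[int] = []
    while True:
        # Top up the running set, never exceeding the limit.
        while unscheduled and len(work) < limit:
            work.add(asyncio.ensure_future(unscheduled.pop(0)))
        # Nothing running and nothing left to schedule: we are done.
        if not work:
            return results
        # Sleep until at least one running task has finished.
        done, work = await asyncio.wait(work, return_when=asyncio.FIRST_COMPLETED)
        results.extend(task.result() for task in done)


finished = asyncio.run(run_limited(10, 3))
print(sorted(finished), peak)
```

The results arrive in completion order rather than submission order, which is why the sketch sorts them before printing.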

The code to use it is similar to the previous synchronous examples, but differs a bit because of the constraints imposed by asyncio. Below is the new version of scan_em_all, using the AsyncPoolExecutor class and the async_check_a_port() function.

Scan ’em All with asyncio

async def scan_em_all(services: List[str], devices: List[str]
                      ) -> Dict[str, List[str]]:
    """
    Check every combination of services and devices to see if they're available
    :param services: List[str]: List of services
    :param devices: List[str]: List of device IPs or DNS names
    :return: Dict[str, List[str]]: Dictionary of all IPs with lists of all available services
    """
    runner = AsyncPoolExecutor(250)
    answer: Dict[str, List[str]] = {}
    # Queue one check per (device, service) pair; nothing runs yet
    for ip in devices:
        for service in services:
            runner.add(async_check_a_port(ip, service))
    async for task in runner.as_completed():
        ip, service, err = task.result()
        if err is None:
            # Record only the services that accepted a connection
            answer.setdefault(ip, []).append(service)
    return {key: sorted(answer[key]) for key in sorted(answer.keys())}

The first thing the function does is create an AsyncPoolExecutor and an empty answer dictionary. The next lines simply add the work for doing an async_check_a_port() call for each combination of device and service given as arguments. This is very similar to what we did before.

Next we wait for the tasks to complete, almost exactly as before, but with one significant difference. Now, we have to precede the for task in runner.as_completed() loop with the async keyword. This is necessary because the as_completed member function is an asynchronous generator, which has to be iterated with async for instead of a plain for. The structure of this code is nearly identical to the structure of the synchronous code, but it is different enough that it has to be a new implementation.
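For readers who haven't met asynchronous generators before, here is a minimal standalone sketch. The countdown and collect names are made up for illustration. The generator mixes await and yield, and its consumer has to use async for.

```python
import asyncio
from typing import AsyncIterator, List


async def countdown(start: int) -> AsyncIterator[int]:
    """An async generator: it may await between the values it yields."""
    for n in range(start, 0, -1):
        await asyncio.sleep(0)  # give the event loop a chance to run other work
        yield n


async def collect() -> List[int]:
    values: List[int] = []
    # A plain "for" would raise TypeError here; async generators
    # must be consumed with "async for" inside an async function.
    async for n in countdown(3):
        values.append(n)
    return values


numbers = asyncio.run(collect())
print(numbers)
```

A coroutine like scan_em_all is started from synchronous code in the same way, by handing its coroutine object to asyncio.run().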

Why Bother with asyncio?

Having done this, the question comes up: “Why bother?” The most obvious reason is that there are no threads created at all. This is much less overhead, and from the Python perspective, it makes this code much faster and lets it consume far fewer system resources. This is not only because no threads have to be created and managed, but also because of Python’s Global Interpreter Lock (GIL): every thread has to acquire that lock before it can proceed, and with many threads, they spend a lot of time contending for it. With a single thread, the code is always able to proceed.

In the threading case, as more and more threads get created, the percentage of time any one thread is able to get the lock goes down and down, which makes the overhead go up and up. That overhead simply never occurs here. Although Python may check the lock, it never has to wait on it. As in the threaded case, you are still limited to the work done by a single core, but now that core is doing more work for you because of the lower overhead.
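The "no threads at all" claim is easy to check for pure asyncio code. The sketch below uses hypothetical names nap and main. It runs a thousand concurrent tasks and records the thread count seen from inside each one. It deliberately avoids DNS lookups and executors, which can start helper threads behind the scenes.

```python
import asyncio
import threading


async def nap() -> int:
    """Sleep briefly, then report how many threads exist right now."""
    await asyncio.sleep(0.01)
    return threading.active_count()


async def main() -> int:
    # A thousand concurrent tasks, all multiplexed onto one event loop.
    counts = await asyncio.gather(*(nap() for _ in range(1000)))
    return max(counts)


threads_before = threading.active_count()
max_threads = asyncio.run(main())
print(threads_before, max_threads)
```

On a typical run the two numbers printed are identical: the tasks are just objects scheduled by the event loop, not operating system threads.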

Why not use asyncio for everything?

Although you can gratuitously throw asyncio into nearly every program, which would no doubt make you the way coolioest programmer ever, there are reasons not to use it. Here are a few cases where asyncio is a poor fit:

  • Every important operation involves (synchronous) disk I/O, particularly reading from disk.
  • Every important operation involves a blocking I/O operation through a library you have no control over. Examples include synchronous APIs that you would have to wrap yourself, or things like Netmiko.
  • Nothing in your program blocks (a pure computation, for example). Asyncio can’t make that faster. In the end, without multiprocessing Processes, you’re stuck with the single-core performance limitation.
  • Performance really isn’t an issue. Asyncio adds complexity. Complexity decreases reliability. Make sure the performance advantage is worth the increased complexity.
  • You need to run on a version of Python before 3.5. Asyncio changed a lot in 3.5, which introduced the async and await keywords. The asynchronous generator used in as_completed() needs 3.6 or later.

Although I originally planned for this series to be three articles, this one is long enough to be worth its own article. I’ll reserve the fourth (and hopefully last) article for Asyncio and Processes together.
