Package aioextensions

High performance functions to work with async IO.


Rationale

Modern services deal with many different kinds of tasks:

Latency comparison

The important thing to note is that tasks can be categorized into two groups:

CPU bound tasks

Those that happen inside the CPU, with very low latency, and that exploit the full potential of the hardware in the computer.

Resources of a CPU bound task

Examples of these tasks include:

Task                   Latency in seconds
CPU computation               0.000000001
Memory access                   0.0000001
CPU Processing (1KB)             0.000003
Memory read (1MB)                 0.00025

IO bound tasks

Those that happen over a wire that transports data, with very high latency, and that do not exploit the full potential of the hardware, because the only thing to do is wait until the data gets to the other end and comes back (round-trip).

Resources of an IO bound task

Examples of these tasks include:

Task                   Latency in seconds
Disk access                       0.00015
HTTP to localhost                  0.0005
Disk read (1MB)                      0.02
HTTP to internet                     0.15

Speed and costs matter

At the end of the day, we want to minimize the cost per user served by the program, server, or service while maximizing the user's perception of speed.

In order to achieve this we need a model that allows us to exploit all CPU cores and installed hardware in the machine, while maintaining the ability to query many high-latency external services over the network: databases, caches, storage, distributed queues, or input from multiple users.

Concurrency model

Python's async IO has a concurrency model based on an event loop, which is responsible for executing the code, scheduling tasks, and processing IO events.

This event-loop executes in the main thread of the Python interpreter and is therefore limited by the GIL, so by itself it is unable to exploit all the hardware installed in the host.

However:

  • CPU intensive work can be sent to a pool of processes, far from the event-loop, thus bypassing the GIL, exploiting many CPU cores in the machine, and leaving the event-loop free to schedule and coordinate incoming requests.
  • IO intensive work can be sent to a pool of threads, far from the event-loop, thus being able to wait for high-latency operations without interrupting the event-loop's work.

There is an important difference:

  • Work done by a pool of processes is executed in parallel: all CPU cores are being used.
  • Work done by a pool of threads is done concurrently: task execution overlaps, but is not necessarily parallel: only one task can use the CPU at a time while the remaining ones wait for the GIL.

Solving CPU bound tasks efficiently

The optimal way to perform CPU bound tasks is to send them to separate processes in order to bypass the GIL.

Usage

>>> from aioextensions import collect, in_process, run
>>> def cpu_bound_task(id: int):
        print(f'doing: {id}')
        # Imagine something here that uses the CPU heavily
        # For example: this complex mathematical operation
        for _ in range(10): 3**20000000
        print(f'returning: {id}')
        return id
>>> async def main():
        results = await collect([
            # in_process sends the task to a pool of processes
            in_process(cpu_bound_task, id)
            # Let's solve 5 of those tasks in parallel!
            for id in range(5)
        ])
        print(f'results: {results}')
>>> run(main())
# I have 4 CPU cores in my machine
doing: 0
doing: 1
doing: 2
doing: 3
returning: 1
doing: 4
returning: 2
returning: 3
returning: 0
returning: 4
results: (0, 1, 2, 3, 4)

As expected, all CPU cores were used and we were hardware-efficient!

Solving IO bound tasks efficiently

The optimal way to perform IO bound tasks is to send them to separate threads. This does not bypass the GIL. However, threads will be idle most of the time, waiting for high-latency operations to complete.

Usage

>>> from aioextensions import collect, in_thread, run
>>> from time import sleep, time
>>> def io_bound_task(id: int):
        print(f'time: {time()}, doing: {id}')
        # Imagine something here with high latency
        # For example: a call to the database, or this sleep
        sleep(1)
        print(f'time: {time()}, returning: {id}')
        return id
>>> async def main():
        results = await collect([
            # in_thread sends the task to a pool of threads
            in_thread(io_bound_task, id)
            # Let's solve 5 of those tasks concurrently!
            for id in range(5)
        ])
        print(f'time: {time()}, results: {results}')
>>> run(main())
time: 1597623831, doing: 0
time: 1597623831, doing: 1
time: 1597623831, doing: 2
time: 1597623831, doing: 3
time: 1597623831, doing: 4
time: 1597623832, returning: 0
time: 1597623832, returning: 4
time: 1597623832, returning: 3
time: 1597623832, returning: 2
time: 1597623832, returning: 1
time: 1597623832, results: (0, 1, 2, 3, 4)

As expected, all tasks were executed concurrently. This means that instead of waiting five seconds for five tasks (serially) we just waited one second for all of them.
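
Mixing CPU and IO bound tasks

Both pools can be combined in a single collect call, so CPU bound and IO bound work make progress side by side while the event-loop coordinates everything. A minimal sketch, reusing the two task functions defined above:

>>> from aioextensions import collect, in_process, in_thread, run
>>> async def main():
        results = await collect([
            # CPU bound work goes to the process pool
            in_process(cpu_bound_task, 0),
            # IO bound work goes to the thread pool
            in_thread(io_bound_task, 1),
        ])
        print(f'results: {results}')
>>> run(main())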

Installing

$ pip install aioextensions

# Optionally if you want uvloop support (not available on Windows)

$ pip install aioextensions[full]

Using

>>> from aioextensions import *  # to import everything

Please read the documentation below for more details about every function.

"""High performance functions to work with the async IO.

[![Release](
https://img.shields.io/pypi/v/aioextensions?color=success&label=Release&style=flat-square)](
https://pypi.org/project/aioextensions)
[![Documentation](
https://img.shields.io/badge/Documentation-click_here!-success?style=flat-square)](
https://fluidattacks.github.io/aioextensions/)
[![Downloads](
https://img.shields.io/pypi/dm/aioextensions?label=Downloads&style=flat-square)](
https://pypi.org/project/aioextensions)
[![Status](
https://img.shields.io/pypi/status/aioextensions?label=Status&style=flat-square)](
https://pypi.org/project/aioextensions)
[![Coverage](
https://img.shields.io/badge/Coverage-100%25-success?style=flat-square)](
https://fluidattacks.github.io/aioextensions/)
[![License](
https://img.shields.io/pypi/l/aioextensions?color=success&label=License&style=flat-square)](
https://github.com/fluidattacks/aioextensions/blob/latest/LICENSE.md)

# Rationale

Modern services deal with many different kinds of tasks:

![Latency comparison](
https://raw.githubusercontent.com/fluidattacks/aioextensions/latest/docs/static/latency.png)

The important thing to note is that tasks can be categorized into two groups:

## CPU bound tasks

Those that happen inside the CPU, with very low latency, and that exploit the
full potential of the hardware in the computer.

![Resources of a CPU bound task(
https://raw.githubusercontent.com/fluidattacks/aioextensions/latest/docs/static/resources_cpu_task.png)

Examples of these tasks include:

| Task                 | Latency in seconds |
|----------------------|-------------------:|
| CPU computation      |        0.000000001 |
| Memory access        |          0.0000001 |
| CPU Processing (1KB) |           0.000003 |
| Memory read (1MB)    |            0.00025 |

## IO bound tasks

Those that happen over a wire that transports data, with very high latency,
and that do not exploit the full potential of the hardware, because the only
thing to do is wait until the data gets to the other end and comes back
(round-trip).

![Resources of an IO bound task](
https://raw.githubusercontent.com/fluidattacks/aioextensions/latest/docs/static/resources_io_task.png)

Examples of these tasks include:

| Task                 | Latency in seconds |
|----------------------|-------------------:|
| Disk access          |            0.00015 |
| HTTP to localhost    |             0.0005 |
| Disk read (1MB)      |               0.02 |
| HTTP to internet     |               0.15 |

# Speed and costs matter

At the end of the day, we want to minimize the cost per user served by the
program, server, or service while maximizing the user's perception of speed.

In order to achieve this we need a model that allows us to exploit all CPU
cores and installed hardware in the machine, while maintaining the ability to
query many high-latency external services over the network: databases, caches,
storage, distributed queues, or input from multiple users.

# Concurrency model

Python's async IO has a concurrency model based on an **event loop**, which
is responsible for executing the code, scheduling tasks, and processing IO
events.

This event-loop executes in the main thread of the Python interpreter and is
therefore limited by the [GIL](https://realpython.com/python-gil), so by
itself it is unable to exploit all the hardware installed in the host.

However:

- CPU intensive work can be sent to a pool of processes, far from the
    event-loop, thus bypassing the
    [GIL](https://realpython.com/python-gil), exploiting many CPU cores in
    the machine, and leaving the event-loop free to schedule and coordinate
    incoming requests.
- IO intensive work can be sent to a pool of threads, far from the event-loop,
    thus being able to wait for high-latency operations without interrupting
    the event-loop's work.

There is an important difference:

-  Work done by a pool of processes is executed in parallel: all CPU cores are
    being used.
-  Work done by a pool of threads is done concurrently: task execution
    overlaps, but is not necessarily parallel: only one task can use the CPU
    at a time while the remaining ones wait for the
    [GIL](https://realpython.com/python-gil).

## Solving CPU bound tasks efficiently

The optimal way to perform CPU bound tasks is to send them to separate
processes in order to bypass the [GIL](https://realpython.com/python-gil).

Usage:

    >>> from aioextensions import collect, in_process, run

    >>> def cpu_bound_task(id: int):
            print(f'doing: {id}')
            # Imagine something here that uses the CPU heavily
            # For example: this complex mathematical operation
            for _ in range(10): 3**20000000
            print(f'returning: {id}')
            return id

    >>> async def main():
            results = await collect([
                # in_process sends the task to a pool of processes
                in_process(cpu_bound_task, id)
                # Let's solve 5 of those tasks in parallel!
                for id in range(5)
            ])
            print(f'results: {results}')

    >>> run(main())
    # I have 4 CPU cores in my machine
    doing: 0
    doing: 1
    doing: 2
    doing: 3
    returning: 1
    doing: 4
    returning: 2
    returning: 3
    returning: 0
    returning: 4
    results: (0, 1, 2, 3, 4)

As expected, all CPU cores were used and we were hardware-efficient!

## Solving IO bound tasks efficiently

The optimal way to perform IO bound tasks is to send them to separate
threads. This does not bypass the [GIL](https://realpython.com/python-gil).
However, threads will be idle most of the time, waiting for high-latency
operations to complete.

Usage:

    >>> from aioextensions import collect, in_thread, run
    >>> from time import sleep, time

    >>> def io_bound_task(id: int):
            print(f'time: {time()}, doing: {id}')
            # Imagine something here with high latency
            # For example: a call to the database, or this sleep
            sleep(1)
            print(f'time: {time()}, returning: {id}')
            return id

    >>> async def main():
            results = await collect([
                # in_thread sends the task to a pool of threads
                in_thread(io_bound_task, id)
                # Let's solve 5 of those tasks concurrently!
                for id in range(5)
            ])
            print(f'time: {time()}, results: {results}')

    >>> run(main())
    time: 1597623831, doing: 0
    time: 1597623831, doing: 1
    time: 1597623831, doing: 2
    time: 1597623831, doing: 3
    time: 1597623831, doing: 4
    time: 1597623832, returning: 0
    time: 1597623832, returning: 4
    time: 1597623832, returning: 3
    time: 1597623832, returning: 2
    time: 1597623832, returning: 1
    time: 1597623832, results: (0, 1, 2, 3, 4)

As expected, all tasks were executed concurrently. This means that instead of
waiting five seconds for five tasks (serially) we just waited one second for
all of them.

# Installing

    $ pip install aioextensions

    # Optionally if you want uvloop support (not available on Windows)

    $ pip install aioextensions[full]

# Using

    >>> from aioextensions import *  # to import everything

Please read the documentation below for more details about every function.
"""

# Standard library
import asyncio
from collections import (
    deque,
)
from concurrent.futures import (
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from contextlib import (
    asynccontextmanager,
    suppress,
)
from functools import (
    partial,
    wraps,
)
from itertools import (
    tee,
)
from os import (
    cpu_count,
)
from typing import (
    Any,
    AsyncGenerator,
    AsyncIterator,
    Awaitable,
    Callable,
    cast,
    Deque,
    Dict,
    Generator,
    Iterable,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

# Third party libraries
try:
    # Attempt to import uvloop (optional dependency)
    import uvloop

    UVLOOP = uvloop
except ImportError:
    UVLOOP = None

# Constants
F = TypeVar("F", bound=Callable[..., Any])  # pylint: disable=invalid-name
S = TypeVar("S")  # pylint: disable=invalid-name
T = TypeVar("T")  # pylint: disable=invalid-name
Y = TypeVar("Y")  # pylint: disable=invalid-name

# Linters
# pylint: disable=unsubscriptable-object


def run(coroutine: Awaitable[T], *, debug: bool = False) -> T:
    """Execute an asynchronous function synchronously and return its result.

    Usage:

        >>> async def do(a, b=0):
                await something
                return a + b

        >>> run(do(1, b=2))
        3

    This function acts as a drop-in replacement for asyncio.run and
    installs `uvloop` (the fastest event-loop implementation out there) if
    available.

    .. tip::
        Use this as the entrypoint for your program.
    """
    if UVLOOP is not None:
        UVLOOP.install()

    return asyncio.run(coroutine, debug=debug)


async def in_thread(
    function: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Execute `function(*args, **kwargs)` in the configured thread pool.

    This is the most performant wrapper for IO bound and high-latency tasks.

    Every task will be assigned at most one thread; if there are more tasks
    than threads in the pool, the excess will be executed in FIFO order.

    Spawning a million IO bound tasks with this function has a very small
    memory footprint.

    .. warning::
        Executing CPU intensive work here is a bad idea because of the
        limitations that the [GIL](https://realpython.com/python-gil) imposes.

        See `in_process` for a CPU performant alternative.
    """
    _ensure_thread_pool_is_initialized()

    return await asyncio.get_running_loop().run_in_executor(
        THREAD_POOL.pool,
        partial(function, *args, **kwargs),
    )


async def in_process(
    function: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Execute `function(*args, **kwargs)` in the configured process pool.

    This is the most performant wrapper for CPU bound and low-latency tasks.

    Tasks executed in a process pool bypass the
    [GIL](https://realpython.com/python-gil) and can consume all CPU cores
    available in the host if needed.

    Every task will be assigned at most one process; if there are more tasks
    than processes in the pool, the excess will be executed in FIFO order.

    .. warning::
        Executing IO intensive work here is possible, but spawning a process
        has some overhead that can be avoided using threads at no performance
        expense.

        See `in_thread` for an IO performant alternative.
    """
    _ensure_process_pool_is_initialized()

    return await asyncio.get_running_loop().run_in_executor(
        PROCESS_POOL.pool,
        partial(function, *args, **kwargs),
    )


def rate_limited(
    *,
    max_calls: int,
    max_calls_period: Union[float, int],
    min_seconds_between_calls: Union[float, int] = 0,
) -> Callable[[F], F]:
    """Decorator to turn an asynchronous function into a rate limited one.

    The decorated function won't be able to execute more than `max_calls` times
    over a period of `max_calls_period` seconds. The excess will be queued in
    FIFO mode.

    Additionally, it's guaranteed that successive calls are spaced at least
    `min_seconds_between_calls` seconds apart.

    Usage:

        If you want to perform at most 2 calls to a database per second:

        >>> @rate_limited(
                max_calls=2,
                max_calls_period=1,
                min_seconds_between_calls=0.2,
            )
            async def query(n):
                await something
                print(f'time: {time()}, doing: {n}')

        >>> await collect(map(query, range(10)))

    Output:

        ```
        time: 1597706698.0, doing: 0
        time: 1597706698.2, doing: 1
        time: 1597706699.0, doing: 2
        time: 1597706699.2, doing: 3
        time: 1597706700.0, doing: 4
        time: 1597706700.2, doing: 5
        time: 1597706701.0, doing: 6
        time: 1597706701.2, doing: 7
        time: 1597706702.0, doing: 8
        time: 1597706702.2, doing: 9
        ```

    .. tip::
        Use `min_seconds_between_calls` as an anti-burst system. This can, for
        instance, lower your bill in DynamoDB or prevent a cooldown period
        (also known as a ban) by a firewall.

    This decorator creates a `max_calls` sized data structure.
    """
    if max_calls < 1:
        raise ValueError("max_calls must be >= 1")
    if max_calls_period <= 0:
        raise ValueError("max_calls_period must be > 0")
    if min_seconds_between_calls < 0:
        raise ValueError("min_seconds_between_calls must be >= 0")

    def decorator(function: F) -> F:
        lock = None
        waits: Deque[float] = deque()

        @wraps(function)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal lock

            lock = lock or asyncio.Lock()
            loop = asyncio.get_event_loop()

            async with lock:
                if waits:
                    # Anti burst control system:
                    #   wait until the difference between the most recent call
                    #   and the current call is >= min_seconds_between_calls
                    await asyncio.sleep(
                        waits[-1] + min_seconds_between_calls - loop.time()
                    )

                while len(waits) >= max_calls:
                    # Rate limit control system:
                    #   wait until the difference between the least recent
                    #   call and the current call is >= max_calls_period
                    await asyncio.sleep(
                        waits.popleft() + max_calls_period - loop.time()
                    )

                waits.append(loop.time())

            return await function(*args, **kwargs)

        return cast(F, wrapper)

    return decorator


async def collect(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
) -> Tuple[T, ...]:
    """Resolve concurrently the input stream and return back in the same order.

    At any point in time there will be at most _number of `workers`_
    tasks being resolved concurrently.

    Additionally, the algorithm makes sure that at any point in time every
    worker is busy.

    Args:
        awaitables: An iterable (generator, list, tuple, set, etc) of
            awaitables (coroutine, asyncio.Task, or asyncio.Future).
        workers: The number of independent workers that will be processing
            the input stream.

    Returns:
        A tuple with the results of executing each awaitable in the event loop.
        Results are returned in the same order of the input stream.

    Usage:
        >>> async def do(n):
                print(f'running: {n}')
                await sleep(1)
                print(f'returning: {n}')
                return n

        >>> iterable = map(do, range(5))

        >>> results = await collect(iterable, workers=2)

        >>> print(f'results: {results}')

    Output:
        ```
        running: 0
        running: 1
        returning: 0
        returning: 1
        running: 2
        running: 3
        returning: 2
        returning: 3
        running: 4
        returning: 4
        results: (0, 1, 2, 3, 4)
        ```

    .. tip::
        This is similar to asyncio.as_completed. However, results are returned
        in order, and you can control how many resources are consumed
        throughout the execution, for instance:

        - How many open files will be opened at the same time
        - How many HTTP requests will be performed to a service (rate limit)
        - How many sockets will be opened concurrently
        - Etc

        This is useful for finite resources, for instance: the number
        of sockets provided by the operating system is limited; going beyond
        it would make the kernel kill the program abruptly.

    If awaitables is an instance of Sized (has a `__len__` method), this
    function will launch at most `len(awaitables)` workers.
    """
    return tuple(
        [
            await elem
            for elem in resolve(
                awaitables,
                workers=workers,
                worker_greediness=0,
            )
        ]
    )


def resolve(  # noqa: mccabe
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
    worker_greediness: int = 0,
) -> Iterable[Awaitable[T]]:
    """Resolve concurrently the input stream and yield back in the same order.

    At any point in time there will be at most _number of `workers`_
    tasks being resolved concurrently.

    Additionally, the algorithm makes sure that at any point in time every
    worker is busy (if greediness allows).

    Args:
        awaitables: An iterable (generator, list, tuple, set, etc) of
            awaitables (coroutine, asyncio.Task, or asyncio.Future).
        workers: The number of independent workers that will be processing
            the input stream.
        worker_greediness: How many tasks a worker can process before waiting
            for you to retrieve its results. 0 means unlimited. Set to a
            non-zero value to upper-bound memory usage throughout the
            execution.

    Yields:
        A future with the result of the next ready task. Futures are yielded in
        the same order of the input stream.

    Usage:
        >>> async def do(n):
                print(f'running: {n}')
                await asyncio.sleep(1)
                print(f'returning: {n}')
                return n

        >>> iterable = map(do, range(5))

    >>> for future in resolve(iterable, workers=2):
            try:
                print(f'got resolved result: {await future}')
            except Exception:
                pass  # Handle possible exceptions

    Output:
        ```
        running: 0
        running: 1
        returning: 0
        returning: 1
        got resolved result: 0
        got resolved result: 1
        running: 2
        running: 3
        returning: 2
        returning: 3
        got resolved result: 2
        got resolved result: 3
        running: 4
        returning: 4
        got resolved result: 4
        ```

    .. tip::
        This is similar to asyncio.as_completed. However, results are returned
        in order, and you can control how many resources are consumed
        throughout the execution, for instance:

        - How many open files will be opened at the same time
        - How many HTTP requests will be performed to a service (rate limit)
        - How many sockets will be opened concurrently
        - Etc

        This is useful for finite resources, for instance: the number
        of sockets provided by the operating system is limited; going beyond
        it would make the kernel kill the program abruptly.

    If awaitables is an instance of Sized (has a `__len__` method), this
    function will launch at most `len(awaitables)` workers.
    """
    if workers < 1:
        raise ValueError("workers must be >= 1")
    if worker_greediness < 0:
        raise ValueError("worker_greediness must be >= 0")

    if hasattr(awaitables, "__len__"):
        workers = min(workers, len(awaitables))  # type: ignore

    loop = asyncio.get_event_loop()
    store: Dict[int, asyncio.Queue] = {}
    stream, stream_copy = tee(enumerate(awaitables))
    stream_finished = asyncio.Event()
    workers_up = asyncio.Event()
    workers_tasks: Dict[int, asyncio.Task] = {}

    async def worker() -> None:
        done: asyncio.Queue = asyncio.Queue(worker_greediness)
        for index, awaitable in stream:
            store[index] = done
            future = loop.create_future()
            future.set_result(await schedule(awaitable, loop=loop))
            await done.put(future)
            workers_up.set()
        workers_up.set()
        stream_finished.set()

    async def start_workers() -> None:
        for index in range(workers):
            if stream_finished.is_set():
                break
            workers_tasks[index] = asyncio.create_task(worker())
            await force_loop_cycle()
        await workers_up.wait()

    async def get_one(index: int) -> Awaitable[T]:
        if not workers_tasks:
            await start_workers()

        awaitable = await store.pop(index).get()
        result: Awaitable[T] = (await awaitable).result()
        return result

    for index, _ in stream_copy:
        yield cast(Awaitable[T], get_one(index))


async def force_loop_cycle() -> None:
    """Force the event loop to perform one cycle.

    This can be used to suspend the execution of the current coroutine and
    yield control back to the event-loop until the next cycle.

    Can be seen as forcing a switch of control between tasks.
    Useful for cooperative initialization.

    Usage:

        >>> await force_loop_cycle()

    """
    await asyncio.sleep(0)


async def generate_in_thread(
    generator_func: Callable[..., Generator[Y, S, None]],
    *args: Any,
    **kwargs: Any,
) -> AsyncGenerator[Y, S]:
    """Mimic `generator_func(*args, **kwargs)` in the configured thread pool.

    Note that `generator_func(*args, **kwargs)` may return a generator or an
    iterator; both cases are handled correctly.

    Usage:

        >>> from os import scandir

        >>> async for entry in generate_in_thread(scandir, '.'):
                print(entry.name)

    Output:
        ```
        .gitignore
        LICENSE.md
        README.md
        ...
        ```

    Calls to the generator are done serially and not concurrently.

    The benefit of wrapping a generator with this function is that the
    event-loop is free to schedule and await other tasks in the meantime.
    For instance, in a web server.
    """
    gen: Generator[Y, S, None] = generator_func(*args, **kwargs)
    gen_sent: Any = None

    def gen_next(val: S) -> Y:
        with suppress(StopIteration):
            return gen.send(val) if hasattr(gen, "send") else next(gen)
        raise StopAsyncIteration()

    while True:
        try:
            gen_sent = yield await in_thread(gen_next, gen_sent)
        except StopAsyncIteration:
            return


def schedule(
    awaitable: Awaitable[T],
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> "Awaitable[asyncio.Future[T]]":
    """Schedule an awaitable in the event loop and return a wrapper for it.

    Usage:

        >>> async def do(n):
                print(f'running: {n}')
                await sleep(1)
                print(f'returning: {n}')
                return n

        >>> task = schedule(do(3))  # Task is executing in the background now

        >>> print('other work is being done here')

        >>> task_result = await task  # Wait until the task is ready

        >>> print(f'result: {task_result.result()}')  # may raise if do() raised

    Output:

        ```
        other work is being done here
        running: 3
        returning: 3
        result: 3
        ```

    This works very similarly to asyncio.create_task. The main difference is
    that the result (or exception) can be accessed via the exception() or
    result() methods.

    If an exception was raised by the awaitable, it will be propagated only at
    the moment result() is called and never otherwise.
    """
    wrapper = (loop or asyncio.get_event_loop()).create_future()

    def _done_callback(future: asyncio.Future) -> None:
        if not wrapper.done():  # pragma: no cover
            wrapper.set_result(future)

    asyncio.create_task(awaitable).add_done_callback(_done_callback)

    return wrapper


class Semaphore(asyncio.Semaphore):
    """Same as `asyncio.Semaphore` plus some useful methods."""

    @asynccontextmanager
    async def acquire_many(self, times: int) -> AsyncIterator[None]:
        """Acquire a semaphore many times, and release on exit.

        Usage:

            >>> async with semaphore.acquire_many(5):
                    # Work with shared resource
                    ...

        """
        if times <= 0:
            raise ValueError("times must be >= 1")

        try:
            await collect([self.acquire() for _ in range(times)])
            yield
        finally:
            for _ in range(times):
                self.release()


class BoundedSemaphore(Semaphore, asyncio.BoundedSemaphore):
    """Same as `asyncio.BoundedSemaphore` plus some useful methods."""


def _ensure_process_pool_is_initialized() -> None:
    if not PROCESS_POOL.initialized:
        PROCESS_POOL.initialize(max_workers=CPU_CORES)


def _ensure_thread_pool_is_initialized() -> None:
    if not THREAD_POOL.initialized:
        THREAD_POOL.initialize(max_workers=10 * CPU_CORES)


class ExecutorPool:
    """Object representing a pool of Processes or Threads.

    The actual pool is created at `initialization` time
    and it is empty until then.
    """

    def __init__(
        self,
        cls: Union[
            Type[ProcessPoolExecutor],
            Type[ThreadPoolExecutor],
        ],
    ) -> None:
        self._cls = cls
        self._pool: Optional[Executor] = None

    def initialize(self, *, max_workers: Optional[int] = None) -> None:
        """Initialize the executor with a cap of at most `max_workers`.

        Workers are created on demand as needed, or never created at all
        if they are never needed.
        """
        if self._pool is not None:
            self._pool.shutdown(wait=False)

        self._pool = self._cls(max_workers=max_workers)

    def shutdown(self, *, wait: bool) -> None:
        """Shut down the executor and (optionally) waits for workers to finish."""
        if self._pool is not None:
            self._pool.shutdown(wait=wait)
            self._pool = None

    @property
    def pool(self) -> Executor:
        """Low level pool of workers held by the executor, may be None."""
        if self._pool is None:
            raise RuntimeError("Must call initialize first")

        return self._pool

    @property
    def initialized(self) -> bool:
        """Return true if the executor is initialized and ready to process."""
        return self._pool is not None


def run_decorator(function: F) -> F:
    """Decorator to turn an asynchronous function into a synchronous one.

    Usage:
        >>> @run_decorator
            async def do(a, b=0):
                return a + b

        >>> do(1, b=2)

    Output:

        ```
        3
        ```
    This can be used as a bridge between synchronous and asynchronous code.
    We use it mostly in tests for its convenience over the pytest-asyncio plugin.
    """

    @wraps(function)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return run(function(*args, **kwargs))

    return cast(F, wrapper)


# Constants
CPU_CORES: int = cpu_count() or 1
"""Number of CPU cores in the host system."""

PROCESS_POOL: ExecutorPool = ExecutorPool(ProcessPoolExecutor)
"""Process pool used by `in_process` function to execute work.

Preconfigured to launch at most `CPU_CORES` processes (if needed).

Processes are created on the first `in_process` call, one by one as needed,
or never launched otherwise.
"""

THREAD_POOL: ExecutorPool = ExecutorPool(ThreadPoolExecutor)
"""Thread pool used by `in_thread` function to execute work.

Preconfigured to launch at most 10 * `CPU_CORES` threads (if needed).

Threads are created on the first `in_thread` call, one by one as needed,
or never launched otherwise.
"""

Global variables

var CPU_CORES : int

Number of CPU cores in the host system.

var PROCESS_POOL : ExecutorPool

Process pool used by in_process() function to execute work.

Preconfigured to launch at most CPU_CORES processes (if needed).

Processes are created on the first in_process() call, one by one as needed, or never launched otherwise.

var THREAD_POOL : ExecutorPool

Thread pool used by in_thread() function to execute work.

Preconfigured to launch at most 10 * CPU_CORES threads (if needed).

Threads are created on the first in_thread() call, one by one as needed, or never launched otherwise.
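
Both pools are created lazily with the defaults above. If those defaults do not fit your workload, a pool can be initialized explicitly before the first in_thread() or in_process() call via ExecutorPool.initialize(). A minimal sketch; the worker counts are illustrative:

>>> from aioextensions import CPU_CORES, PROCESS_POOL, THREAD_POOL
>>> # Creates the pool now (replacing any previous one) with a custom size
>>> THREAD_POOL.initialize(max_workers=32)
>>> PROCESS_POOL.initialize(max_workers=CPU_CORES)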

Functions

def run(coroutine: Awaitable[~T], *, debug: bool = False) ‑> ~T

Execute an asynchronous function synchronously and return its result.

Usage

>>> async def do(a, b=0):
        await something
        return a + b
>>> run(do(1, b=2))
3

This function acts as a drop-in replacement for asyncio.run and installs uvloop (the fastest event-loop implementation out there) if available.

Tip

Use this as the entrypoint for your program.

def run(coroutine: Awaitable[T], *, debug: bool = False) -> T:
    """Execute an asynchronous function synchronously and return its result.

    Usage:

        >>> async def do(a, b=0):
                await something
                return a + b

        >>> run(do(1, b=2))
        3

    This function acts as a drop-in replacement for asyncio.run and
    installs `uvloop` (the fastest event-loop implementation out there) if
    available.

    .. tip::
        Use this as the entrypoint for your program.
    """
    if UVLOOP is not None:
        UVLOOP.install()

    return asyncio.run(coroutine, debug=debug)
async def in_thread(function: Callable[..., ~T], *args: Any, **kwargs: Any) ‑> ~T

Execute function(*args, **kwargs) in the configured thread pool.

This is the most performant wrapper for IO bound and high-latency tasks.

Every task will be assigned at most one thread; if there are more tasks than threads in the pool, the excess will be executed in FIFO order.

Spawning a million IO bound tasks with this function has a very small memory footprint.

Warning

Executing CPU intensive work here is a bad idea because of the limitations that the GIL imposes.

See in_process() for a CPU performant alternative.

async def in_thread(
    function: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Execute `function(*args, **kwargs)` in the configured thread pool.

    This is the most performant wrapper for IO bound and high-latency tasks.

    Every task will be assigned at most one thread; if there are more tasks
    than threads in the pool, the excess will be executed in FIFO order.

    Spawning a million IO bound tasks with this function has a very small
    memory footprint.

    .. warning::
        Executing CPU intensive work here is a bad idea because of the
        limitations that the [GIL](https://realpython.com/python-gil) imposes.

        See `in_process` for a CPU performant alternative.
    """
    _ensure_thread_pool_is_initialized()

    return await asyncio.get_running_loop().run_in_executor(
        THREAD_POOL.pool,
        partial(function, *args, **kwargs),
    )
async def in_process(function: Callable[..., ~T], *args: Any, **kwargs: Any) ‑> ~T

Execute function(*args, **kwargs) in the configured process pool.

This is the most performant wrapper for CPU bound and low-latency tasks.

Tasks executed in a process pool bypass the GIL and can consume all CPU cores available in the host if needed.

Every task will be assigned at most one process; if there are more tasks than processes in the pool, the excess will be executed in FIFO order.

Warning

Executing IO intensive work here is possible, but spawning a process has some overhead that can be avoided using threads at no performance expense.

See in_thread() for an IO performant alternative.

async def in_process(
    function: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Execute `function(*args, **kwargs)` in the configured process pool.

    This is the most performant wrapper for CPU bound and low-latency tasks.

    Tasks executed in a process pool bypass the
    [GIL](https://realpython.com/python-gil) and can consume all CPU cores
    available in the host if needed.

    Every task will be assigned at most one process; if there are more tasks
    than processes in the pool, the excess will be executed in FIFO order.

    .. warning::
        Executing IO intensive work here is possible, but spawning a process
        has some overhead that can be avoided using threads at no performance
        expense.

        See `in_thread` for an IO performant alternative.
    """
    _ensure_process_pool_is_initialized()

    return await asyncio.get_running_loop().run_in_executor(
        PROCESS_POOL.pool,
        partial(function, *args, **kwargs),
    )
def rate_limited(*, max_calls: int, max_calls_period: Union[float, int], min_seconds_between_calls: Union[float, int] = 0) ‑> Callable[[~F], ~F]

Decorator to turn an asynchronous function into a rate limited one.

The decorated function won't be able to execute more than max_calls times over a period of max_calls_period seconds. The excess will be queued in FIFO mode.

Additionally, it's guaranteed that successive calls are spaced at least min_seconds_between_calls seconds apart.

Usage

If you want to perform at most 2 calls to a database per second:

>>> @rate_limited(
        max_calls=2,
        max_calls_period=1,
        min_seconds_between_calls=0.2,
    )
    async def query(n):
        await something
        print(f'time: {time()}, doing: {n}')
>>> await collect(map(query, range(10)))

Output

time: 1597706698.0, doing: 0
time: 1597706698.2, doing: 1
time: 1597706699.0, doing: 2
time: 1597706699.2, doing: 3
time: 1597706700.0, doing: 4
time: 1597706700.2, doing: 5
time: 1597706701.0, doing: 6
time: 1597706701.2, doing: 7
time: 1597706702.0, doing: 8
time: 1597706702.2, doing: 9

Tip

Use min_seconds_between_calls as an anti-burst system. This can, for instance, lower your bill in DynamoDB or prevent a cooldown period (also known as a ban) by a firewall.

This decorator creates a max_calls sized data structure.

def rate_limited(
    *,
    max_calls: int,
    max_calls_period: Union[float, int],
    min_seconds_between_calls: Union[float, int] = 0,
) -> Callable[[F], F]:
    """Decorator to turn an asynchronous function into a rate limited one.

    The decorated function won't be able to execute more than `max_calls` times
    over a period of `max_calls_period` seconds. The excess will be queued in
    FIFO mode.

    Additionally, it's guaranteed that successive calls are spaced at least
    `min_seconds_between_calls` seconds apart.

    Usage:

        If you want to perform at most 2 calls to a database per second:

        >>> @rate_limited(
                max_calls=2,
                max_calls_period=1,
                min_seconds_between_calls=0.2,
            )
            async def query(n):
                await something
                print(f'time: {time()}, doing: {n}')

        >>> await collect(map(query, range(10)))

    Output:

        ```
        time: 1597706698.0, doing: 0
        time: 1597706698.2, doing: 1
        time: 1597706699.0, doing: 2
        time: 1597706699.2, doing: 3
        time: 1597706700.0, doing: 4
        time: 1597706700.2, doing: 5
        time: 1597706701.0, doing: 6
        time: 1597706701.2, doing: 7
        time: 1597706702.0, doing: 8
        time: 1597706702.2, doing: 9
        ```

    .. tip::
        Use `min_seconds_between_calls` as an anti-burst system. This can, for
        instance, lower your bill in DynamoDB or prevent a cooldown period
        (also known as a ban) by a firewall.

    This decorator creates a `max_calls` sized data structure.
    """
    if max_calls < 1:
        raise ValueError("max_calls must be >= 1")
    if max_calls_period <= 0:
        raise ValueError("max_calls_period must be > 0")
    if min_seconds_between_calls < 0:
        raise ValueError("min_seconds_between_calls must be >= 0")

    def decorator(function: F) -> F:
        lock = None
        waits: Deque[float] = deque()

        @wraps(function)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal lock

            lock = lock or asyncio.Lock()
            loop = asyncio.get_event_loop()

            async with lock:
                if waits:
                    # Anti burst control system:
                    #   wait until the difference between the most recent call
                    #   and the current call is >= min_seconds_between_calls
                    await asyncio.sleep(
                        waits[-1] + min_seconds_between_calls - loop.time()
                    )

                while len(waits) >= max_calls:
                    # Rate limit control system:
                    #   wait until the difference between the least recent
                    #   call and the current call is >= max_calls_period
                    await asyncio.sleep(
                        waits.popleft() + max_calls_period - loop.time()
                    )

                waits.append(loop.time())

            return await function(*args, **kwargs)

        return cast(F, wrapper)

    return decorator
async def collect(awaitables: Iterable[Awaitable[~T]], *, workers: int = 1024) ‑> Tuple[~T, ...]

Concurrently resolve the input stream and return results in the same order.

At any point in time there will be at most `workers` tasks being resolved concurrently.

Additionally, the algorithm makes sure that at any point in time every worker is busy.

Args

awaitables
An iterable (generator, list, tuple, set, etc) of awaitables (coroutine, asyncio.Task, or asyncio.Future).
workers
The number of independent workers that will be processing the input stream.

Returns

A tuple with the results of executing each awaitable in the event loop. Results are returned in the same order of the input stream.

Usage

>>> async def do(n):
        print(f'running: {n}')
        await sleep(1)
        print(f'returning: {n}')
        return n
>>> iterable = map(do, range(5))
>>> results = await collect(iterable, workers=2)
>>> print(f'results: {results}')

Output

running: 0
running: 1
returning: 0
returning: 1
running: 2
running: 3
returning: 2
returning: 3
running: 4
returning: 4
results: (0, 1, 2, 3, 4)

Tip

This is similar to asyncio.as_completed. However, results are returned in order, and you can control how many resources are consumed throughout the execution, for instance:

  • How many open files will be opened at the same time
  • How many HTTP requests will be performed to a service (rate limit)
  • How many sockets will be opened concurrently
  • Etc

This is useful for finite resources, for instance: the number of sockets provided by the operating system is limited; going beyond it would make the kernel kill the program abruptly.

If awaitables is an instance of Sized (has a __len__ method), this function will launch at most len(awaitables) workers.
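
For instance, a hedged sketch that keeps at most 10 requests in flight at any point in time (fetch is a hypothetical IO bound coroutine, not part of this package):

>>> async def fetch(url):
        # Hypothetical high-latency operation, e.g. an HTTP request
        ...

>>> urls = [f'https://example.com/{n}' for n in range(1000)]
>>> results = await collect(map(fetch, urls), workers=10)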

async def collect(
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
) -> Tuple[T, ...]:
    """Resolve concurrently the input stream and return back in the same order.

    At any point in time there will be at most _number of `workers`_
    tasks being resolved concurrently.

    Additionally, the algorithm makes sure that at any point in time every
    worker is busy.

    Args:
        awaitables: An iterable (generator, list, tuple, set, etc) of
            awaitables (coroutine, asyncio.Task, or asyncio.Future).
        workers: The number of independent workers that will be processing
            the input stream.

    Returns:
        A tuple with the results of executing each awaitable in the event loop.
        Results are returned in the same order of the input stream.

    Usage:
        >>> async def do(n):
                print(f'running: {n}')
                await sleep(1)
                print(f'returning: {n}')
                return n

        >>> iterable = map(do, range(5))

        >>> results = await collect(iterable, workers=2)

        >>> print(f'results: {results}')

    Output:
        ```
        running: 0
        running: 1
        returning: 0
        returning: 1
        running: 2
        running: 3
        returning: 2
        returning: 3
        running: 4
        returning: 4
        results: (0, 1, 2, 3, 4)
        ```

    .. tip::
        This is similar to asyncio.as_completed. However, results are returned
        in order, and you can control how many resources are consumed
        throughout the execution, for instance:

        - How many open files will be opened at the same time
        - How many HTTP requests will be performed to a service (rate limit)
        - How many sockets will be opened concurrently
        - Etc

        This is useful for finite resources, for instance: the number
        of sockets provided by the operating system is limited; going beyond
        it would make the kernel kill the program abruptly.

    If awaitables is an instance of Sized (has a `__len__` method), this
    function will launch at most `len(awaitables)` workers.
    """
    return tuple(
        [
            await elem
            for elem in resolve(
                awaitables,
                workers=workers,
                worker_greediness=0,
            )
        ]
    )
def resolve(awaitables: Iterable[Awaitable[~T]], *, workers: int = 1024, worker_greediness: int = 0) ‑> Iterable[Awaitable[~T]]

Concurrently resolve the input stream and yield results in the same order.

At any point in time there will be at most `workers` tasks being resolved concurrently.

Additionally, the algorithm makes sure that at any point in time every worker is busy (if greediness allows).

Args

awaitables
An iterable (generator, list, tuple, set, etc) of awaitables (coroutine, asyncio.Task, or asyncio.Future).
workers
The number of independent workers that will be processing the input stream.
worker_greediness
How many tasks a worker can process before waiting for you to retrieve its results. 0 means unlimited. Set to a non-zero value to upper-bound memory usage throughout the execution.

Yields

A future with the result of the next ready task. Futures are yielded in the same order of the input stream.

Usage

>>> async def do(n):
        print(f'running: {n}')
        await asyncio.sleep(1)
        print(f'returning: {n}')
        return n
>>> iterable = map(do, range(5))
>>> for future in resolve(iterable, workers=2):
        try:
            print(f'got resolved result: {await future}')
        except Exception:
            pass  # Handle possible exceptions

Output

running: 0
running: 1
returning: 0
returning: 1
got resolved result: 0
got resolved result: 1
running: 2
running: 3
returning: 2
returning: 3
got resolved result: 2
got resolved result: 3
running: 4
returning: 4
got resolved result: 4

Tip

This is similar to asyncio.as_completed. However, results are returned in order, and you can control how many resources are consumed throughout the execution, for instance:

  • How many open files will be opened at the same time
  • How many HTTP requests will be performed to a service (rate limit)
  • How many sockets will be opened concurrently
  • Etc

This is useful for finite resources, for instance: the number of sockets provided by the operating system is limited; going beyond it would make the kernel kill the program abruptly.

If awaitables is an instance of Sized (has a __len__ method), this function will launch at most len(awaitables) workers.
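
For example, a hedged sketch that upper-bounds memory usage by letting each worker run at most one task ahead of what has been retrieved (reusing the do coroutine above):

>>> for future in resolve(map(do, range(100)), workers=4, worker_greediness=1):
        # Each of the 4 workers holds at most 1 unretrieved result
        print(f'got resolved result: {await future}')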

def resolve(  # noqa: mccabe
    awaitables: Iterable[Awaitable[T]],
    *,
    workers: int = 1024,
    worker_greediness: int = 0,
) -> Iterable[Awaitable[T]]:
    """Resolve concurrently the input stream and yield back in the same order.

    At any point in time there will be at most _number of `workers`_
    tasks being resolved concurrently.

    Additionally, the algorithm makes sure that at any point in time every
    worker is busy (if greediness allows).

    Args:
        awaitables: An iterable (generator, list, tuple, set, etc) of
            awaitables (coroutine, asyncio.Task, or asyncio.Future).
        workers: The number of independent workers that will be processing
            the input stream.
        worker_greediness: How many tasks a worker can process before waiting
            for you to retrieve its results. 0 means unlimited. Set to a
            non-zero value to upper-bound memory usage throughout the
            execution.

    Yields:
        A future with the result of the next ready task. Futures are yielded in
        the same order of the input stream.

    Usage:
        >>> async def do(n):
                print(f'running: {n}')
                await asyncio.sleep(1)
                print(f'returning: {n}')
                return n

        >>> iterable = map(do, range(5))

        >>> for future in resolve(iterable, workers=2):
                try:
                    print(f'got resolved result: {await future}')
                except Exception:
                    pass  # Handle possible exceptions

    Output:
        ```
        running: 0
        running: 1
        returning: 0
        returning: 1
        got resolved result: 0
        got resolved result: 1
        running: 2
        running: 3
        returning: 2
        returning: 3
        got resolved result: 2
        got resolved result: 3
        running: 4
        returning: 4
        got resolved result: 4
        ```

    .. tip::
        This is similar to asyncio.as_completed. However, results are returned
        in order, and you can control how many resources are consumed
        throughout the execution, for instance:

        - How many open files will be opened at the same time
        - How many HTTP requests will be performed to a service (rate limit)
        - How many sockets will be opened concurrently
        - Etc

        This is useful for finite resources, for instance: the number
        of sockets provided by the operating system is limited; going beyond
        it would make the kernel kill the program abruptly.

    If awaitables is an instance of Sized (has a `__len__` method), this
    function will launch at most `len(awaitables)` workers.
    """
    if workers < 1:
        raise ValueError("workers must be >= 1")
    if worker_greediness < 0:
        raise ValueError("worker_greediness must be >= 0")

    if hasattr(awaitables, "__len__"):
        workers = min(workers, len(awaitables))  # type: ignore

    loop = asyncio.get_event_loop()
    store: Dict[int, asyncio.Queue] = {}
    stream, stream_copy = tee(enumerate(awaitables))
    stream_finished = asyncio.Event()
    workers_up = asyncio.Event()
    workers_tasks: Dict[int, asyncio.Task] = {}

    async def worker() -> None:
        done: asyncio.Queue = asyncio.Queue(worker_greediness)
        for index, awaitable in stream:
            store[index] = done
            future = loop.create_future()
            future.set_result(await schedule(awaitable, loop=loop))
            await done.put(future)
            workers_up.set()
        workers_up.set()
        stream_finished.set()

    async def start_workers() -> None:
        for index in range(workers):
            if stream_finished.is_set():
                break
            workers_tasks[index] = asyncio.create_task(worker())
            await force_loop_cycle()
        await workers_up.wait()

    async def get_one(index: int) -> Awaitable[T]:
        if not workers_tasks:
            await start_workers()

        awaitable = await store.pop(index).get()
        result: Awaitable[T] = (await awaitable).result()
        return result

    for index, _ in stream_copy:
        yield cast(Awaitable[T], get_one(index))
async def force_loop_cycle() ‑> NoneType

Force the event loop to perform one cycle.

This can be used to suspend the execution of the current coroutine and yield control back to the event-loop until the next cycle.

Can be seen as forcing a switch of control between tasks. Useful for cooperative initialization.

Usage

>>> await force_loop_cycle()
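
For example, a hedged sketch of cooperative initialization in the spirit of how resolve() starts its workers: forcing a loop cycle after each create_task() call gives every task a chance to start before continuing:

>>> for coroutine in coroutines:  # `coroutines` is illustrative
        asyncio.create_task(coroutine)
        # Let the freshly created task run up to its first await
        await force_loop_cycle()
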
async def force_loop_cycle() -> None:
    """Force the event loop to perform one cycle.

    This can be used to suspend the execution of the current coroutine and
    yield control back to the event-loop until the next cycle.

    Can be seen as forcing a switch of control between tasks.
    Useful for cooperative initialization.

    Usage:

        >>> await force_loop_cycle()

    """
    await asyncio.sleep(0)
async def generate_in_thread(generator_func: Callable[..., Generator[~Y, ~S, NoneType]], *args: Any, **kwargs: Any) ‑> AsyncGenerator[~Y, ~S]

Mimic generator_func(*args, **kwargs) in the configured thread pool.

Note that generator_func(*args, **kwargs) may return a generator or an iterator; both cases are handled correctly.

Usage

>>> from os import scandir
>>> async for entry in generate_in_thread(scandir, '.'):
        print(entry.name)

Output

.gitignore
LICENSE.md
README.md
...

Calls to the generator are done serially and not concurrently.

The benefit of wrapping a generator with this function is that the event-loop is free to schedule and await other tasks in the meantime. For instance, in a web server.

async def generate_in_thread(
    generator_func: Callable[..., Generator[Y, S, None]],
    *args: Any,
    **kwargs: Any,
) -> AsyncGenerator[Y, S]:
    """Mimic `generator_func(*args, **kwargs)` in the configured thread pool.

    Note that `generator_func(*args, **kwargs)` may return a generator or an
    iterator; both cases are handled correctly.

    Usage:

        >>> from os import scandir

        >>> async for entry in generate_in_thread(scandir, '.'):
                print(entry.name)

    Output:
        ```
        .gitignore
        LICENSE.md
        README.md
        ...
        ```

    Calls to the generator are done serially and not concurrently.

    The benefit of wrapping a generator with this function is that the
    event-loop is free to schedule and await other tasks in the meantime.
    For instance, in a web server.
    """
    gen: Generator[Y, S, None] = generator_func(*args, **kwargs)
    gen_sent: Any = None

    def gen_next(val: S) -> Y:
        with suppress(StopIteration):
            return gen.send(val) if hasattr(gen, "send") else next(gen)
        raise StopAsyncIteration()

    while True:
        try:
            gen_sent = yield await in_thread(gen_next, gen_sent)
        except StopAsyncIteration:
            return
def schedule(awaitable: Awaitable[~T], *, loop: Union[asyncio.events.AbstractEventLoop, NoneType] = None) ‑> Awaitable[asyncio.Future[T]]

Schedule an awaitable in the event loop and return a wrapper for it.

Usage

>>> async def do(n):
        print(f'running: {n}')
        await sleep(1)
        print(f'returning: {n}')
        return n
>>> task = schedule(do(3))  # Task is executing in the background now
>>> print('other work is being done here')
>>> task_result = await task  # Wait until the task is ready
>>> print(f'result: {task_result.result()}')  # may raise if do() raised

Output

other work is being done here
running: 3
returning: 3
result: 3

This works very similarly to asyncio.create_task. The main difference is that the result (or exception) can be accessed via the exception() or result() methods.

If an exception was raised by the awaitable, it will be propagated only at the moment result() is called and never otherwise.

Source code
def schedule(
    awaitable: Awaitable[T],
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> "Awaitable[asyncio.Future[T]]":
    """Schedule an awaitable in the event loop and return a wrapper for it.

    Usage:

        >>> async def do(n):
                print(f'running: {n}')
                await sleep(1)
                print(f'returning: {n}')
                return n

        >>> task = schedule(do(3))  # Task is executing in the background now

        >>> print('other work is being done here')

        >>> task_result = await task  # Wait until the task is ready

        >>> print(f'result: {task_result.result()}')  # may raise if do() raised

    Output:

        ```
        other work is being done here
        running: 3
        returning: 3
        result: 3
        ```

    This works very similarly to asyncio.create_task. The main difference is
    that awaiting the returned wrapper never raises: the result (or exception)
    is accessed explicitly via the exception() or result() methods of the
    future it resolves to.

    If the awaitable raised an exception, it is propagated only at the moment
    result() is called and never otherwise.
    """
    wrapper = (loop or asyncio.get_event_loop()).create_future()

    def _done_callback(future: asyncio.Future) -> None:
        if not wrapper.done():  # pragma: no cover
            wrapper.set_result(future)

    asyncio.create_task(awaitable).add_done_callback(_done_callback)

    return wrapper
def run_decorator(function: ~F) ‑> ~F

Decorator to turn an asynchronous function into a synchronous one.

Usage

>>> @run_decorator
    async def do(a, b=0):
        return a + b
>>> do(1, b=2)

Output

3

This can be used as a bridge between synchronous and asynchronous code. We use it mostly in tests for its convenience over the pytest-asyncio plugin.
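
For instance, a test written for a plain synchronous runner might look like this sketch (the test name is illustrative):

from aioextensions import run_decorator

@run_decorator
async def test_addition():
    # The decorator runs the coroutine to completion synchronously,
    # so no async-aware test plugin is needed.
    assert 1 + 2 == 3

test_addition()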

Source code
def run_decorator(function: F) -> F:
    """Decorator to turn an asynchronous function into a synchronous one.

    Usage:
        >>> @run_decorator
            async def do(a, b=0):
                return a + b

        >>> do(1, b=2)

    Output:

        ```
        3
        ```
    This can be used as a bridge between synchronous and asynchronous code.
    We use it mostly in tests for its convenience over the pytest-asyncio
    plugin.
    """

    @wraps(function)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return run(function(*args, **kwargs))

    return cast(F, wrapper)

Classes

class Semaphore (value=1, *, loop=None)

Same as asyncio.Semaphore plus some useful methods.

Source code
class Semaphore(asyncio.Semaphore):
    """Same as `asyncio.Semaphore` plus some useful methods."""

    @asynccontextmanager
    async def acquire_many(self, times: int) -> AsyncIterator[None]:
        """Acquire a semaphore many times, and release on exit.

        Usage:

            >>> async with semaphore.acquire_many(5):
                    # Work with shared resource
                    ...

        """
        if times <= 0:
            raise ValueError("times must be >= 1")

        try:
            await collect([self.acquire() for _ in range(times)])
            yield
        finally:
            for _ in range(times):
                self.release()

Ancestors

  • asyncio.locks.Semaphore
  • asyncio.locks._ContextManagerMixin

Subclasses

  • BoundedSemaphore

Methods

async def acquire_many(self, times: int) ‑> AsyncIterator[NoneType]

Acquire a semaphore many times, and release on exit.

Usage

>>> async with semaphore.acquire_many(5):
        # Work with shared resource
        ...
Source code
@asynccontextmanager
async def acquire_many(self, times: int) -> AsyncIterator[None]:
    """Acquire a semaphore many times, and release on exit.

    Usage:

        >>> async with semaphore.acquire_many(5):
                # Work with shared resource
                ...

    """
    if times <= 0:
        raise ValueError("times must be >= 1")

    try:
        await collect([self.acquire() for _ in range(times)])
        yield
    finally:
        for _ in range(times):
            self.release()
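
Beyond acquire_many, the class works anywhere asyncio.Semaphore does. The following sketch (assuming Semaphore and collect are importable from the package top level) bounds a batch of coroutines to two concurrent workers:

import asyncio

from aioextensions import Semaphore, collect, run

async def worker(sem, number):
    async with sem:
        # At most two workers execute this block at the same time
        await asyncio.sleep(0.1)
        return number

async def main():
    sem = Semaphore(2)
    print(await collect([worker(sem, number) for number in range(5)]))

run(main())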
class BoundedSemaphore (value=1, *, loop=None)

Same as asyncio.BoundedSemaphore plus some useful methods.

Source code
class BoundedSemaphore(Semaphore, asyncio.BoundedSemaphore):
    """Same as `asyncio.BoundedSemaphore` plus some useful methods."""

Ancestors

  • Semaphore
  • asyncio.locks.BoundedSemaphore
  • asyncio.locks.Semaphore
  • asyncio.locks._ContextManagerMixin

Inherited members

  • Semaphore: acquire_many

class ExecutorPool (cls: Union[Type[concurrent.futures.process.ProcessPoolExecutor], Type[concurrent.futures.thread.ThreadPoolExecutor]])

Object representing a pool of processes or threads.

The actual pool is created when initialize() is called and is empty until then.
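
A sketch of the full lifecycle (assuming ExecutorPool is importable from the package top level):

from concurrent.futures import ThreadPoolExecutor

from aioextensions import ExecutorPool

pool = ExecutorPool(ThreadPoolExecutor)
print(pool.initialized)  # False: no executor has been created yet

pool.initialize(max_workers=4)
print(pool.initialized)  # True: workers will be spawned on demand

future = pool.pool.submit(sum, [1, 2, 3])
print(future.result())   # 6

pool.shutdown(wait=True)
print(pool.initialized)  # False again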

Source code
class ExecutorPool:
    """Object representing a pool of Processes or Threads.

    The actual pool is created when `initialize` is called
    and is empty until then.
    """

    def __init__(
        self,
        cls: Union[
            Type[ProcessPoolExecutor],
            Type[ThreadPoolExecutor],
        ],
    ) -> None:
        self._cls = cls
        self._pool: Optional[Executor] = None

    def initialize(self, *, max_workers: Optional[int] = None) -> None:
        """Initialize the executor with a cap of at most `max_workers`.

        Workers are created on demand, or not at all if they are
        never needed.
        """
        if self._pool is not None:
            self._pool.shutdown(wait=False)

        self._pool = self._cls(max_workers=max_workers)

    def shutdown(self, *, wait: bool) -> None:
        """Shut down the executor and (optionally) waits for workers to finish."""
        if self._pool is not None:
            self._pool.shutdown(wait=wait)
            self._pool = None

    @property
    def pool(self) -> Executor:
        """Low level pool of workers held by the executor, may be None."""
        if self._pool is None:
            raise RuntimeError("Must call initialize first")

        return self._pool

    @property
    def initialized(self) -> bool:
        """Return true if the executor is initialized and ready to process."""
        return self._pool is not None

Instance variables

var pool : concurrent.futures._base.Executor

Low-level pool of workers held by the executor; raises RuntimeError if not initialized.

Source code
@property
def pool(self) -> Executor:
    """Low level pool of workers held by the executor, may be None."""
    if self._pool is None:
        raise RuntimeError("Must call initialize first")

    return self._pool
var initialized : bool

Return true if the executor is initialized and ready to process.

Source code
@property
def initialized(self) -> bool:
    """Return true if the executor is initialized and ready to process."""
    return self._pool is not None

Methods

def initialize(self, *, max_workers: Union[int, NoneType] = None) ‑> NoneType

Initialize the executor with a cap of at most max_workers.

Workers are created on demand, or not at all if they are never needed.
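
Calling initialize() on an already-initialized pool replaces the executor: the previous one is shut down without waiting, as the source below shows. A minimal sketch (assuming ExecutorPool is importable from the package top level):

from concurrent.futures import ThreadPoolExecutor

from aioextensions import ExecutorPool

pool = ExecutorPool(ThreadPoolExecutor)
pool.initialize(max_workers=2)
first = pool.pool

pool.initialize(max_workers=8)  # shuts down the first executor
assert pool.pool is not first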

Source code
def initialize(self, *, max_workers: Optional[int] = None) -> None:
    """Initialize the executor with a cap of at most `max_workers`.

    Workers are created on demand, or not at all if they are
    never needed.
    """
    if self._pool is not None:
        self._pool.shutdown(wait=False)

    self._pool = self._cls(max_workers=max_workers)
def shutdown(self, *, wait: bool) ‑> NoneType

Shut down the executor, optionally waiting for workers to finish.

Source code
def shutdown(self, *, wait: bool) -> None:
    """Shut down the executor and (optionally) waits for workers to finish."""
    if self._pool is not None:
        self._pool.shutdown(wait=wait)
        self._pool = None