Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32309: Implement asyncio.ThreadPool #18410

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions Doc/library/asyncio-pools.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
.. currentmodule:: asyncio

.. versionadded:: 3.9

.. _asyncio-pools:

=====
Pools
=====

**Source code:** :source:`Lib/asyncio/pools.py`

-----------------------------------------------

.. note::
This section of the documentation and all of its members have been
added *provisionally* to asyncio's API. For more details, see
:term:`provisional api`.

Asyncio pools are high-level, asynchronous context managers that can be used
to concurrently run blocking functions and methods.

There are many potential use cases, but a particularly useful one is for
combining libraries without asyncio support with existing asyncio programs.
Normally, calling a non-async function within the event loop would will result
in blocking the event loop until the function returns. However, by using a
pool to run the function, it can be executed in a separate worker (such as a
thread or process) without blocking the event loop.

.. class:: ThreadPool(concurrency=None)

An asynchronous thread pool that provides methods to concurrently
run IO-bound functions, without blocking the event loop.

*concurrency* is an optional argument that limits the number of
threads to utilize in the thread pool. With the default value of
``None``, the amount of threads used will scale based on the
number of processors.

.. coroutinemethod:: run(func, /, *args, **kwargs)

Asynchronously run *func* with its arguments and keyword-arguments
within the thread pool, and return a :class:`asyncio.Future` object
that represents the eventual result of its execution. ::

async with asyncio.ThreadPool() as pool:
await pool.run(time.sleep, 1)

Raises a :exc:`RuntimeError` if the thread pool is *not* running.

.. coroutinemethod:: astart()

Start the thread pool and spawn its threads. Note that
this function is called automatically when using ``asyncio.ThreadPool``
as an asynchronous context manager, and does not need to be called
directly.

Raises a :exc:`RuntimeError` if the thread pool is already running or
if it's been closed.

.. coroutinemethod:: aclose()
aeros marked this conversation as resolved.
Show resolved Hide resolved

Close the thread pool. Note that this function is
called automatically when using ``asyncio.ThreadPool`` as an
asynchronous context manager, and does not need to be called directly.

Raises a :exc:`RuntimeError` if the thread pool has already been closed.

Examples
========

Here's an example of concurrently running two IO-bound functions using
:class:`asyncio.ThreadPool`::

import asyncio

def blocking_io():
print("start blocking_io")
with open('/dev/urandom', 'rb') as f:
f.read(100_000)
print("blocking_io complete")

def other_blocking_io():
print("start other_blocking_io")
with open('/dev/zero', 'rb') as f:
f.read(10)
print("other_blocking_io complete")

async def main():
async with asyncio.ThreadPool() as pool:
await asyncio.gather(
pool.run(blocking_io),
pool.run(other_blocking_io))

asyncio.run(main())
3 changes: 3 additions & 0 deletions Doc/library/asyncio.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ asyncio provides a set of **high-level** APIs to:

* :ref:`synchronize <asyncio-sync>` concurrent code;

* concurrently run blocking functions in a :ref:`pool <asyncio-pools>`;

Additionally, there are **low-level** APIs for
*library and framework developers* to:

Expand Down Expand Up @@ -73,6 +75,7 @@ Additionally, there are **low-level** APIs for
asyncio-subprocess.rst
asyncio-queue.rst
asyncio-exceptions.rst
asyncio-pools.rst

.. toctree::
:caption: Low-level APIs
Expand Down
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ details, see the documentation for ``loop.create_datagram_endpoint()``.
(Contributed by Kyle Stanley, Antoine Pitrou, and Yury Selivanov in
:issue:`37228`.)

Added :class:`asyncio.ThreadPool`, an asynchronous context manager for
concurrently running IO-bound functions without blocking the event loop.
It essentially works as a higher-level version of
:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and
be used as a context manager using ``async with``.
(Contributed by Kyle Stanley in :issue:`32309`.)

Added a new :term:`coroutine` :meth:`~asyncio.loop.shutdown_default_executor`
that schedules a shutdown for the default executor that waits on the
:class:`~concurrent.futures.ThreadPoolExecutor` to finish closing. Also,
Expand Down
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .exceptions import *
from .futures import *
from .locks import *
from .pools import *
from .protocols import *
from .runners import *
from .queues import *
Expand All @@ -29,6 +30,7 @@
exceptions.__all__ +
futures.__all__ +
locks.__all__ +
pools.__all__ +
protocols.__all__ +
runners.__all__ +
queues.__all__ +
Expand Down
152 changes: 152 additions & 0 deletions Lib/asyncio/pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Support for high-level asynchronous pools in asyncio."""

__all__ = 'ThreadPool',


import concurrent.futures
import functools
import threading
import os

from abc import ABC, abstractmethod

from . import events
from . import exceptions
from . import futures


class AbstractPool(ABC):
"""Abstract base class for asynchronous pools."""

@abstractmethod
async def astart(self):
raise NotImplementedError
aeros marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
async def __aenter__(self):
await self.astart()
return self

@abstractmethod
async def aclose(self):
raise NotImplementedError

@abstractmethod
async def __aexit__(self, exc_type, exc_value, exc_traceback):
await self.aclose()

@abstractmethod
async def run(self, func, /, *args, **kwargs):
"""Asynchronously run function *func* using the pool.

Return a future, representing the eventual result of *func*.
"""
raise NotImplementedError


class ThreadPool(AbstractPool):
"""Asynchronous thread pool for running IO-bound functions.

Directly calling an IO-bound function within the main thread will block
other operations from occurring until it is completed. By using a
thread pool, several IO-bound functions can be ran concurrently within
their own threads, without blocking other operations.

The optional argument *concurrency* sets the number of threads within the
thread pool. If *concurrency* is `None`, the maximum number of threads will
be used; based on the number of CPU cores.

This thread pool is intended to be used as an asynchronous context manager,
using the `async with` syntax, which provides automatic initialization and
finalization of resources. For example:

import asyncio

def blocking_io():
print("start blocking_io")
with open('/dev/urandom', 'rb') as f:
f.read(100_000)
print("blocking_io complete")

def other_blocking_io():
print("start other_blocking_io")
with open('/dev/zero', 'rb') as f:
f.read(10)
print("other_blocking_io complete")

async def main():
async with asyncio.ThreadPool() as pool:
await asyncio.gather(
pool.run(blocking_io),
pool.run(other_blocking_io))

asyncio.run(main())
"""

def __init__(self, concurrency=None):
if concurrency is None:
concurrency = min(32, (os.cpu_count() or 1) + 4)
aeros marked this conversation as resolved.
Show resolved Hide resolved

self._concurrency = concurrency
self._running = False
self._closed = False
self._loop = None
self._pool = None

async def astart(self):
self._loop = events.get_running_loop()
await self._spawn_threadpool()

async def __aenter__(self):
await self.astart()
return self

async def aclose(self):
await self._shutdown_threadpool()

async def __aexit__(self, exc_type, exc_value, exc_traceback):
await self.aclose()

async def run(self, func, /, *args, **kwargs):
if not self._running:
raise RuntimeError(f"unable to run {func!r}, "
"thread pool is not running")

func_call = functools.partial(func, *args, **kwargs)
executor = self._pool
return await futures.wrap_future(
executor.submit(func_call), loop=self._loop)

async def _spawn_threadpool(self):
"""Spawn the thread pool.

Asynchronously spawns a thread pool with *concurrency* threads.
"""
if self._running:
raise RuntimeError("thread pool is already running")

if self._closed:
raise RuntimeError("thread pool is closed")

await self._loop.run_in_executor(None, self._do_spawn)

def _do_spawn(self):
self._pool = concurrent.futures.ThreadPoolExecutor(
max_workers=self._concurrency)
self._running = True

async def _shutdown_threadpool(self):
"""Shutdown the thread pool.

Asynchronously joins all of the threads in the thread pool.
"""
if self._closed:
raise RuntimeError("thread pool is already closed")

# Set _running to False as early as possible
self._running = False
await self._loop.run_in_executor(None, self._do_shutdown)

def _do_shutdown(self):
self._pool.shutdown()
self._closed = True
Loading