From de321dbdcabec223a4c4368d3f18689f0405b14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 30 Jul 2020 19:18:39 +0300 Subject: [PATCH] Added support for subprocesses Fixes #9. --- docs/api.rst | 9 +++ docs/index.rst | 1 + docs/subprocesses.rst | 58 +++++++++++++++++++ docs/versionhistory.rst | 1 + src/anyio/__init__.py | 78 +++++++++++++++++++++++++- src/anyio/_backends/_asyncio.py | 98 +++++++++++++++++++++++++++++++++ src/anyio/_backends/_curio.py | 88 +++++++++++++++++++++++++++++ src/anyio/_backends/_trio.py | 90 ++++++++++++++++++++++++++++++ src/anyio/abc/__init__.py | 5 +- src/anyio/abc/subprocesses.py | 77 ++++++++++++++++++++++++++ tests/test_subprocesses.py | 76 +++++++++++++++++++++++++ 11 files changed, 578 insertions(+), 3 deletions(-) create mode 100644 docs/subprocesses.rst create mode 100644 src/anyio/abc/subprocesses.py create mode 100644 tests/test_subprocesses.py diff --git a/docs/api.rst b/docs/api.rst index 163226f07..1639945cd 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -131,6 +131,15 @@ Sockets and networking .. autoclass:: anyio.abc.sockets.ConnectedUDPSocket :members: +Subprocesses +------------ + +.. autofunction:: anyio.run_process +.. autofunction:: anyio.open_process + +.. autoclass:: anyio.abc.subprocesses.Process + :members: + Synchronization --------------- diff --git a/docs/index.rst b/docs/index.rst index e733a1dc8..66f81aff0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,6 +16,7 @@ The manual streams networking threads + subprocesses fileio signals testing diff --git a/docs/subprocesses.rst b/docs/subprocesses.rst new file mode 100644 index 000000000..024b38783 --- /dev/null +++ b/docs/subprocesses.rst @@ -0,0 +1,58 @@ +Using subprocesses +================== + +AnyIO allows you to run arbitrary executables in subprocesses, either as a one-shot call or by +opening a process handle for you that gives you more control over the subprocess. 
+ +You can either give the command as a string, in which case it is passed to your default shell +(equivalent to ``shell=True`` in :func:`subprocess.run`), or as a sequence of strings +(``shell=False``) in which the executable is the first item in the sequence and the rest are +arguments passed to it. + +.. note:: The subprocess facilities provided by AnyIO do not include a way to execute arbitrary + Python code like the :mod:`multiprocessing` module does, but they can be used as + building blocks for such a feature. + +Running one-shot commands +------------------------- + +To run an external command with one call, use :func:`~anyio.run_process`:: + + from anyio import run_process, run + + + async def main(): + result = await run_process('/usr/bin/ps') + print(result.stdout.decode()) + + run(main) + +The snippet above runs the ``ps`` command within a shell. To run it directly:: + + from anyio import run_process, run + + + async def main(): + result = await run_process(['/usr/bin/ps']) + print(result.stdout.decode()) + + run(main) + +Working with processes +---------------------- + +When you have more complex requirements for your interaction with subprocesses, you can launch one +with :func:`~anyio.open_process`:: + + from anyio import open_process, run + from anyio.streams.text import TextReceiveStream + + + async def main(): + async with await open_process(['/usr/bin/ps']) as process: + async for text in TextReceiveStream(process.stdout): + print(text) + + run(main) + +See the API documentation of :class:`~anyio.abc.subprocesses.Process` for more information. diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 10c88e3dc..ada0a55cc 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -49,6 +49,7 @@ This library adheres to `Semantic Versioning 2.0 `_.
- Unified checkpoint behavior: a cancellation check now calls ``sleep(0)`` on asyncio and curio, allowing the scheduler to switch to a different task - Added memory object streams +- Added support for subprocesses - Dropped support for Python 3.5 - Bumped minimum versions of trio and curio to v0.16 and v1.2, respectively - Fixed a bug where a task group would abandon its subtasks if its own cancel scope was cancelled diff --git a/src/anyio/__init__.py b/src/anyio/__init__.py index c177ba75a..4a83491bc 100644 --- a/src/anyio/__init__.py +++ b/src/anyio/__init__.py @@ -10,6 +10,7 @@ from ipaddress import ip_address, IPv6Address from pathlib import Path from socket import AddressFamily, SocketKind +from subprocess import PIPE, CompletedProcess, DEVNULL, CalledProcessError from typing import TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Tuple import sniffio @@ -17,7 +18,7 @@ from ._utils import convert_ipv6_sockaddr from .abc import ( Lock, Condition, Event, Semaphore, CapacityLimiter, CancelScope, TaskGroup, IPAddressType, - SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener, + SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener, Process, AsyncResource) from .fileio import AsyncFile from .streams.tls import TLSStream @@ -280,6 +281,81 @@ def current_default_worker_thread_limiter() -> CapacityLimiter: return _get_asynclib().current_default_thread_limiter() +# +# Subprocesses +# + +async def run_process(command: Union[str, typing.Sequence[str]], *, input: Optional[bytes] = None, + stdout: int = PIPE, stderr: int = PIPE, + check: bool = True) -> CompletedProcess: + """ + Run an external command in a subprocess and wait until it completes. + + .. 
seealso:: :func:`subprocess.run` + + :param command: either a string to pass to the shell, or an iterable of strings containing the + executable name or path and its arguments + :param input: bytes passed to the standard input of the subprocess + :param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or + :data:`subprocess.STDOUT` + :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process + terminates with a return code other than 0 + :return: an object representing the completed process + :raises CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero + return code + + """ + async def drain_stream(stream, index): + chunks = [chunk async for chunk in stream] + stream_contents[index] = b''.join(chunks) + + async with await open_process(command, stdin=PIPE if input else DEVNULL, stdout=stdout, + stderr=stderr) as process: + stream_contents = [None, None] + try: + async with create_task_group() as tg: + if process.stdout: + await tg.spawn(drain_stream, process.stdout, 0) + if process.stderr: + await tg.spawn(drain_stream, process.stderr, 1) + if process.stdin and input: + await process.stdin.send(input) + await process.stdin.aclose() + + await process.wait() + except BaseException: + process.kill() + raise + + output, errors = stream_contents + if check and process.returncode != 0: + raise CalledProcessError(typing.cast(int, process.returncode), command, output, errors) + + return CompletedProcess(command, typing.cast(int, process.returncode), output, errors) + + +async def open_process(command: Union[str, typing.Sequence[str]], *, stdin: int = PIPE, + stdout: int = PIPE, stderr: int = PIPE) -> Process: + """ + Start an external command in a subprocess. + + .. 
seealso:: :class:`subprocess.Popen` + + :param command: either a string to pass to the shell, or an iterable of strings containing the + executable name or path and its arguments + :param stdin: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or + :data:`subprocess.STDOUT` + :return: an asynchronous process object + + """ + shell = isinstance(command, str) + return await _get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr) + + # # Async file I/O # diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index d85e45dbd..77556f51c 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -14,6 +14,8 @@ List, Dict, Sequence, Type, Deque) from weakref import WeakKeyDictionary +from dataclasses import dataclass + from .. import ( abc, claim_worker_thread, _local, T_Retval, TaskInfo, GetAddrInfoReturnType) from ..abc import IPSockAddrType, SockAddrType @@ -526,6 +528,102 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a return f.result() +# +# Subprocesses +# + +@dataclass +class StreamReaderWrapper(abc.ByteReceiveStream): + _stream: asyncio.StreamReader + + async def receive(self, max_bytes: int = 65536) -> bytes: + data = await self._stream.read(max_bytes) + if data: + return data + else: + raise EndOfStream + + async def aclose(self) -> None: + self._stream.feed_eof() + + +@dataclass +class StreamWriterWrapper(abc.ByteSendStream): + _stream: asyncio.StreamWriter + + async def send(self, item: bytes) -> None: + self._stream.write(item) + await self._stream.drain() + + async def aclose(self) -> None: + self._stream.close() + + +@dataclass +class Process(abc.Process): + _process: asyncio.subprocess.Process + _stdin: Optional[abc.ByteSendStream] + _stdout: 
Optional[abc.ByteReceiveStream] + _stderr: Optional[abc.ByteReceiveStream] + + async def aclose(self) -> None: + if self._stdin: + await self._stdin.aclose() + if self._stdout: + await self._stdout.aclose() + if self._stderr: + await self._stderr.aclose() + + await self.wait() + + async def wait(self) -> int: + return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[abc.ByteSendStream]: + return self._stdin + + @property + def stdout(self) -> Optional[abc.ByteReceiveStream]: + return self._stdout + + @property + def stderr(self) -> Optional[abc.ByteReceiveStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + await check_cancelled() + if shell: + process = await asyncio.create_subprocess_shell(command, stdin=stdin, stdout=stdout, + stderr=stderr) + else: + process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout, + stderr=stderr) + + stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None + stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None + stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Sockets and networking # diff --git a/src/anyio/_backends/_curio.py b/src/anyio/_backends/_curio.py index 9de322810..dfc74f0dc 100644 --- a/src/anyio/_backends/_curio.py +++ b/src/anyio/_backends/_curio.py @@ -17,7 +17,9 @@ import curio.meta import curio.socket import curio.ssl +import curio.subprocess import curio.traps +from dataclasses import dataclass from .. 
import ( abc, T_Retval, claim_worker_thread, TaskInfo, _local, GetAddrInfoReturnType, IPSockAddrType) @@ -444,6 +446,92 @@ def run_async_from_thread(func: Callable[..., T_Retval], *args) -> T_Retval: return future.result() +# +# Subprocesses +# + +class FileStreamWrapper(abc.ByteStream): + def __init__(self, stream: curio.io.FileStream): + super().__init__() + self._stream = stream + + async def receive(self, max_bytes: Optional[int] = None) -> bytes: + data = await self._stream.read(max_bytes or 65536) + if data: + return data + else: + raise EndOfStream + + async def send(self, item: bytes) -> None: + await self._stream.write(item) + + async def send_eof(self) -> None: + raise NotImplementedError + + async def aclose(self) -> None: + await self._stream.close() + + +@dataclass +class Process(abc.Process): + _process: curio.subprocess.Popen + _stdin: Optional[abc.ByteSendStream] + _stdout: Optional[abc.ByteReceiveStream] + _stderr: Optional[abc.ByteReceiveStream] + + async def aclose(self) -> None: + if self._stdin: + await self._stdin.aclose() + if self._stdout: + await self._stdout.aclose() + if self._stderr: + await self._stderr.aclose() + + await self.wait() + + async def wait(self) -> int: + return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[abc.ByteSendStream]: + return self._stdin + + @property + def stdout(self) -> Optional[abc.ByteReceiveStream]: + return self._stdout + + @property + def stderr(self) -> Optional[abc.ByteReceiveStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + await check_cancelled() + process = 
curio.subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, + shell=shell) + stdin_stream = FileStreamWrapper(process.stdin) if process.stdin else None + stdout_stream = FileStreamWrapper(process.stdout) if process.stdout else None + stderr_stream = FileStreamWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Sockets and networking # diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 65ee47280..87449d10f 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -4,6 +4,7 @@ from typing import Callable, Optional, List, Type, Union, Tuple, TYPE_CHECKING, TypeVar, Generic import trio.from_thread +from dataclasses import dataclass from trio.to_thread import run_sync from .. import abc, claim_worker_thread, T_Retval, TaskInfo @@ -176,6 +177,95 @@ def wrapper(): run_async_from_thread = trio.from_thread.run +# +# Subprocesses +# + +@dataclass +class ReceiveStreamWrapper(abc.ByteReceiveStream): + _stream: trio.abc.ReceiveStream + + async def receive(self, max_bytes: Optional[int] = None) -> bytes: + data = await self._stream.receive_some(max_bytes) + if data: + return data + else: + raise EndOfStream + + async def aclose(self) -> None: + await self._stream.aclose() + + +@dataclass +class SendStreamWrapper(abc.ByteSendStream): + _stream: trio.abc.SendStream + + async def send(self, item: bytes) -> None: + await self._stream.send_all(item) + + async def aclose(self) -> None: + await self._stream.aclose() + + +@dataclass +class Process(abc.Process): + _process: trio.Process + _stdin: Optional[abc.ByteSendStream] + _stdout: Optional[abc.ByteReceiveStream] + _stderr: Optional[abc.ByteReceiveStream] + + async def aclose(self) -> None: + if self._stdin: + await self._stdin.aclose() + if self._stdout: + await self._stdout.aclose() + if self._stderr: + await self._stderr.aclose() + + await self.wait() + + async def wait(self) -> int: + 
return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[abc.ByteSendStream]: + return self._stdin + + @property + def stdout(self) -> Optional[abc.ByteReceiveStream]: + return self._stdout + + @property + def stderr(self) -> Optional[abc.ByteReceiveStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + process = await trio.open_process(command, stdin=stdin, stdout=stdout, stderr=stderr, + shell=shell) + stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None + stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None + stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Sockets and networking # diff --git a/src/anyio/abc/__init__.py b/src/anyio/abc/__init__.py index 1b6d2cfbf..f87568630 100644 --- a/src/anyio/abc/__init__.py +++ b/src/anyio/abc/__init__.py @@ -4,8 +4,8 @@ 'ObjectReceiveStream', 'ObjectSendStream', 'ObjectStream', 'ByteReceiveStream', 'ByteSendStream', 'ByteStream', 'AnyUnreliableByteReceiveStream', 'AnyUnreliableByteSendStream', 'AnyUnreliableByteStream', 'AnyByteReceiveStream', - 'AnyByteSendStream', 'AnyByteStream', 'Listener', 'Event', 'Lock', 'Condition', - 'Semaphore', 'CapacityLimiter', 'CancelScope', 'TaskGroup') + 'AnyByteSendStream', 'AnyByteStream', 'Listener', 'Process', 'Event', 'Lock', + 'Condition', 'Semaphore', 'CapacityLimiter', 'CancelScope', 'TaskGroup') from .sockets import ( IPAddressType, IPSockAddrType, SockAddrType, UDPPacketType, SocketStream, 
SocketListener, @@ -16,5 +16,6 @@ ObjectReceiveStream, ObjectSendStream, ObjectStream, ByteReceiveStream, ByteSendStream, ByteStream, AnyUnreliableByteReceiveStream, AnyUnreliableByteSendStream, AnyUnreliableByteStream, AnyByteReceiveStream, AnyByteSendStream, AnyByteStream, Listener) +from .subprocesses import Process from .synchronization import Event, Lock, Condition, Semaphore, CapacityLimiter from .tasks import CancelScope, TaskGroup diff --git a/src/anyio/abc/subprocesses.py b/src/anyio/abc/subprocesses.py new file mode 100644 index 000000000..44b81dfec --- /dev/null +++ b/src/anyio/abc/subprocesses.py @@ -0,0 +1,77 @@ +from abc import abstractmethod +from typing import Optional + +from .resource import AsyncResource +from .streams import ByteSendStream, ByteReceiveStream + + +class Process(AsyncResource): + """An asynchronous version of :class:`subprocess.Popen`.""" + + @abstractmethod + async def wait(self) -> int: + """ + Wait until the process exits. + + :return: the exit code of the process + """ + + @abstractmethod + def terminate(self) -> None: + """ + Terminates the process, gracefully if possible. + + On Windows, this calls ``TerminateProcess()``. + On POSIX systems, this sends ``SIGTERM`` to the process. + + .. seealso:: :meth:`subprocess.Popen.terminate` + """ + + @abstractmethod + def kill(self) -> None: + """ + Kills the process. + + On Windows, this calls ``TerminateProcess()``. + On POSIX systems, this sends ``SIGKILL`` to the process. + + .. seealso:: :meth:`subprocess.Popen.kill` + """ + + @abstractmethod + def send_signal(self, signal: int) -> None: + """ + Send a signal to the subprocess. + + .. seealso:: :meth:`subprocess.Popen.send_signal` + + :param signal: the signal number (e.g. :data:`signal.SIGHUP`) + """ + + @property + @abstractmethod + def pid(self) -> int: + """The process ID of the process.""" + + @property + @abstractmethod + def returncode(self) -> Optional[int]: + """ + The return code of the process.
If the process has not yet terminated, this will be + ``None``. + """ + + @property + @abstractmethod + def stdin(self) -> Optional[ByteSendStream]: + """The stream for the standard input of the process.""" + + @property + @abstractmethod + def stdout(self) -> Optional[ByteReceiveStream]: + """The stream for the standard output of the process.""" + + @property + @abstractmethod + def stderr(self) -> Optional[ByteReceiveStream]: + """The stream for the standard error output of the process.""" diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py new file mode 100644 index 000000000..b16b14b9a --- /dev/null +++ b/tests/test_subprocesses.py @@ -0,0 +1,76 @@ +import platform +import sys +from subprocess import CalledProcessError +from textwrap import dedent + +import pytest + +from anyio import run_process, open_process +from anyio.streams.buffered import BufferedByteReceiveStream + +pytestmark = pytest.mark.anyio + + +@pytest.fixture(autouse=True) +def check_compatibility(anyio_backend_name): + if anyio_backend_name == 'curio': + if platform.python_implementation() == 'PyPy': + pytest.skip('Using subprocesses causes Curio to crash PyPy') + elif platform.system() == 'Windows': + pytest.skip('Subprocess support on Curio+Windows is broken') + elif anyio_backend_name == 'asyncio': + if platform.system() == 'Windows' and sys.version_info < (3, 8): + pytest.skip('Python < 3.8 uses SelectorEventLoop by default and it does not support ' + 'subprocesses') + + +@pytest.mark.parametrize('shell, command', [ + pytest.param(True, f'{sys.executable} -c "import sys; print(sys.stdin.read()[::-1])"', + id='shell'), + pytest.param(False, [sys.executable, '-c', 'import sys; print(sys.stdin.read()[::-1])'], + id='exec') +]) +async def test_run_process(shell, command, anyio_backend_name): + if anyio_backend_name == 'curio' and platform.python_implementation() == 'PyPy': + pytest.skip('This test causes Curio to crash PyPy') + + process = await run_process(command, 
input=b'abc') + assert process.returncode == 0 + assert process.stdout.rstrip() == b'cba' + + +async def test_run_process_checked(): + with pytest.raises(CalledProcessError) as exc: + await run_process([sys.executable, '-c', + 'import sys; print("stderr-text", file=sys.stderr); ' + 'print("stdout-text"); sys.exit(1)'], check=True) + + assert exc.value.returncode == 1 + assert exc.value.stdout.rstrip() == b'stdout-text' + assert exc.value.stderr.rstrip() == b'stderr-text' + + +@pytest.mark.skipif(platform.system() == 'Windows', + reason='process.terminate() kills the process instantly on Windows') +async def test_terminate(tmp_path): + script_path = tmp_path / 'script.py' + script_path.write_text(dedent("""\ + import signal, sys, time + + def terminate(signum, frame): + print('exited with SIGTERM', flush=True) + sys.exit() + + signal.signal(signal.SIGTERM, terminate) + print('ready', flush=True) + time.sleep(5) + """)) + async with await open_process([sys.executable, str(script_path)]) as process: + buffered_stdout = BufferedByteReceiveStream(process.stdout) + line = await buffered_stdout.receive_until(b'\n', 100) + assert line.rstrip() == b'ready' + + process.terminate() + line = await buffered_stdout.receive_until(b'\n', 100) + assert line.rstrip() == b'exited with SIGTERM' + assert await process.wait() == 0