diff --git a/src/anyio/__init__.py b/src/anyio/__init__.py index e62d559d..97957751 100644 --- a/src/anyio/__init__.py +++ b/src/anyio/__init__.py @@ -6,14 +6,17 @@ from contextlib import contextmanager from importlib import import_module from pprint import pprint +from subprocess import CompletedProcess, PIPE, DEVNULL, CalledProcessError from typing import ( - overload, TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Type) + overload, TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Type, + Sequence) import attr import sniffio from ._utils import get_bind_address from .abc.streams import UnreliableReceiveMessageStream, MessageStream +from .abc.subprocesses import AsyncProcess from .abc.synchronization import Lock, Condition, Event, Semaphore, CapacityLimiter from .abc.tasks import CancelScope, TaskGroup from .abc.networking import ( @@ -266,6 +269,81 @@ def current_default_thread_limiter() -> CapacityLimiter: return _get_asynclib().current_default_thread_limiter() +# +# Subprocesses +# + +async def run_process(command: Union[str, Sequence[str]], *, input: Optional[bytes] = None, + stdout: int = PIPE, stderr: int = PIPE, + check: bool = True) -> CompletedProcess: + """ + Run an external command in a subprocess and wait until it completes. + + .. seealso:: :func:`subprocess.run` + + :param command: either a string to pass to the shell, or an iterable of strings containing the + executable name or path and its arguments + :param input: bytes passed to the standard input of the subprocess + :param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or + :data:`subprocess.STDOUT` + :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process + terminates with a return code other than 0 + :return: an object representing the completed process + :raises CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero + return code + + """ + async def drain_stream(stream, index): + chunks = [chunk async for chunk in stream] + stream_contents[index] = b''.join(chunks) + + process = await open_process(command, stdin=PIPE if input else DEVNULL, stdout=stdout, + stderr=stderr) + stream_contents = [None, None] + try: + async with create_task_group() as tg: + if process.stdout: + await tg.spawn(drain_stream, process.stdout, 0) + if process.stderr: + await tg.spawn(drain_stream, process.stderr, 1) + if process.stdin and input: + await process.stdin.send(input) + await process.stdin.aclose() + + await process.wait() + except BaseException: + process.kill() + raise + + output, errors = stream_contents + if check and process.returncode != 0: + raise CalledProcessError(typing.cast(int, process.returncode), command, output, errors) + + return CompletedProcess(command, typing.cast(int, process.returncode), output, errors) + + +def open_process(command: Union[str, Sequence[str]], *, stdin: int = PIPE, + stdout: int = PIPE, stderr: int = PIPE) -> Coroutine[Any, Any, AsyncProcess]: + """ + Start an external command in a subprocess. + + .. seealso:: :class:`subprocess.Popen` + + :param command: either a string to pass to the shell, or an iterable of strings containing the + executable name or path and its arguments + :param stdin: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL` + :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or + :data:`subprocess.STDOUT` + :return: an asynchronous process object + + """ + shell = isinstance(command, str) + return _get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr) + + # # Async file I/O # diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 0dc7fe78..ce41fb8d 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -21,7 +21,8 @@ UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket, TCPListener as AbstractTCPListener, UNIXListener as AbstractUNIXListener ) -from ..abc.streams import ByteStream, MessageStream +from ..abc.streams import ByteStream, MessageStream, ReceiveByteStream, SendByteStream +from ..abc.subprocesses import AsyncProcess as AbstractAsyncProcess from ..abc.synchronization import ( Lock as AbstractLock, Condition as AbstractCondition, Event as AbstractEvent, Semaphore as AbstractSemaphore, CapacityLimiter as AbstractCapacityLimiter) @@ -460,6 +461,92 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a return f.result() +# +# Stream wrappers +# + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class StreamReaderWrapper(ReceiveByteStream): + _stream: asyncio.StreamReader + + async def receive(self, max_bytes: Optional[int] = None) -> bytes: + return await self._stream.read(max_bytes or 65536) + + async def aclose(self) -> None: + self._stream.feed_eof() + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class StreamWriterWrapper(SendByteStream): + _stream: asyncio.StreamWriter + + async def send(self, item: bytes) -> None: + self._stream.write(item) + await self._stream.drain() + + async def aclose(self) -> None: + self._stream.close() + + +# +# Subprocesses +# + +@attr.s(slots=True, auto_attribs=True) +class Process(AbstractAsyncProcess): + _process: asyncio.subprocess.Process + _stdin: Optional[SendByteStream] + _stdout: Optional[ReceiveByteStream] + _stderr: Optional[ReceiveByteStream] + + async def wait(self) -> int: + return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[SendByteStream]: + return self._stdin + + @property + def stdout(self) -> Optional[ReceiveByteStream]: + return self._stdout + + @property + def stderr(self) -> Optional[ReceiveByteStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + check_cancelled() + if shell: + process = await asyncio.create_subprocess_shell(command, stdin=stdin, stdout=stdout, + stderr=stderr) + else: + process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout, + stderr=stderr) + + stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None + stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None + stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Async file I/O # diff --git a/src/anyio/_backends/_curio.py b/src/anyio/_backends/_curio.py index 8967124f..7c9668b5 100644 --- a/src/anyio/_backends/_curio.py +++ b/src/anyio/_backends/_curio.py @@ -14,6 +14,7 @@ import curio.meta import curio.socket import curio.ssl +import curio.subprocess import curio.traps from async_generator import async_generator, asynccontextmanager, yield_ @@ -23,7 +24,8 @@ UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket, TCPListener as AbstractTCPListener, UNIXListener as AbstractUNIXListener ) -from ..abc.streams import ByteStream, MessageStream +from ..abc.streams import ByteStream, MessageStream, ReceiveByteStream, SendByteStream +from ..abc.subprocesses import AsyncProcess as AbstractAsyncProcess from ..abc.synchronization import ( Lock as AbstractLock, Condition as AbstractCondition, Event as AbstractEvent, Semaphore as AbstractSemaphore, CapacityLimiter as AbstractCapacityLimiter) @@ -413,6 +415,79 @@ def run_async_from_thread(func: Callable[..., T_Retval], *args) -> T_Retval: return future.result() +# +# Stream wrappers +# + +class FileStreamWrapper(ByteStream): + def __init__(self, stream: curio.io.FileStream): + super().__init__() + self._stream = stream + + async def receive(self, max_bytes: Optional[int] = None) -> bytes: + return await self._stream.read(max_bytes or 65536) + + async def send(self, item: bytes) -> None: + await self._stream.write(item) + + async def aclose(self) -> None: + await self._stream.close() + + +# +# Subprocesses +# + +@attr.s(slots=True, auto_attribs=True) +class Process(AbstractAsyncProcess): + _process: curio.subprocess.Popen + _stdin: Optional[SendByteStream] + _stdout: Optional[ReceiveByteStream] + _stderr: Optional[ReceiveByteStream] + + async def wait(self) -> int: + return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[SendByteStream]: + return self._stdin + + @property + def stdout(self) -> Optional[ReceiveByteStream]: + return self._stdout + + @property + def stderr(self) -> Optional[ReceiveByteStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + await check_cancelled() + process = curio.subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, + shell=shell) + stdin_stream = FileStreamWrapper(process.stdin) if process.stdin else None + stdout_stream = FileStreamWrapper(process.stdout) if process.stdout else None + stderr_stream = FileStreamWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Async file I/O # diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index 56837fd1..4d7aec1e 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -14,11 +14,12 @@ UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket, TCPListener as AbstractTCPListener, UNIXListener as AbstractUNIXListener ) -from ..abc.streams import ByteStream -from ..abc.tasks import CancelScope as AbstractCancelScope, TaskGroup as AbstractTaskGroup +from ..abc.streams import ByteStream, ReceiveByteStream, SendByteStream +from ..abc.subprocesses import AsyncProcess as AbstractAsyncProcess from ..abc.synchronization import ( Lock as AbstractLock, Condition as AbstractCondition, Event as AbstractEvent, Semaphore as AbstractSemaphore, CapacityLimiter as AbstractCapacityLimiter) +from ..abc.tasks import CancelScope as AbstractCancelScope, TaskGroup as AbstractTaskGroup from ..exceptions import ( ExceptionGroup as BaseExceptionGroup, WouldBlock, ClosedResourceError, BusyResourceError) @@ -167,6 +168,85 @@ def wrapper(): run_async_from_thread = trio.from_thread.run +# +# Stream wrappers +# + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class ReceiveStreamWrapper(ReceiveByteStream): + _stream: trio.abc.ReceiveStream + + async def receive(self, max_bytes: Optional[int] = None) -> bytes: + return await self._stream.receive_some(max_bytes) + + async def aclose(self) -> None: + await self._stream.aclose() + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SendStreamWrapper(SendByteStream): + _stream: trio.abc.SendStream + + async def send(self, item: bytes) -> None: + await self._stream.send_all(item) + + async def aclose(self) -> None: + await self._stream.aclose() + + +# +# Subprocesses +# + +@attr.s(slots=True, auto_attribs=True) +class Process(AbstractAsyncProcess): + _process: trio.Process + _stdin: Optional[SendByteStream] + _stdout: Optional[ReceiveByteStream] + _stderr: Optional[ReceiveByteStream] + + async def wait(self) -> int: + return await self._process.wait() + + def terminate(self) -> None: + self._process.terminate() + + def kill(self) -> None: + self._process.kill() + + def send_signal(self, signal: int) -> None: + self._process.send_signal(signal) + + @property + def pid(self) -> int: + return self._process.pid + + @property + def returncode(self) -> Optional[int]: + return self._process.returncode + + @property + def stdin(self) -> Optional[SendByteStream]: + return self._stdin + + @property + def stdout(self) -> Optional[ReceiveByteStream]: + return self._stdout + + @property + def stderr(self) -> Optional[ReceiveByteStream]: + return self._stderr + + +async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int): + process = await trio.open_process(command, stdin=stdin, stdout=stdout, stderr=stderr, + shell=shell) + stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None + stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None + stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None + return Process(process, stdin_stream, stdout_stream, stderr_stream) + + # # Async file I/O # diff --git a/src/anyio/abc/subprocesses.py b/src/anyio/abc/subprocesses.py new file mode 100644 index 00000000..11b14280 --- /dev/null +++ b/src/anyio/abc/subprocesses.py @@ -0,0 +1,76 @@ +from abc import ABCMeta, abstractmethod +from typing import Optional + +from anyio.abc.streams import SendByteStream, ReceiveByteStream + + +class AsyncProcess(metaclass=ABCMeta): + """An asynchronous version of :class:`subprocess.Process`.""" + + @abstractmethod + async def wait(self) -> int: + """ + Wait until the process exits. + + :return: the exit code of the process + """ + + @abstractmethod + def terminate(self) -> None: + """ + Terminates the process, gracefully if possible. + + On Windows, this calls ``TerminateProcess()``. + On POSIX systems, this sends ``SIGTERM`` to the process. + + .. seealso:: :meth:`subprocess.Popen.terminate` + """ + + @abstractmethod + def kill(self) -> None: + """ + Kills the process. + + On Windows, this calls ``TerminateProcess()``. + On POSIX systems, this sends ``SIGKILL`` to the process. + + .. seealso:: :meth:`subprocess.Popen.kill` + """ + + @abstractmethod + def send_signal(self, signal: int) -> None: + """ + Send a signal to the subprocess. + + .. seealso:: :meth:`subprocess.Popen.send_signal` + + :param signal: the signal number (e.g. :data:`signal.SIGHUP`) + """ + + @property + @abstractmethod + def pid(self) -> int: + """The process ID of the process.""" + + @property + @abstractmethod + def returncode(self) -> Optional[int]: + """ + The return code of the process. If the process has not yet terminated, this will be + ``None``. + """ + + @property + @abstractmethod + def stdin(self) -> Optional[SendByteStream]: + """The stream for the standard input of the process.""" + + @property + @abstractmethod + def stdout(self) -> Optional[ReceiveByteStream]: + """The stream for the standard output of the process.""" + + @property + @abstractmethod + def stderr(self) -> Optional[ReceiveByteStream]: + """The stream for the standard error output of the process.""" diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py new file mode 100644 index 00000000..707a9cef --- /dev/null +++ b/tests/test_subprocesses.py @@ -0,0 +1,65 @@ +import platform +from subprocess import CalledProcessError +from textwrap import dedent + +import pytest + +from anyio import run_process, open_process +from anyio.wrappers import BufferedByteReader + + +@pytest.mark.parametrize('shell, command', [ + pytest.param(True, 'python -c "import sys; print(sys.stdin.read()[::-1])"', id='shell'), + pytest.param(False, ['python', '-c', 'import sys; print(sys.stdin.read()[::-1])'], id='exec') +]) +@pytest.mark.anyio +async def test_run_process(shell, command, anyio_backend): + if anyio_backend == 'curio' and platform.python_implementation() == 'PyPy': + pytest.skip('This test causes Curio to crash PyPy') + + process = await run_process(command, input=b'abc') + assert process.returncode == 0 + assert process.stdout == b'cba\n' + + +@pytest.mark.anyio +async def test_run_process_checked(anyio_backend): + if anyio_backend == 'curio' and platform.python_implementation() == 'PyPy': + pytest.skip('This test causes Curio to crash PyPy') + + with pytest.raises(CalledProcessError) as exc: + await run_process(['python', '-c', + 'import sys; print("stderr-text", file=sys.stderr); ' + 'print("stdout-text"); sys.exit(1)'], check=True) + + assert exc.value.returncode == 1 + assert exc.value.stdout == b'stdout-text\n' + assert exc.value.stderr == b'stderr-text\n' + + +@pytest.mark.anyio +async def test_terminate(tmp_path, anyio_backend): + if anyio_backend == 'curio' and platform.python_implementation() == 'PyPy': + pytest.skip('This test causes Curio to crash PyPy') + + script_path = tmp_path / 'script.py' + script_path.write_text(dedent("""\ + import signal, sys, time + + def terminate(signum, frame): + print('exited with SIGTERM', flush=True) + sys.exit() + + signal.signal(signal.SIGTERM, terminate) + print('ready', flush=True) + time.sleep(5) + """)) + process = await open_process(['python', str(script_path)]) + buffered_stdout = BufferedByteReader(process.stdout) + line = await buffered_stdout.read_until(b'\n', 100) + assert line == b'ready' + + process.terminate() + line = await buffered_stdout.read_until(b'\n', 100) + assert line == b'exited with SIGTERM' + assert await process.wait() == 0