Skip to content

Commit

Permalink
Added subprocess support
Browse files Browse the repository at this point in the history
Fixes #9.
  • Loading branch information
agronholm committed Dec 30, 2019
1 parent 26cf592 commit b2e80e7
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 5 deletions.
80 changes: 79 additions & 1 deletion src/anyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
from contextlib import contextmanager
from importlib import import_module
from pprint import pprint
from subprocess import CompletedProcess, PIPE, DEVNULL, CalledProcessError
from typing import (
overload, TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Type)
overload, TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Type,
Sequence)

import attr
import sniffio

from ._utils import get_bind_address
from .abc.streams import UnreliableReceiveMessageStream, MessageStream
from .abc.subprocesses import AsyncProcess
from .abc.synchronization import Lock, Condition, Event, Semaphore, CapacityLimiter
from .abc.tasks import CancelScope, TaskGroup
from .abc.networking import (
Expand Down Expand Up @@ -266,6 +269,81 @@ def current_default_thread_limiter() -> CapacityLimiter:
return _get_asynclib().current_default_thread_limiter()


#
# Subprocesses
#

async def run_process(command: Union[str, Sequence[str]], *, input: Optional[bytes] = None,
stdout: int = PIPE, stderr: int = PIPE,
check: bool = True) -> CompletedProcess:
"""
Run an external command in a subprocess and wait until it completes.
.. seealso:: :func:`subprocess.run`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param input: bytes passed to the standard input of the subprocess
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
:data:`subprocess.STDOUT`
:param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process
terminates with a return code other than 0
:return: an object representing the completed process
:raises CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero
return code
"""
async def drain_stream(stream, index):
chunks = [chunk async for chunk in stream]
stream_contents[index] = b''.join(chunks)

process = await open_process(command, stdin=PIPE if input else DEVNULL, stdout=stdout,
stderr=stderr)
stream_contents = [None, None]
try:
async with create_task_group() as tg:
if process.stdout:
await tg.spawn(drain_stream, process.stdout, 0)
if process.stderr:
await tg.spawn(drain_stream, process.stderr, 1)
if process.stdin and input:
await process.stdin.send(input)
await process.stdin.aclose()

await process.wait()
except BaseException:
process.kill()
raise

output, errors = stream_contents
if check and process.returncode != 0:
raise CalledProcessError(typing.cast(int, process.returncode), command, output, errors)

return CompletedProcess(command, typing.cast(int, process.returncode), output, errors)


def open_process(command: Union[str, Sequence[str]], *, stdin: int = PIPE,
stdout: int = PIPE, stderr: int = PIPE) -> Coroutine[Any, Any, AsyncProcess]:
"""
Start an external command in a subprocess.
.. seealso:: :class:`subprocess.Popen`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param stdin: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
:data:`subprocess.STDOUT`
:return: an asynchronous process object
"""
shell = isinstance(command, str)
return _get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr)


#
# Async file I/O
#
Expand Down
89 changes: 88 additions & 1 deletion src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket,
TCPListener as AbstractTCPListener, UNIXListener as AbstractUNIXListener
)
from ..abc.streams import ByteStream, MessageStream
from ..abc.streams import ByteStream, MessageStream, ReceiveByteStream, SendByteStream
from ..abc.subprocesses import AsyncProcess as AbstractAsyncProcess
from ..abc.synchronization import (
Lock as AbstractLock, Condition as AbstractCondition, Event as AbstractEvent,
Semaphore as AbstractSemaphore, CapacityLimiter as AbstractCapacityLimiter)
Expand Down Expand Up @@ -460,6 +461,92 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a
return f.result()


#
# Stream wrappers
#

@attr.s(slots=True, frozen=True, auto_attribs=True)
class StreamReaderWrapper(ReceiveByteStream):
_stream: asyncio.StreamReader

async def receive(self, max_bytes: Optional[int] = None) -> bytes:
return await self._stream.read(max_bytes or 65536)

async def aclose(self) -> None:
self._stream.feed_eof()


@attr.s(slots=True, frozen=True, auto_attribs=True)
class StreamWriterWrapper(SendByteStream):
_stream: asyncio.StreamWriter

async def send(self, item: bytes) -> None:
self._stream.write(item)
await self._stream.drain()

async def aclose(self) -> None:
self._stream.close()


#
# Subprocesses
#

@attr.s(slots=True, auto_attribs=True)
class Process(AbstractAsyncProcess):
_process: asyncio.subprocess.Process
_stdin: Optional[SendByteStream]
_stdout: Optional[ReceiveByteStream]
_stderr: Optional[ReceiveByteStream]

async def wait(self) -> int:
return await self._process.wait()

def terminate(self) -> None:
self._process.terminate()

def kill(self) -> None:
self._process.kill()

def send_signal(self, signal: int) -> None:
self._process.send_signal(signal)

@property
def pid(self) -> int:
return self._process.pid

@property
def returncode(self) -> Optional[int]:
return self._process.returncode

@property
def stdin(self) -> Optional[SendByteStream]:
return self._stdin

@property
def stdout(self) -> Optional[ReceiveByteStream]:
return self._stdout

@property
def stderr(self) -> Optional[ReceiveByteStream]:
return self._stderr


async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int):
check_cancelled()
if shell:
process = await asyncio.create_subprocess_shell(command, stdin=stdin, stdout=stdout,
stderr=stderr)
else:
process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
stderr=stderr)

stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
return Process(process, stdin_stream, stdout_stream, stderr_stream)


#
# Async file I/O
#
Expand Down
77 changes: 76 additions & 1 deletion src/anyio/_backends/_curio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import curio.meta
import curio.socket
import curio.ssl
import curio.subprocess
import curio.traps
from async_generator import async_generator, asynccontextmanager, yield_

Expand All @@ -23,7 +24,8 @@
UDPPacket, UDPSocket as AbstractUDPSocket, ConnectedUDPSocket as AbstractConnectedUDPSocket,
TCPListener as AbstractTCPListener, UNIXListener as AbstractUNIXListener
)
from ..abc.streams import ByteStream, MessageStream
from ..abc.streams import ByteStream, MessageStream, ReceiveByteStream, SendByteStream
from ..abc.subprocesses import AsyncProcess as AbstractAsyncProcess
from ..abc.synchronization import (
Lock as AbstractLock, Condition as AbstractCondition, Event as AbstractEvent,
Semaphore as AbstractSemaphore, CapacityLimiter as AbstractCapacityLimiter)
Expand Down Expand Up @@ -413,6 +415,79 @@ def run_async_from_thread(func: Callable[..., T_Retval], *args) -> T_Retval:
return future.result()


#
# Stream wrappers
#

class FileStreamWrapper(ByteStream):
def __init__(self, stream: curio.io.FileStream):
super().__init__()
self._stream = stream

async def receive(self, max_bytes: Optional[int] = None) -> bytes:
return await self._stream.read(max_bytes or 65536)

async def send(self, item: bytes) -> None:
await self._stream.write(item)

async def aclose(self) -> None:
await self._stream.close()


#
# Subprocesses
#

@attr.s(slots=True, auto_attribs=True)
class Process(AbstractAsyncProcess):
_process: curio.subprocess.Popen
_stdin: Optional[SendByteStream]
_stdout: Optional[ReceiveByteStream]
_stderr: Optional[ReceiveByteStream]

async def wait(self) -> int:
return await self._process.wait()

def terminate(self) -> None:
self._process.terminate()

def kill(self) -> None:
self._process.kill()

def send_signal(self, signal: int) -> None:
self._process.send_signal(signal)

@property
def pid(self) -> int:
return self._process.pid

@property
def returncode(self) -> Optional[int]:
return self._process.returncode

@property
def stdin(self) -> Optional[SendByteStream]:
return self._stdin

@property
def stdout(self) -> Optional[ReceiveByteStream]:
return self._stdout

@property
def stderr(self) -> Optional[ReceiveByteStream]:
return self._stderr


async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int):
await check_cancelled()
process = curio.subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
shell=shell)
stdin_stream = FileStreamWrapper(process.stdin) if process.stdin else None
stdout_stream = FileStreamWrapper(process.stdout) if process.stdout else None
stderr_stream = FileStreamWrapper(process.stderr) if process.stderr else None
return Process(process, stdin_stream, stdout_stream, stderr_stream)


#
# Async file I/O
#
Expand Down
Loading

0 comments on commit b2e80e7

Please sign in to comment.