Skip to content

Commit f39c50e

Browse files
committed
Added support for subprocesses
Fixes #9.
1 parent 1c3dc5c commit f39c50e

11 files changed

+566
-3
lines changed

docs/api.rst

+9
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ Sockets and networking
130130
.. autoclass:: anyio.abc.sockets.ConnectedUDPSocket
131131
:members:
132132

133+
Subprocesses
134+
------------
135+
136+
.. autofunction:: anyio.run_process
137+
.. autofunction:: anyio.open_process
138+
139+
.. autoclass:: anyio.abc.subprocesses.Process
140+
:members:
141+
133142
Synchronization
134143
---------------
135144

docs/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The manual
1616
streams
1717
networking
1818
threads
19+
subprocesses
1920
fileio
2021
signals
2122
testing

docs/subprocesses.rst

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
Using subprocesses
2+
==================
3+
4+
AnyIO allows you to run arbitrary executables in subprocesses, either as a one-shot call or by
5+
opening a process handle for you that gives you more control over the subprocess.
6+
7+
You can either give the command as a string, in which case it is passed to your default shell
8+
(equivalent to ``shell=True`` in :func:`subprocess.run`), or as a sequence of strings
9+
(``shell=False``) in which the executable is the first item in the sequence and the rest are
10+
arguments passed to it.
11+
12+
.. note:: The subprocess facilities provided by AnyIO do not include a way to execute arbitrary
13+
Python code like the :mod:`multiprocessing` module does, but they can be used as
14+
building blocks for such a feature.
15+
16+
Running one-shot commands
17+
-------------------------
18+
19+
To run an external command with one call, use :func:`~anyio.run_process`::
20+
21+
from anyio import run_process, run
22+
23+
24+
async def main():
25+
result = await run_process('/usr/bin/ps')
26+
print(result.stdout.decode())
27+
28+
run(main)
29+
30+
The snippet above runs the ``ps`` command within a shell (. To run it directly::
31+
32+
from anyio import run_process, run
33+
34+
35+
async def main():
36+
result = await run_process(['/usr/bin/ps'])
37+
print(result.stdout.decode())
38+
39+
run(main)
40+
41+
Working with processes
42+
----------------------
43+
44+
When you have more complex requirements for your interaction with subprocesses, you can launch one
45+
with :func:`~anyio.open_process`::
46+
47+
from anyio import open_process, run
48+
from anyio.streams.text import TextReceiveStream
49+
50+
51+
async def main():
52+
async with await open_process(['/usr/bin/ps']) as process:
53+
for text in TextReceiveStream(process.stdout):
54+
print(text)
55+
56+
run(main)
57+
58+
See the API documentation of :class:`~anyio.abc.subprocesses.Process` for more information.

docs/versionhistory.rst

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
4949
- Unified checkpoint behavior: a cancellation check now calls ``sleep(0)`` on asyncio and curio,
5050
allowing the scheduler to switch to a different task
5151
- Added memory object streams
52+
- Added support for subprocesses
5253
- Dropped support for Python 3.5
5354
- Bumped minimum versions of trio and curio to v0.16 and v1.2, respectively
5455
- Fixed a bug where a task group would abandon its subtasks if its own cancel scope was cancelled

src/anyio/__init__.py

+77-1
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
from ipaddress import ip_address, IPv6Address
1111
from pathlib import Path
1212
from socket import AddressFamily, SocketKind
13+
from subprocess import PIPE, CompletedProcess, DEVNULL, CalledProcessError
1314
from typing import TypeVar, Callable, Union, Optional, Awaitable, Coroutine, Any, Dict, List, Tuple
1415

1516
import sniffio
1617

1718
from .abc import (
1819
Lock, Condition, Event, Semaphore, CapacityLimiter, CancelScope, TaskGroup, IPAddressType,
19-
SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener)
20+
SocketStream, UDPSocket, ConnectedUDPSocket, IPSockAddrType, Listener, SocketListener, Process)
2021
from .fileio import AsyncFile
2122
from .streams.tls import TLSStream
2223
from .streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -265,6 +266,81 @@ def current_default_worker_thread_limiter() -> CapacityLimiter:
265266
return _get_asynclib().current_default_thread_limiter()
266267

267268

269+
#
270+
# Subprocesses
271+
#
272+
273+
async def run_process(command: Union[str, typing.Sequence[str]], *, input: Optional[bytes] = None,
274+
stdout: int = PIPE, stderr: int = PIPE,
275+
check: bool = True) -> CompletedProcess:
276+
"""
277+
Run an external command in a subprocess and wait until it completes.
278+
279+
.. seealso:: :func:`subprocess.run`
280+
281+
:param command: either a string to pass to the shell, or an iterable of strings containing the
282+
executable name or path and its arguments
283+
:param input: bytes passed to the standard input of the subprocess
284+
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
285+
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
286+
:data:`subprocess.STDOUT`
287+
:param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process
288+
terminates with a return code other than 0
289+
:return: an object representing the completed process
290+
:raises CalledProcessError: if ``check`` is ``True`` and the process exits with a nonzero
291+
return code
292+
293+
"""
294+
async def drain_stream(stream, index):
295+
chunks = [chunk async for chunk in stream]
296+
stream_contents[index] = b''.join(chunks)
297+
298+
async with await open_process(command, stdin=PIPE if input else DEVNULL, stdout=stdout,
299+
stderr=stderr) as process:
300+
stream_contents = [None, None]
301+
try:
302+
async with create_task_group() as tg:
303+
if process.stdout:
304+
await tg.spawn(drain_stream, process.stdout, 0)
305+
if process.stderr:
306+
await tg.spawn(drain_stream, process.stderr, 1)
307+
if process.stdin and input:
308+
await process.stdin.send(input)
309+
await process.stdin.aclose()
310+
311+
await process.wait()
312+
except BaseException:
313+
process.kill()
314+
raise
315+
316+
output, errors = stream_contents
317+
if check and process.returncode != 0:
318+
raise CalledProcessError(typing.cast(int, process.returncode), command, output, errors)
319+
320+
return CompletedProcess(command, typing.cast(int, process.returncode), output, errors)
321+
322+
323+
async def open_process(command: Union[str, typing.Sequence[str]], *, stdin: int = PIPE,
324+
stdout: int = PIPE, stderr: int = PIPE) -> Process:
325+
"""
326+
Start an external command in a subprocess.
327+
328+
.. seealso:: :class:`subprocess.Popen`
329+
330+
:param command: either a string to pass to the shell, or an iterable of strings containing the
331+
executable name or path and its arguments
332+
:param stdin: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
333+
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
334+
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
335+
:data:`subprocess.STDOUT`
336+
:return: an asynchronous process object
337+
338+
"""
339+
shell = isinstance(command, str)
340+
return await _get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
341+
stderr=stderr)
342+
343+
268344
#
269345
# Async file I/O
270346
#

src/anyio/_backends/_asyncio.py

+94
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
List, Dict, Sequence, Type, Deque)
1515
from weakref import WeakKeyDictionary
1616

17+
from dataclasses import dataclass
18+
1719
from .. import (
1820
abc, claim_worker_thread, _local, T_Retval, TaskInfo, GetAddrInfoReturnType)
1921
from ..abc import IPSockAddrType, SockAddrType
@@ -526,6 +528,98 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a
526528
return f.result()
527529

528530

531+
#
532+
# Subprocesses
533+
#
534+
535+
@dataclass
536+
class StreamReaderWrapper(abc.ByteReceiveStream):
537+
_stream: asyncio.StreamReader
538+
539+
async def receive(self, max_bytes: int = 65536) -> bytes:
540+
return await self._stream.read(max_bytes)
541+
542+
async def aclose(self) -> None:
543+
self._stream.feed_eof()
544+
545+
546+
@dataclass
547+
class StreamWriterWrapper(abc.ByteSendStream):
548+
_stream: asyncio.StreamWriter
549+
550+
async def send(self, item: bytes) -> None:
551+
self._stream.write(item)
552+
await self._stream.drain()
553+
554+
async def aclose(self) -> None:
555+
self._stream.close()
556+
557+
558+
@dataclass
559+
class Process(abc.Process):
560+
_process: asyncio.subprocess.Process
561+
_stdin: Optional[abc.ByteSendStream]
562+
_stdout: Optional[abc.ByteReceiveStream]
563+
_stderr: Optional[abc.ByteReceiveStream]
564+
565+
async def aclose(self) -> None:
566+
if self._stdin:
567+
await self._stdin.aclose()
568+
if self._stdout:
569+
await self._stdout.aclose()
570+
if self._stderr:
571+
await self._stderr.aclose()
572+
573+
await self.wait()
574+
575+
async def wait(self) -> int:
576+
return await self._process.wait()
577+
578+
def terminate(self) -> None:
579+
self._process.terminate()
580+
581+
def kill(self) -> None:
582+
self._process.kill()
583+
584+
def send_signal(self, signal: int) -> None:
585+
self._process.send_signal(signal)
586+
587+
@property
588+
def pid(self) -> int:
589+
return self._process.pid
590+
591+
@property
592+
def returncode(self) -> Optional[int]:
593+
return self._process.returncode
594+
595+
@property
596+
def stdin(self) -> Optional[abc.ByteSendStream]:
597+
return self._stdin
598+
599+
@property
600+
def stdout(self) -> Optional[abc.ByteReceiveStream]:
601+
return self._stdout
602+
603+
@property
604+
def stderr(self) -> Optional[abc.ByteReceiveStream]:
605+
return self._stderr
606+
607+
608+
async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int):
609+
await check_cancelled()
610+
if shell:
611+
process = await asyncio.create_subprocess_shell(command, stdin=stdin, stdout=stdout,
612+
stderr=stderr)
613+
else:
614+
process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
615+
stderr=stderr)
616+
617+
stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
618+
stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
619+
stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
620+
return Process(process, stdin_stream, stdout_stream, stderr_stream)
621+
622+
529623
#
530624
# Sockets and networking
531625
#

src/anyio/_backends/_curio.py

+84
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import curio.meta
1818
import curio.socket
1919
import curio.ssl
20+
import curio.subprocess
2021
import curio.traps
22+
from dataclasses import dataclass
2123

2224
from .. import (
2325
abc, T_Retval, claim_worker_thread, TaskInfo, _local, GetAddrInfoReturnType, IPSockAddrType)
@@ -444,6 +446,88 @@ def run_async_from_thread(func: Callable[..., T_Retval], *args) -> T_Retval:
444446
return future.result()
445447

446448

449+
#
450+
# Subprocesses
451+
#
452+
453+
class FileStreamWrapper(abc.ByteStream):
454+
def __init__(self, stream: curio.io.FileStream):
455+
super().__init__()
456+
self._stream = stream
457+
458+
async def receive(self, max_bytes: Optional[int] = None) -> bytes:
459+
return await self._stream.read(max_bytes or 65536)
460+
461+
async def send(self, item: bytes) -> None:
462+
await self._stream.write(item)
463+
464+
async def send_eof(self) -> None:
465+
raise NotImplementedError
466+
467+
async def aclose(self) -> None:
468+
await self._stream.close()
469+
470+
471+
@dataclass
472+
class Process(abc.Process):
473+
_process: curio.subprocess.Popen
474+
_stdin: Optional[abc.ByteSendStream]
475+
_stdout: Optional[abc.ByteReceiveStream]
476+
_stderr: Optional[abc.ByteReceiveStream]
477+
478+
async def aclose(self) -> None:
479+
if self._stdin:
480+
await self._stdin.aclose()
481+
if self._stdout:
482+
await self._stdout.aclose()
483+
if self._stderr:
484+
await self._stderr.aclose()
485+
486+
await self.wait()
487+
488+
async def wait(self) -> int:
489+
return await self._process.wait()
490+
491+
def terminate(self) -> None:
492+
self._process.terminate()
493+
494+
def kill(self) -> None:
495+
self._process.kill()
496+
497+
def send_signal(self, signal: int) -> None:
498+
self._process.send_signal(signal)
499+
500+
@property
501+
def pid(self) -> int:
502+
return self._process.pid
503+
504+
@property
505+
def returncode(self) -> Optional[int]:
506+
return self._process.returncode
507+
508+
@property
509+
def stdin(self) -> Optional[abc.ByteSendStream]:
510+
return self._stdin
511+
512+
@property
513+
def stdout(self) -> Optional[abc.ByteReceiveStream]:
514+
return self._stdout
515+
516+
@property
517+
def stderr(self) -> Optional[abc.ByteReceiveStream]:
518+
return self._stderr
519+
520+
521+
async def open_process(command, *, shell: bool, stdin: int, stdout: int, stderr: int):
522+
await check_cancelled()
523+
process = curio.subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
524+
shell=shell)
525+
stdin_stream = FileStreamWrapper(process.stdin) if process.stdin else None
526+
stdout_stream = FileStreamWrapper(process.stdout) if process.stdout else None
527+
stderr_stream = FileStreamWrapper(process.stderr) if process.stderr else None
528+
return Process(process, stdin_stream, stdout_stream, stderr_stream)
529+
530+
447531
#
448532
# Sockets and networking
449533
#

0 commit comments

Comments
 (0)