Skip to content

Commit

Permalink
Merge pull request #1113 from njsmith/async-process-creation
Browse files Browse the repository at this point in the history
Add trio.open_process, and deprecate Process(...)
  • Loading branch information
oremanj authored Jun 21, 2019
2 parents 1316f36 + 45040f9 commit 7eb1aca
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 86 deletions.
23 changes: 11 additions & 12 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -662,12 +662,12 @@ for them to exit. The interface for doing so consists of two layers:
the standard :func:`subprocess.run` with some additional features
and safer defaults.

* :class:`trio.Process` starts a process in the background and optionally
provides Trio streams for interacting with it (sending input,
receiving output and errors). Using it requires a bit more code
than :func:`~trio.run_process`, but exposes additional capabilities:
back-and-forth communication, processing output as soon as it is generated,
and so forth. It is modelled after the standard :class:`subprocess.Popen`.
* `trio.open_process` starts a process in the background and returns a
`Process` object to let you interact with it. Using it requires a
bit more code than `run_process`, but exposes additional
capabilities: back-and-forth communication, processing output as
soon as it is generated, and so forth. It is modelled after the
standard library :class:`subprocess.Popen`.


.. _subprocess-options:
Expand Down Expand Up @@ -713,12 +713,11 @@ course, these defaults can be changed where necessary.
Interacting with a process as it runs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you want more control than :func:`~trio.run_process` affords,
you can spawn a subprocess by creating an instance of
:class:`trio.Process` and then interact with it using its
:attr:`~trio.Process.stdin`,
:attr:`~trio.Process.stdout`, and/or
:attr:`~trio.Process.stderr` streams.
If you want more control than :func:`~trio.run_process` affords, you
can use `trio.open_process` to spawn a subprocess, and then interact
with it using the `Process` interface.

.. autofunction:: trio.open_process

.. autoclass:: trio.Process

Expand Down
3 changes: 3 additions & 0 deletions newsfragments/1109.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
It turns out that it's better to treat subprocess spawning as an async
operation. Therefore, direct construction of `Process` objects has
been deprecated. Use `trio.open_process` instead.
2 changes: 1 addition & 1 deletion trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

from ._path import Path

from ._subprocess import Process, run_process
from ._subprocess import Process, open_process, run_process

from ._ssl import SSLStream, SSLListener, NeedHandshakeError

Expand Down
153 changes: 98 additions & 55 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import select
import subprocess
from functools import partial

from ._abc import AsyncResource
from ._highlevel_generic import StapledStream
Expand All @@ -13,61 +14,24 @@


class Process(AsyncResource):
r"""Execute a child program in a new process.
r"""A child process. Like :class:`subprocess.Popen`, but async.
Like :class:`subprocess.Popen`, but async.
Constructing a :class:`Process` immediately spawns the child
process, or throws an :exc:`OSError` if the spawning fails (for
example, if the specified command could not be found).
After construction, you can interact with the child process
by writing data to its :attr:`stdin` stream (a
:class:`~trio.abc.SendStream`), reading data from its :attr:`stdout`
and/or :attr:`stderr` streams (both :class:`~trio.abc.ReceiveStream`\s),
sending it signals using :meth:`terminate`, :meth:`kill`, or
:meth:`send_signal`, and waiting for it to exit using :meth:`wait`.
Each standard stream is only available if it was specified at
:class:`Process` construction time that a pipe should be created
for it. For example, if you constructed with
``stdin=subprocess.PIPE``, you can write to the :attr:`stdin`
stream, else :attr:`stdin` will be ``None``.
:class:`Process` implements :class:`~trio.abc.AsyncResource`,
so you can use it as an async context manager or call its
:meth:`aclose` method directly. "Closing" a :class:`Process`
will close any pipes to the child and wait for it to exit;
if cancelled, the child will be forcibly killed and we will
ensure it has finished exiting before allowing the cancellation
to propagate. It is *strongly recommended* that process lifetime
be scoped using an ``async with`` block wherever possible, to
avoid winding up with processes hanging around longer than you
were planning on.
This class has no public constructor. To create a child process, use
`open_process`::
Args:
command (list or str): The command to run. Typically this is a
sequence of strings such as ``['ls', '-l', 'directory with spaces']``,
where the first element names the executable to invoke and the other
elements specify its arguments. With ``shell=True`` in the
``**options``, or on Windows, ``command`` may alternatively
be a string, which will be parsed following platform-dependent
:ref:`quoting rules <subprocess-quoting>`.
stdin: Specifies what the child process's standard input
stream should connect to: output written by the parent
(``subprocess.PIPE``), nothing (``subprocess.DEVNULL``),
or an open file (pass a file descriptor or something whose
``fileno`` method returns one). If ``stdin`` is unspecified,
the child process will have the same standard input stream
as its parent.
stdout: Like ``stdin``, but for the child process's standard output
stream.
stderr: Like ``stdin``, but for the child process's standard error
stream. An additional value ``subprocess.STDOUT`` is supported,
which causes the child's standard output and standard error
messages to be intermixed on a single standard output stream,
attached to whatever the ``stdout`` option says to attach it to.
**options: Other :ref:`general subprocess options <subprocess-options>`
are also accepted.
process = await trio.open_process(...)
`Process` implements the `~trio.abc.AsyncResource` interface. In order to
make sure your process doesn't end up getting abandoned by mistake or
after an exception, you can use ``async with``::
async with await trio.open_process(...) as process:
...
"Closing" a :class:`Process` will close any pipes to the child and wait
for it to exit; if cancelled, the child will be forcibly killed and we
will ensure it has finished exiting before allowing the cancellation to
propagate.
Attributes:
args (str or list): The ``command`` passed at construction time,
Expand Down Expand Up @@ -103,7 +67,28 @@ class Process(AsyncResource):
# arbitrarily many threads if wait() keeps getting cancelled.
_wait_for_exit_data = None

def __init__(
# After the deprecation period:
# - delete __init__ and _create
# - add metaclass=NoPublicConstructor
# - rename _init to __init__
# - move most of the code into open_process()
# - put the subprocess.Popen(...) call into a thread
def __init__(self, *args, **kwargs):
trio._deprecate.warn_deprecated(
"directly constructing Process objects",
"0.12.0",
issue=1109,
instead="trio.open_process"
)
self._init(*args, **kwargs)

@classmethod
def _create(cls, *args, **kwargs):
self = cls.__new__(cls)
self._init(*args, **kwargs)
return self

def _init(
self, command, *, stdin=None, stdout=None, stderr=None, **options
):
for key in (
Expand Down Expand Up @@ -280,6 +265,64 @@ def kill(self):
self._proc.kill()


async def open_process(
command, *, stdin=None, stdout=None, stderr=None, **options
) -> Process:
r"""Execute a child program in a new process.
After construction, you can interact with the child process by writing
data to its `~Process.stdin` stream (a `~trio.abc.SendStream`), reading
data from its `~Process.stdout` and/or `~Process.stderr` streams (both
`~trio.abc.ReceiveStream`\s), sending it signals using
`~Process.terminate`, `~Process.kill`, or `~Process.send_signal`, and
waiting for it to exit using `~Process.wait`. See `Process` for details.
Each standard stream is only available if you specify that a pipe should
be created for it. For example, if you pass ``stdin=subprocess.PIPE``, you
can write to the `~Process.stdin` stream, else `~Process.stdin` will be
``None``.
Args:
command (list or str): The command to run. Typically this is a
sequence of strings such as ``['ls', '-l', 'directory with spaces']``,
where the first element names the executable to invoke and the other
elements specify its arguments. With ``shell=True`` in the
``**options``, or on Windows, ``command`` may alternatively
be a string, which will be parsed following platform-dependent
:ref:`quoting rules <subprocess-quoting>`.
stdin: Specifies what the child process's standard input
stream should connect to: output written by the parent
(``subprocess.PIPE``), nothing (``subprocess.DEVNULL``),
or an open file (pass a file descriptor or something whose
``fileno`` method returns one). If ``stdin`` is unspecified,
the child process will have the same standard input stream
as its parent.
stdout: Like ``stdin``, but for the child process's standard output
stream.
stderr: Like ``stdin``, but for the child process's standard error
stream. An additional value ``subprocess.STDOUT`` is supported,
which causes the child's standard output and standard error
messages to be intermixed on a single standard output stream,
attached to whatever the ``stdout`` option says to attach it to.
**options: Other :ref:`general subprocess options <subprocess-options>`
are also accepted.
Returns:
A new `Process` object.
Raises:
OSError: if the process spawning fails, for example because the
specified command could not be found.
"""
# XX FIXME: move the process creation into a thread as soon as we're done
# deprecating Process(...)
await trio.hazmat.checkpoint()
return Process._create(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)


async def run_process(
command,
*,
Expand Down Expand Up @@ -424,7 +467,7 @@ async def run_process(
stdout_chunks = []
stderr_chunks = []

async with Process(command, **options) as proc:
async with await open_process(command, **options) as proc:

async def feed_input():
async with proc.stdin:
Expand Down
44 changes: 26 additions & 18 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .. import (
_core, move_on_after, fail_after, sleep, sleep_forever, Process,
run_process
open_process, run_process, TrioDeprecationWarning
)
from .._core.tests.tutil import slow
from ..testing import wait_all_tasks_blocked
Expand Down Expand Up @@ -41,24 +41,33 @@ def got_signal(proc, sig):

async def test_basic():
repr_template = "<trio.Process {!r}: {{}}>".format(EXIT_TRUE)
async with Process(EXIT_TRUE) as proc:
async with await open_process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode is None
assert repr(proc) == repr_template.format(
"running with PID {}".format(proc.pid)
)
assert proc.returncode == 0
assert repr(proc) == repr_template.format("exited with status 0")

async with Process(EXIT_FALSE) as proc:
async with await open_process(EXIT_FALSE) as proc:
pass
assert proc.returncode == 1
assert repr(proc) == "<trio.Process {!r}: {}>".format(
EXIT_FALSE, "exited with status 1"
)


# Delete this test when we remove direct Process construction
async def test_deprecated_Process_init():
with pytest.warns(TrioDeprecationWarning):
async with Process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode == 0


async def test_multi_wait():
async with Process(SLEEP(10)) as proc:
async with await open_process(SLEEP(10)) as proc:
# Check that wait (including multi-wait) tolerates being cancelled
async with _core.open_nursery() as nursery:
nursery.start_soon(proc.wait)
Expand All @@ -77,11 +86,10 @@ async def test_multi_wait():


async def test_kill_when_context_cancelled():
with move_on_after(0) as scope:
async with Process(SLEEP(10)) as proc:
with move_on_after(100) as scope:
async with await open_process(SLEEP(10)) as proc:
assert proc.poll() is None
# Process context entry is synchronous, so this is the
# only checkpoint:
scope.cancel()
await sleep_forever()
assert scope.cancelled_caught
assert got_signal(proc, SIGKILL)
Expand All @@ -98,7 +106,7 @@ async def test_kill_when_context_cancelled():


async def test_pipes():
async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -142,7 +150,7 @@ async def test_interactive():
# out: EOF
# err: EOF

async with Process(
async with await open_process(
python(
"idx = 0\n"
"while True:\n"
Expand Down Expand Up @@ -267,7 +275,7 @@ async def test_run_with_broken_pipe():


async def test_stderr_stdout():
async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -300,7 +308,7 @@ async def test_stderr_stdout():

# this one hits the branch where stderr=STDOUT but stdout
# is not redirected
async with Process(
async with await open_process(
CAT, stdin=subprocess.PIPE, stderr=subprocess.STDOUT
) as proc:
assert proc.stdout is None
Expand All @@ -312,7 +320,7 @@ async def test_stderr_stdout():
try:
r, w = os.pipe()

async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=w,
Expand All @@ -333,21 +341,21 @@ async def test_stderr_stdout():

async def test_errors():
with pytest.raises(TypeError) as excinfo:
Process(["ls"], encoding="utf-8")
await open_process(["ls"], encoding="utf-8")
assert "unbuffered byte streams" in str(excinfo.value)
assert "the 'encoding' option is not supported" in str(excinfo.value)

if posix:
with pytest.raises(TypeError) as excinfo:
Process(["ls"], shell=True)
await open_process(["ls"], shell=True)
with pytest.raises(TypeError) as excinfo:
Process("ls", shell=False)
await open_process("ls", shell=False)


async def test_signals():
async def test_one_signal(send_it, signum):
with move_on_after(1.0) as scope:
async with Process(SLEEP(3600)) as proc:
async with await open_process(SLEEP(3600)) as proc:
send_it(proc)
assert not scope.cancelled_caught
if posix:
Expand All @@ -368,7 +376,7 @@ async def test_wait_reapable_fails():
# With SIGCHLD disabled, the wait() syscall will wait for the
# process to exit but then fail with ECHILD. Make sure we
# support this case as the stdlib subprocess module does.
async with Process(SLEEP(3600)) as proc:
async with await open_process(SLEEP(3600)) as proc:
async with _core.open_nursery() as nursery:
nursery.start_soon(proc.wait)
await wait_all_tasks_blocked()
Expand Down

0 comments on commit 7eb1aca

Please sign in to comment.