Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trio.open_process, and deprecate Process(...) #1113

Merged
merged 3 commits into from
Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -662,12 +662,12 @@ for them to exit. The interface for doing so consists of two layers:
the standard :func:`subprocess.run` with some additional features
and safer defaults.

* :class:`trio.Process` starts a process in the background and optionally
provides Trio streams for interacting with it (sending input,
receiving output and errors). Using it requires a bit more code
than :func:`~trio.run_process`, but exposes additional capabilities:
back-and-forth communication, processing output as soon as it is generated,
and so forth. It is modelled after the standard :class:`subprocess.Popen`.
* `trio.open_process` starts a process in the background and returns a
`Process` object to let you interact with it. Using it requires a
bit more code than `run_process`, but exposes additional
capabilities: back-and-forth communication, processing output as
soon as it is generated, and so forth. It is modelled after the
standard library :class:`subprocess.Popen`.


.. _subprocess-options:
Expand Down Expand Up @@ -713,12 +713,11 @@ course, these defaults can be changed where necessary.
Interacting with a process as it runs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you want more control than :func:`~trio.run_process` affords,
you can spawn a subprocess by creating an instance of
:class:`trio.Process` and then interact with it using its
:attr:`~trio.Process.stdin`,
:attr:`~trio.Process.stdout`, and/or
:attr:`~trio.Process.stderr` streams.
If you want more control than :func:`~trio.run_process` affords, you
can use `trio.open_process` to spawn a subprocess, and then interact
with it using the `Process` interface.

.. autofunction:: trio.open_process

.. autoclass:: trio.Process

Expand Down
3 changes: 3 additions & 0 deletions newsfragments/1109.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
It turns out that it's better to treat subprocess spawning as an async
operation. Therefore, direct construction of `Process` objects has
been deprecated. Use `trio.open_process` instead.
2 changes: 1 addition & 1 deletion trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

from ._path import Path

from ._subprocess import Process, run_process
from ._subprocess import Process, open_process, run_process

from ._ssl import SSLStream, SSLListener, NeedHandshakeError

Expand Down
153 changes: 98 additions & 55 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import select
import subprocess
from functools import partial

from ._abc import AsyncResource
from ._highlevel_generic import StapledStream
Expand All @@ -13,61 +14,24 @@


class Process(AsyncResource):
r"""Execute a child program in a new process.
r"""A child process. Like :class:`subprocess.Popen`, but async.
Like :class:`subprocess.Popen`, but async.
Constructing a :class:`Process` immediately spawns the child
process, or throws an :exc:`OSError` if the spawning fails (for
example, if the specified command could not be found).
After construction, you can interact with the child process
by writing data to its :attr:`stdin` stream (a
:class:`~trio.abc.SendStream`), reading data from its :attr:`stdout`
and/or :attr:`stderr` streams (both :class:`~trio.abc.ReceiveStream`\s),
sending it signals using :meth:`terminate`, :meth:`kill`, or
:meth:`send_signal`, and waiting for it to exit using :meth:`wait`.
Each standard stream is only available if it was specified at
:class:`Process` construction time that a pipe should be created
for it. For example, if you constructed with
``stdin=subprocess.PIPE``, you can write to the :attr:`stdin`
stream, else :attr:`stdin` will be ``None``.
:class:`Process` implements :class:`~trio.abc.AsyncResource`,
so you can use it as an async context manager or call its
:meth:`aclose` method directly. "Closing" a :class:`Process`
will close any pipes to the child and wait for it to exit;
if cancelled, the child will be forcibly killed and we will
ensure it has finished exiting before allowing the cancellation
to propagate. It is *strongly recommended* that process lifetime
be scoped using an ``async with`` block wherever possible, to
avoid winding up with processes hanging around longer than you
were planning on.
This class has no public constructor. To create a child process, use
`open_process`::
Args:
command (list or str): The command to run. Typically this is a
sequence of strings such as ``['ls', '-l', 'directory with spaces']``,
where the first element names the executable to invoke and the other
elements specify its arguments. With ``shell=True`` in the
``**options``, or on Windows, ``command`` may alternatively
be a string, which will be parsed following platform-dependent
:ref:`quoting rules <subprocess-quoting>`.
stdin: Specifies what the child process's standard input
stream should connect to: output written by the parent
(``subprocess.PIPE``), nothing (``subprocess.DEVNULL``),
or an open file (pass a file descriptor or something whose
``fileno`` method returns one). If ``stdin`` is unspecified,
the child process will have the same standard input stream
as its parent.
stdout: Like ``stdin``, but for the child process's standard output
stream.
stderr: Like ``stdin``, but for the child process's standard error
stream. An additional value ``subprocess.STDOUT`` is supported,
which causes the child's standard output and standard error
messages to be intermixed on a single standard output stream,
attached to whatever the ``stdout`` option says to attach it to.
**options: Other :ref:`general subprocess options <subprocess-options>`
are also accepted.
process = await trio.open_process(...)
`Process` implements the `~trio.abc.AsyncResource` interface. In order to
make sure your process doesn't end up getting abandoned by mistake or
after an exception, you can use ``async with``::
async with await trio.open_process(...) as process:
...
"Closing" a :class:`Process` will close any pipes to the child and wait
for it to exit; if cancelled, the child will be forcibly killed and we
will ensure it has finished exiting before allowing the cancellation to
propagate.
Attributes:
args (str or list): The ``command`` passed at construction time,
Expand Down Expand Up @@ -103,7 +67,28 @@ class Process(AsyncResource):
# arbitrarily many threads if wait() keeps getting cancelled.
_wait_for_exit_data = None

def __init__(
# After the deprecation period:
# - delete __init__ and _create
# - add metaclass=NoPublicConstructor
# - rename _init to __init__
# - move most of the code into open_process()
# - put the subprocess.Popen(...) call into a thread
def __init__(self, *args, **kwargs):
trio._deprecate.warn_deprecated(
"directly constructing Process objects",
"0.12.0",
issue=1109,
instead="trio.open_process"
)
self._init(*args, **kwargs)

@classmethod
def _create(cls, *args, **kwargs):
self = cls.__new__(cls)
self._init(*args, **kwargs)
return self

def _init(
self, command, *, stdin=None, stdout=None, stderr=None, **options
):
for key in (
Expand Down Expand Up @@ -280,6 +265,64 @@ def kill(self):
self._proc.kill()


async def open_process(
command, *, stdin=None, stdout=None, stderr=None, **options
) -> Process:
r"""Execute a child program in a new process.
After construction, you can interact with the child process by writing
data to its `~Process.stdin` stream (a `~trio.abc.SendStream`), reading
data from its `~Process.stdout` and/or `~Process.stderr` streams (both
`~trio.abc.ReceiveStream`\s), sending it signals using
`~Process.terminate`, `~Process.kill`, or `~Process.send_signal`, and
waiting for it to exit using `~Process.wait`. See `Process` for details.
Each standard stream is only available if you specify that a pipe should
be created for it. For example, if you pass ``stdin=subprocess.PIPE``, you
can write to the `~Process.stdin` stream, else `~Process.stdin` will be
``None``.
Args:
command (list or str): The command to run. Typically this is a
sequence of strings such as ``['ls', '-l', 'directory with spaces']``,
where the first element names the executable to invoke and the other
elements specify its arguments. With ``shell=True`` in the
``**options``, or on Windows, ``command`` may alternatively
be a string, which will be parsed following platform-dependent
:ref:`quoting rules <subprocess-quoting>`.
stdin: Specifies what the child process's standard input
stream should connect to: output written by the parent
(``subprocess.PIPE``), nothing (``subprocess.DEVNULL``),
or an open file (pass a file descriptor or something whose
``fileno`` method returns one). If ``stdin`` is unspecified,
the child process will have the same standard input stream
as its parent.
stdout: Like ``stdin``, but for the child process's standard output
stream.
stderr: Like ``stdin``, but for the child process's standard error
stream. An additional value ``subprocess.STDOUT`` is supported,
which causes the child's standard output and standard error
messages to be intermixed on a single standard output stream,
attached to whatever the ``stdout`` option says to attach it to.
**options: Other :ref:`general subprocess options <subprocess-options>`
are also accepted.
Returns:
A new `Process` object.
Raises:
OSError: if the process spawning fails, for example because the
specified command could not be found.
"""
# XX FIXME: move the process creation into a thread as soon as we're done
# deprecating Process(...)
await trio.hazmat.checkpoint()
return Process._create(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)


async def run_process(
command,
*,
Expand Down Expand Up @@ -424,7 +467,7 @@ async def run_process(
stdout_chunks = []
stderr_chunks = []

async with Process(command, **options) as proc:
async with await open_process(command, **options) as proc:

async def feed_input():
async with proc.stdin:
Expand Down
44 changes: 26 additions & 18 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .. import (
_core, move_on_after, fail_after, sleep, sleep_forever, Process,
run_process
open_process, run_process, TrioDeprecationWarning
)
from .._core.tests.tutil import slow
from ..testing import wait_all_tasks_blocked
Expand Down Expand Up @@ -41,24 +41,33 @@ def got_signal(proc, sig):

async def test_basic():
repr_template = "<trio.Process {!r}: {{}}>".format(EXIT_TRUE)
async with Process(EXIT_TRUE) as proc:
async with await open_process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode is None
assert repr(proc) == repr_template.format(
"running with PID {}".format(proc.pid)
)
assert proc.returncode == 0
assert repr(proc) == repr_template.format("exited with status 0")

async with Process(EXIT_FALSE) as proc:
async with await open_process(EXIT_FALSE) as proc:
pass
assert proc.returncode == 1
assert repr(proc) == "<trio.Process {!r}: {}>".format(
EXIT_FALSE, "exited with status 1"
)


# Delete this test when we remove direct Process construction
async def test_deprecated_Process_init():
with pytest.warns(TrioDeprecationWarning):
async with Process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode == 0


async def test_multi_wait():
async with Process(SLEEP(10)) as proc:
async with await open_process(SLEEP(10)) as proc:
# Check that wait (including multi-wait) tolerates being cancelled
async with _core.open_nursery() as nursery:
nursery.start_soon(proc.wait)
Expand All @@ -77,11 +86,10 @@ async def test_multi_wait():


async def test_kill_when_context_cancelled():
with move_on_after(0) as scope:
async with Process(SLEEP(10)) as proc:
with move_on_after(100) as scope:
async with await open_process(SLEEP(10)) as proc:
assert proc.poll() is None
# Process context entry is synchronous, so this is the
# only checkpoint:
scope.cancel()
await sleep_forever()
assert scope.cancelled_caught
assert got_signal(proc, SIGKILL)
Expand All @@ -98,7 +106,7 @@ async def test_kill_when_context_cancelled():


async def test_pipes():
async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -142,7 +150,7 @@ async def test_interactive():
# out: EOF
# err: EOF

async with Process(
async with await open_process(
python(
"idx = 0\n"
"while True:\n"
Expand Down Expand Up @@ -267,7 +275,7 @@ async def test_run_with_broken_pipe():


async def test_stderr_stdout():
async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -300,7 +308,7 @@ async def test_stderr_stdout():

# this one hits the branch where stderr=STDOUT but stdout
# is not redirected
async with Process(
async with await open_process(
CAT, stdin=subprocess.PIPE, stderr=subprocess.STDOUT
) as proc:
assert proc.stdout is None
Expand All @@ -312,7 +320,7 @@ async def test_stderr_stdout():
try:
r, w = os.pipe()

async with Process(
async with await open_process(
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR,
stdin=subprocess.PIPE,
stdout=w,
Expand All @@ -333,21 +341,21 @@ async def test_stderr_stdout():

async def test_errors():
with pytest.raises(TypeError) as excinfo:
Process(["ls"], encoding="utf-8")
await open_process(["ls"], encoding="utf-8")
assert "unbuffered byte streams" in str(excinfo.value)
assert "the 'encoding' option is not supported" in str(excinfo.value)

if posix:
with pytest.raises(TypeError) as excinfo:
Process(["ls"], shell=True)
await open_process(["ls"], shell=True)
with pytest.raises(TypeError) as excinfo:
Process("ls", shell=False)
await open_process("ls", shell=False)


async def test_signals():
async def test_one_signal(send_it, signum):
with move_on_after(1.0) as scope:
async with Process(SLEEP(3600)) as proc:
async with await open_process(SLEEP(3600)) as proc:
send_it(proc)
assert not scope.cancelled_caught
if posix:
Expand All @@ -368,7 +376,7 @@ async def test_wait_reapable_fails():
# With SIGCHLD disabled, the wait() syscall will wait for the
# process to exit but then fail with ECHILD. Make sure we
# support this case as the stdlib subprocess module does.
async with Process(SLEEP(3600)) as proc:
async with await open_process(SLEEP(3600)) as proc:
async with _core.open_nursery() as nursery:
nursery.start_soon(proc.wait)
await wait_all_tasks_blocked()
Expand Down