Skip to content

Commit

Permalink
Merge pull request #1496 from njsmith/no-process-public-constructor
Browse files Browse the repository at this point in the history
Remove support for calling trio.Process() directly
  • Loading branch information
oremanj authored May 6, 2020
2 parents be13872 + 2265dde commit 0ca6b1d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 99 deletions.
3 changes: 3 additions & 0 deletions newsfragments/1109.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
It turns out that creating a subprocess can block the parent process
for a surprisingly long time. So `trio.open_process` now uses a worker
thread to avoid blocking the event loop.
3 changes: 3 additions & 0 deletions newsfragments/1109.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
If you want to create a `trio.Process` object, you now have to call
`trio.open_process`; calling ``trio.Process()`` directly was
deprecated in v0.12.0 and has now been removed.
166 changes: 75 additions & 91 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import subprocess
import sys
from typing import Optional
from functools import partial

from ._abc import AsyncResource, SendStream, ReceiveStream
from ._highlevel_generic import StapledStream
Expand All @@ -10,6 +11,7 @@
wait_child_exiting, create_pipe_to_child_stdin,
create_pipe_from_child_output
)
from ._util import NoPublicConstructor
import trio

# Linux-specific, but has complex lifetime management stuff so we hard-code it
Expand Down Expand Up @@ -46,7 +48,7 @@ def pidfd_open(fd, flags):
can_try_pidfd_open = False


class Process(AsyncResource):
class Process(AsyncResource, metaclass=NoPublicConstructor):
r"""A child process. Like :class:`subprocess.Popen`, but async.
This class has no public constructor. To create a child process, use
Expand Down Expand Up @@ -100,91 +102,18 @@ class Process(AsyncResource):
# arbitrarily many threads if wait() keeps getting cancelled.
_wait_for_exit_data = None

# After the deprecation period:
# - delete __init__ and _create
# - add metaclass=NoPublicConstructor
# - rename _init to __init__
# - move most of the code into open_process()
# - put the subprocess.Popen(...) call into a thread
def __init__(self, *args, **kwargs):
trio._deprecate.warn_deprecated(
"directly constructing Process objects",
"0.12.0",
issue=1109,
instead="trio.open_process"
)
self._init(*args, **kwargs)

@classmethod
def _create(cls, *args, **kwargs):
self = cls.__new__(cls)
self._init(*args, **kwargs)
return self

def _init(
self, command, *, stdin=None, stdout=None, stderr=None, **options
):
for key in (
'universal_newlines', 'text', 'encoding', 'errors', 'bufsize'
):
if options.get(key):
raise TypeError(
"trio.Process only supports communicating over "
"unbuffered byte streams; the '{}' option is not supported"
.format(key)
)

self.stdin = None # type: Optional[SendStream]
self.stdout = None # type: Optional[ReceiveStream]
self.stderr = None # type: Optional[ReceiveStream]
self.stdio = None # type: Optional[StapledStream]
def __init__(self, popen, stdin, stdout, stderr):
self._proc = popen
self.stdin = stdin # type: Optional[SendStream]
self.stdout = stdout # type: Optional[ReceiveStream]
self.stderr = stderr # type: Optional[ReceiveStream]

if os.name == "posix":
if isinstance(command, str) and not options.get("shell"):
raise TypeError(
"command must be a sequence (not a string) if shell=False "
"on UNIX systems"
)
if not isinstance(command, str) and options.get("shell"):
raise TypeError(
"command must be a string (not a sequence) if shell=True "
"on UNIX systems"
)
self.stdio = None # type: Optional[StapledStream]
if self.stdin is not None and self.stdout is not None:
self.stdio = StapledStream(self.stdin, self.stdout)

self._wait_lock = Lock()

if stdin == subprocess.PIPE:
self.stdin, stdin = create_pipe_to_child_stdin()
if stdout == subprocess.PIPE:
self.stdout, stdout = create_pipe_from_child_output()
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
self.stderr, stderr = create_pipe_from_child_output()

try:
self._proc = subprocess.Popen(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)
finally:
# Close the parent's handle for each child side of a pipe;
# we want the child to have the only copy, so that when
# it exits we can read EOF on our side.
if self.stdin is not None:
os.close(stdin)
if self.stdout is not None:
os.close(stdout)
if self.stderr is not None:
os.close(stderr)

self._pidfd = None
if can_try_pidfd_open:
try:
Expand All @@ -200,9 +129,6 @@ def _init(
# make sure it'll get closed.
self._pidfd = open(fd)

if self.stdin is not None and self.stdout is not None:
self.stdio = StapledStream(self.stdin, self.stdout)

self.args = self._proc.args
self.pid = self._proc.pid

Expand Down Expand Up @@ -378,12 +304,70 @@ async def open_process(
specified command could not be found.
"""
# XX FIXME: move the process creation into a thread as soon as we're done
# deprecating Process(...)
await trio.lowlevel.checkpoint()
return Process._create(
command, stdin=stdin, stdout=stdout, stderr=stderr, **options
)
for key in ('universal_newlines', 'text', 'encoding', 'errors', 'bufsize'):
if options.get(key):
raise TypeError(
"trio.Process only supports communicating over "
"unbuffered byte streams; the '{}' option is not supported"
.format(key)
)

if os.name == "posix":
if isinstance(command, str) and not options.get("shell"):
raise TypeError(
"command must be a sequence (not a string) if shell=False "
"on UNIX systems"
)
if not isinstance(command, str) and options.get("shell"):
raise TypeError(
"command must be a string (not a sequence) if shell=True "
"on UNIX systems"
)

trio_stdin = None # type: Optional[SendStream]
trio_stdout = None # type: Optional[ReceiveStream]
trio_stderr = None # type: Optional[ReceiveStream]

if stdin == subprocess.PIPE:
trio_stdin, stdin = create_pipe_to_child_stdin()
if stdout == subprocess.PIPE:
trio_stdout, stdout = create_pipe_from_child_output()
if stderr == subprocess.STDOUT:
# If we created a pipe for stdout, pass the same pipe for
# stderr. If stdout was some non-pipe thing (DEVNULL or a
# given FD), pass the same thing. If stdout was passed as
# None, keep stderr as STDOUT to allow subprocess to dup
# our stdout. Regardless of which of these is applicable,
# don't create a new Trio stream for stderr -- if stdout
# is piped, stderr will be intermixed on the stdout stream.
if stdout is not None:
stderr = stdout
elif stderr == subprocess.PIPE:
trio_stderr, stderr = create_pipe_from_child_output()

try:
popen = await trio.to_thread.run_sync(
partial(
subprocess.Popen,
command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
**options
)
)
finally:
# Close the parent's handle for each child side of a pipe;
# we want the child to have the only copy, so that when
# it exits we can read EOF on our side.
if trio_stdin is not None:
os.close(stdin)
if trio_stdout is not None:
os.close(stdout)
if trio_stderr is not None:
os.close(stderr)

return Process._create(popen, trio_stdin, trio_stdout, trio_stderr)


async def run_process(
Expand Down
8 changes: 0 additions & 8 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ async def test_basic():
)


# Delete this test when we remove direct Process construction
async def test_deprecated_Process_init():
with pytest.warns(TrioDeprecationWarning):
async with Process(EXIT_TRUE) as proc:
assert isinstance(proc, Process)
assert proc.returncode == 0


async def test_multi_wait():
async with await open_process(SLEEP(10)) as proc:
# Check that wait (including multi-wait) tolerates being cancelled
Expand Down

0 comments on commit 0ca6b1d

Please sign in to comment.