diff --git a/newsfragments/1109.feature.rst b/newsfragments/1109.feature.rst new file mode 100644 index 0000000000..cbbef01ba3 --- /dev/null +++ b/newsfragments/1109.feature.rst @@ -0,0 +1,3 @@ +It turns out that creating a subprocess can block the parent process +for a surprisingly long time. So `trio.open_process` now uses a worker +thread to avoid blocking the event loop. diff --git a/newsfragments/1109.removal.rst b/newsfragments/1109.removal.rst new file mode 100644 index 0000000000..65344f64b8 --- /dev/null +++ b/newsfragments/1109.removal.rst @@ -0,0 +1,3 @@ +If you want to create a `trio.Process` object, you now have to call +`trio.open_process`; calling ``trio.Process()`` directly was +deprecated in v0.12.0 and has now been removed. diff --git a/trio/_subprocess.py b/trio/_subprocess.py index 265d4f2ef9..6983c839c7 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -2,6 +2,7 @@ import subprocess import sys from typing import Optional +from functools import partial from ._abc import AsyncResource, SendStream, ReceiveStream from ._highlevel_generic import StapledStream @@ -10,6 +11,7 @@ wait_child_exiting, create_pipe_to_child_stdin, create_pipe_from_child_output ) +from ._util import NoPublicConstructor import trio # Linux-specific, but has complex lifetime management stuff so we hard-code it @@ -46,7 +48,7 @@ def pidfd_open(fd, flags): can_try_pidfd_open = False -class Process(AsyncResource): +class Process(AsyncResource, metaclass=NoPublicConstructor): r"""A child process. Like :class:`subprocess.Popen`, but async. This class has no public constructor. To create a child process, use @@ -100,91 +102,18 @@ class Process(AsyncResource): # arbitrarily many threads if wait() keeps getting cancelled. _wait_for_exit_data = None - # After the deprecation period: - # - delete __init__ and _create - # - add metaclass=NoPublicConstructor - # - rename _init to __init__ - # - move most of the code into open_process() - # - put the subprocess.Popen(...) call into a thread - def __init__(self, *args, **kwargs): - trio._deprecate.warn_deprecated( - "directly constructing Process objects", - "0.12.0", - issue=1109, - instead="trio.open_process" - ) - self._init(*args, **kwargs) - - @classmethod - def _create(cls, *args, **kwargs): - self = cls.__new__(cls) - self._init(*args, **kwargs) - return self - - def _init( - self, command, *, stdin=None, stdout=None, stderr=None, **options - ): - for key in ( - 'universal_newlines', 'text', 'encoding', 'errors', 'bufsize' - ): - if options.get(key): - raise TypeError( - "trio.Process only supports communicating over " - "unbuffered byte streams; the '{}' option is not supported" - .format(key) - ) - - self.stdin = None # type: Optional[SendStream] - self.stdout = None # type: Optional[ReceiveStream] - self.stderr = None # type: Optional[ReceiveStream] - self.stdio = None # type: Optional[StapledStream] + def __init__(self, popen, stdin, stdout, stderr): + self._proc = popen + self.stdin = stdin # type: Optional[SendStream] + self.stdout = stdout # type: Optional[ReceiveStream] + self.stderr = stderr # type: Optional[ReceiveStream] - if os.name == "posix": - if isinstance(command, str) and not options.get("shell"): - raise TypeError( - "command must be a sequence (not a string) if shell=False " - "on UNIX systems" - ) - if not isinstance(command, str) and options.get("shell"): - raise TypeError( - "command must be a string (not a sequence) if shell=True " - "on UNIX systems" - ) + self.stdio = None # type: Optional[StapledStream] + if self.stdin is not None and self.stdout is not None: + self.stdio = StapledStream(self.stdin, self.stdout) self._wait_lock = Lock() - if stdin == subprocess.PIPE: - self.stdin, stdin = create_pipe_to_child_stdin() - if stdout == subprocess.PIPE: - self.stdout, stdout = create_pipe_from_child_output() - if stderr == subprocess.STDOUT: - # If we created a pipe for stdout, pass the same pipe for - # stderr. If stdout was some non-pipe thing (DEVNULL or a - # given FD), pass the same thing. If stdout was passed as - # None, keep stderr as STDOUT to allow subprocess to dup - # our stdout. Regardless of which of these is applicable, - # don't create a new Trio stream for stderr -- if stdout - # is piped, stderr will be intermixed on the stdout stream. - if stdout is not None: - stderr = stdout - elif stderr == subprocess.PIPE: - self.stderr, stderr = create_pipe_from_child_output() - - try: - self._proc = subprocess.Popen( - command, stdin=stdin, stdout=stdout, stderr=stderr, **options - ) - finally: - # Close the parent's handle for each child side of a pipe; - # we want the child to have the only copy, so that when - # it exits we can read EOF on our side. - if self.stdin is not None: - os.close(stdin) - if self.stdout is not None: - os.close(stdout) - if self.stderr is not None: - os.close(stderr) - self._pidfd = None if can_try_pidfd_open: try: @@ -200,9 +129,6 @@ def _init( # make sure it'll get closed. self._pidfd = open(fd) - if self.stdin is not None and self.stdout is not None: - self.stdio = StapledStream(self.stdin, self.stdout) - self.args = self._proc.args self.pid = self._proc.pid @@ -378,12 +304,70 @@ async def open_process( specified command could not be found. """ - # XX FIXME: move the process creation into a thread as soon as we're done - # deprecating Process(...) - await trio.lowlevel.checkpoint() - return Process._create( - command, stdin=stdin, stdout=stdout, stderr=stderr, **options - ) + for key in ('universal_newlines', 'text', 'encoding', 'errors', 'bufsize'): + if options.get(key): + raise TypeError( + "trio.Process only supports communicating over " + "unbuffered byte streams; the '{}' option is not supported" + .format(key) + ) + + if os.name == "posix": + if isinstance(command, str) and not options.get("shell"): + raise TypeError( + "command must be a sequence (not a string) if shell=False " + "on UNIX systems" + ) + if not isinstance(command, str) and options.get("shell"): + raise TypeError( + "command must be a string (not a sequence) if shell=True " + "on UNIX systems" + ) + + trio_stdin = None # type: Optional[SendStream] + trio_stdout = None # type: Optional[ReceiveStream] + trio_stderr = None # type: Optional[ReceiveStream] + + if stdin == subprocess.PIPE: + trio_stdin, stdin = create_pipe_to_child_stdin() + if stdout == subprocess.PIPE: + trio_stdout, stdout = create_pipe_from_child_output() + if stderr == subprocess.STDOUT: + # If we created a pipe for stdout, pass the same pipe for + # stderr. If stdout was some non-pipe thing (DEVNULL or a + # given FD), pass the same thing. If stdout was passed as + # None, keep stderr as STDOUT to allow subprocess to dup + # our stdout. Regardless of which of these is applicable, + # don't create a new Trio stream for stderr -- if stdout + # is piped, stderr will be intermixed on the stdout stream. + if stdout is not None: + stderr = stdout + elif stderr == subprocess.PIPE: + trio_stderr, stderr = create_pipe_from_child_output() + + try: + popen = await trio.to_thread.run_sync( + partial( + subprocess.Popen, + command, + stdin=stdin, + stdout=stdout, + stderr=stderr, + **options + ) + ) + finally: + # Close the parent's handle for each child side of a pipe; + # we want the child to have the only copy, so that when + # it exits we can read EOF on our side. + if trio_stdin is not None: + os.close(stdin) + if trio_stdout is not None: + os.close(stdout) + if trio_stderr is not None: + os.close(stderr) + + return Process._create(popen, trio_stdin, trio_stdout, trio_stderr) async def run_process( diff --git a/trio/tests/test_subprocess.py b/trio/tests/test_subprocess.py index 7fe64564c7..c522847c7d 100644 --- a/trio/tests/test_subprocess.py +++ b/trio/tests/test_subprocess.py @@ -58,14 +58,6 @@ async def test_basic(): ) -# Delete this test when we remove direct Process construction -async def test_deprecated_Process_init(): - with pytest.warns(TrioDeprecationWarning): - async with Process(EXIT_TRUE) as proc: - assert isinstance(proc, Process) - assert proc.returncode == 0 - - async def test_multi_wait(): async with await open_process(SLEEP(10)) as proc: # Check that wait (including multi-wait) tolerates being cancelled