Fixed run_process() and open_process().__aexit__ leaking an orphan process when cancelled (#672)
gschaffner authored Jan 25, 2024
1 parent 3f14df8 commit 1e60219
Showing 6 changed files with 120 additions and 34 deletions.
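
Before this change, cancelling a scope that encloses an open_process() context manager could interrupt Process.aclose() partway through and leave the child process running as an orphan. A minimal reproduction sketch of the affected pattern (illustrative command and timeout, not code from this commit; with the fix, the child is killed and reaped on the way out):

    import sys

    import anyio

    async def main() -> None:
        with anyio.move_on_after(0.5):
            async with await anyio.open_process(
                [sys.executable, "-c", "import time; time.sleep(30)"]
            ) as process:
                # The timeout cancels this wait; __aexit__ then runs
                # Process.aclose() with the cancellation still pending.
                await process.wait()

        # With the fix, aclose() killed and reaped the child before returning.
        assert process.returncode is not None

    anyio.run(main)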
12 changes: 12 additions & 0 deletions docs/versionhistory.rst
@@ -10,6 +10,18 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
 - Fixed passing ``total_tokens`` to ``anyio.CapacityLimiter()`` as a keyword argument
   not working on the ``trio`` backend
   (`#515 <https://github.com/agronholm/anyio/issues/515>`_)
+- Fixed ``Process.aclose()`` not performing the minimum level of necessary cleanup when
+  cancelled. Previously:
+
+  - Cancellation of ``Process.aclose()`` could leak an orphan process
+  - Cancellation of ``run_process()`` could very briefly leak an orphan process.
+  - Cancellation of ``Process.aclose()`` or ``run_process()`` on Trio could leave
+    standard streams unclosed
+
+  (PR by Ganden Schaffner)
+- Fixed ``Process.stdin.aclose()``, ``Process.stdout.aclose()``, and
+  ``Process.stderr.aclose()`` not including a checkpoint on asyncio (PR by Ganden
+  Schaffner)

 **4.2.0**

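The second changelog entry above (a missing checkpoint in the stream aclose() methods on asyncio) can be illustrated with a small sketch; the command is illustrative and not taken from the commit. AnyIO's convention is that async API calls contain a checkpoint, and the asyncio backend diff below adds the missing ones:

    import sys

    import anyio

    async def main() -> None:
        async with await anyio.open_process(
            [sys.executable, "-c", "print('hello')"]
        ) as process:
            assert process.stdin is not None
            # feed_eof()/close() are synchronous under the hood; with the fix,
            # this await also hits a checkpoint on asyncio, so it yields to the
            # event loop and honours any pending cancellation.
            await process.stdin.aclose()

    anyio.run(main)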
26 changes: 18 additions & 8 deletions src/anyio/_backends/_asyncio.py
@@ -918,6 +918,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:

     async def aclose(self) -> None:
         self._stream.feed_eof()
+        await AsyncIOBackend.checkpoint()


 @dataclass(eq=False)
@@ -930,6 +931,7 @@ async def send(self, item: bytes) -> None:

     async def aclose(self) -> None:
         self._stream.close()
+        await AsyncIOBackend.checkpoint()


 @dataclass(eq=False)
@@ -940,14 +942,22 @@ class Process(abc.Process):
     _stderr: StreamReaderWrapper | None

     async def aclose(self) -> None:
-        if self._stdin:
-            await self._stdin.aclose()
-        if self._stdout:
-            await self._stdout.aclose()
-        if self._stderr:
-            await self._stderr.aclose()
-
-        await self.wait()
+        with CancelScope(shield=True):
+            if self._stdin:
+                await self._stdin.aclose()
+            if self._stdout:
+                await self._stdout.aclose()
+            if self._stderr:
+                await self._stderr.aclose()
+
+        try:
+            await self.wait()
+        except BaseException:
+            self.kill()
+            with CancelScope(shield=True):
+                await self.wait()
+
+            raise

     async def wait(self) -> int:
         return await self._process.wait()
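The new aclose() above separates cleanup into two stages: stream closes run inside a shielded scope so they complete even when cancellation is already pending, and if waiting for the process is interrupted for any reason, the child is killed and then reaped under a shield before the exception is re-raised. A standalone sketch of that pattern (a hypothetical helper written against the public anyio API, not part of this diff):

    from anyio import CancelScope
    from anyio.abc import Process

    async def close_process(process: Process) -> None:
        # Stage 1: always close the pipes, even if we are already cancelled.
        with CancelScope(shield=True):
            for stream in (process.stdin, process.stdout, process.stderr):
                if stream:
                    await stream.aclose()

        # Stage 2: wait for the child; on any exception (including
        # cancellation), kill it and reap it so no orphan is left behind.
        try:
            await process.wait()
        except BaseException:
            process.kill()
            with CancelScope(shield=True):
                await process.wait()
            raise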
23 changes: 15 additions & 8 deletions src/anyio/_backends/_trio.py
@@ -283,14 +283,21 @@ class Process(abc.Process):
     _stderr: abc.ByteReceiveStream | None

     async def aclose(self) -> None:
-        if self._stdin:
-            await self._stdin.aclose()
-        if self._stdout:
-            await self._stdout.aclose()
-        if self._stderr:
-            await self._stderr.aclose()
-
-        await self.wait()
+        with CancelScope(shield=True):
+            if self._stdin:
+                await self._stdin.aclose()
+            if self._stdout:
+                await self._stdout.aclose()
+            if self._stderr:
+                await self._stderr.aclose()
+
+        try:
+            await self.wait()
+        except BaseException:
+            self.kill()
+            with CancelScope(shield=True):
+                await self.wait()
+            raise

     async def wait(self) -> int:
         return await self._process.wait()
26 changes: 12 additions & 14 deletions src/anyio/_core/_subprocesses.py
@@ -65,20 +65,18 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
         start_new_session=start_new_session,
     ) as process:
         stream_contents: list[bytes | None] = [None, None]
-        try:
-            async with create_task_group() as tg:
-                if process.stdout:
-                    tg.start_soon(drain_stream, process.stdout, 0)
-                if process.stderr:
-                    tg.start_soon(drain_stream, process.stderr, 1)
-                if process.stdin and input:
-                    await process.stdin.send(input)
-                    await process.stdin.aclose()
-
-                await process.wait()
-        except BaseException:
-            process.kill()
-            raise
+        async with create_task_group() as tg:
+            if process.stdout:
+                tg.start_soon(drain_stream, process.stdout, 0)
+
+            if process.stderr:
+                tg.start_soon(drain_stream, process.stderr, 1)
+
+            if process.stdin and input:
+                await process.stdin.send(input)
+                await process.stdin.aclose()
+
+            await process.wait()

         output, errors = stream_contents
         if check and process.returncode != 0:
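The try/except/kill block removed above is no longer needed because open_process() is used as an async context manager here, so the kill-on-cancellation guarantee now lives in Process.aclose() (see the backend diffs above). Callers of run_process() get the same guarantee; a usage sketch under that assumption, with an illustrative timeout and command:

    import sys

    import anyio

    async def main() -> None:
        # If the timeout fires while the child is still running, cancellation
        # propagates into run_process(), and the child is killed and reaped
        # before the scope is exited, with no orphan and no unclosed pipes.
        with anyio.move_on_after(0.5):
            await anyio.run_process(
                [sys.executable, "-c", "import time; time.sleep(30)"]
            )

    anyio.run(main)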
61 changes: 60 additions & 1 deletion tests/test_subprocesses.py
@@ -8,8 +8,9 @@
 from textwrap import dedent

 import pytest
+from _pytest.fixtures import FixtureRequest

-from anyio import open_process, run_process
+from anyio import CancelScope, ClosedResourceError, open_process, run_process
 from anyio.streams.buffered import BufferedByteReceiveStream

 pytestmark = pytest.mark.anyio
@@ -176,3 +177,61 @@ async def test_run_process_inherit_stdout(capfd: pytest.CaptureFixture[str]) ->
     out, err = capfd.readouterr()
     assert out == "stdout-text" + os.linesep
     assert err == "stderr-text" + os.linesep
+
+
+async def test_process_aexit_cancellation_doesnt_orphan_process() -> None:
+    """
+    Regression test for #669.
+    Ensures that open_process.__aexit__() doesn't leave behind an orphan process when
+    cancelled.
+    """
+    with CancelScope() as scope:
+        async with await open_process(
+            [sys.executable, "-c", "import time; time.sleep(1)"]
+        ) as process:
+            scope.cancel()
+
+    assert process.returncode is not None
+    assert process.returncode != 0
+
+
+async def test_process_aexit_cancellation_closes_standard_streams(
+    request: FixtureRequest,
+    anyio_backend_name: str,
+) -> None:
+    """
+    Regression test for #669.
+    Ensures that open_process.__aexit__() closes standard streams when cancelled. Also
+    ensures that process.std{in.send,{out,err}.receive}() raise ClosedResourceError on a
+    closed stream.
+    """
+    if anyio_backend_name == "asyncio":
+        # Avoid pytest.xfail here due to https://github.com/pytest-dev/pytest/issues/9027
+        request.node.add_marker(
+            pytest.mark.xfail(reason="#671 needs to be resolved first")
+        )
+
+    with CancelScope() as scope:
+        async with await open_process(
+            [sys.executable, "-c", "import time; time.sleep(1)"]
+        ) as process:
+            scope.cancel()
+
+    assert process.stdin is not None
+
+    with pytest.raises(ClosedResourceError):
+        await process.stdin.send(b"foo")
+
+    assert process.stdout is not None
+
+    with pytest.raises(ClosedResourceError):
+        await process.stdout.receive(1)
+
+    assert process.stderr is not None
+
+    with pytest.raises(ClosedResourceError):
+        await process.stderr.receive(1)
6 changes: 3 additions & 3 deletions tests/test_taskgroups.py
@@ -185,6 +185,9 @@ async def taskfunc(*, task_status: TaskStatus) -> None:
     assert not finished


+@pytest.mark.xfail(
+    sys.version_info < (3, 9), reason="Requires a way to detect cancellation source"
+)
 @pytest.mark.parametrize("anyio_backend", ["asyncio"])
 async def test_start_native_host_cancelled() -> None:
     started = finished = False
@@ -199,9 +202,6 @@ async def start_another() -> None:
         async with create_task_group() as tg:
             await tg.start(taskfunc)

-    if sys.version_info < (3, 9):
-        pytest.xfail("Requires a way to detect cancellation source")
-
     task = asyncio.get_running_loop().create_task(start_another())
     await wait_all_tasks_blocked()
     task.cancel()
