Skip to content

Commit

Permalink
Merge pull request #3081 from jakkdl/break_the_lot
Browse files Browse the repository at this point in the history
Add ability to break parking lots, stop locks from stalling
  • Loading branch information
Zac-HD authored Oct 9, 2024
2 parents d0158fa + 92f9799 commit 2a66a0d
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 8 deletions.
4 changes: 4 additions & 0 deletions docs/source/reference-lowlevel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ Wait queue abstraction
.. autoclass:: ParkingLotStatistics
:members:

.. autofunction:: add_parking_lot_breaker

.. autofunction:: remove_parking_lot_breaker

Low-level checkpoint functions
------------------------------

Expand Down
1 change: 1 addition & 0 deletions newsfragments/3035.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock exiting without releasing the lock.
1 change: 1 addition & 0 deletions newsfragments/3081.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally.
7 changes: 6 additions & 1 deletion src/trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection
from ._local import RunVar, RunVarToken
from ._mock_clock import MockClock
from ._parking_lot import ParkingLot, ParkingLotStatistics
from ._parking_lot import (
ParkingLot,
ParkingLotStatistics,
add_parking_lot_breaker,
remove_parking_lot_breaker,
)

# Imports that always exist
from ._run import (
Expand Down
71 changes: 71 additions & 0 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
# See: https://github.com/python-trio/trio/issues/53
from __future__ import annotations

import inspect
import math
from collections import OrderedDict
from typing import TYPE_CHECKING

import attrs
import outcome

from .. import _core
from .._util import final
Expand All @@ -86,6 +88,37 @@
from ._run import Task


GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {}


def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
"""Register a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`.
raises:
trio.BrokenResourceError: if the task has already exited.
"""
if inspect.getcoroutinestate(task.coro) == inspect.CORO_CLOSED:
raise _core._exceptions.BrokenResourceError(
"Attempted to add already exited task as lot breaker.",
)
if task not in GLOBAL_PARKING_LOT_BREAKER:
GLOBAL_PARKING_LOT_BREAKER[task] = [lot]
else:
GLOBAL_PARKING_LOT_BREAKER[task].append(lot)


def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
"""Deregister a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`"""
try:
GLOBAL_PARKING_LOT_BREAKER[task].remove(lot)
except (KeyError, ValueError):
raise RuntimeError(
"Attempted to remove task as breaker for a lot it is not registered for",
) from None
if not GLOBAL_PARKING_LOT_BREAKER[task]:
del GLOBAL_PARKING_LOT_BREAKER[task]


@attrs.frozen
class ParkingLotStatistics:
"""An object containing debugging information for a ParkingLot.
Expand Down Expand Up @@ -118,6 +151,7 @@ class ParkingLot:
# {task: None}, we just want a deque where we can quickly delete random
# items
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False)
broken_by: list[Task] = attrs.field(factory=list, init=False)

def __len__(self) -> int:
"""Returns the number of parked tasks."""
Expand All @@ -136,7 +170,15 @@ async def park(self) -> None:
"""Park the current task until woken by a call to :meth:`unpark` or
:meth:`unpark_all`.
Raises:
BrokenResourceError: if attempting to park in a broken lot, or the lot
breaks before we get to unpark.
"""
if self.broken_by:
raise _core.BrokenResourceError(
f"Attempted to park in parking lot broken by {self.broken_by}",
)
task = _core.current_task()
self._parked[task] = None
task.custom_sleep_data = self
Expand Down Expand Up @@ -234,6 +276,35 @@ def repark_all(self, new_lot: ParkingLot) -> None:
"""
return self.repark(new_lot, count=len(self))

def break_lot(self, task: Task | None = None) -> None:
"""Break this lot, with ``task`` noted as the task that broke it.
This causes all parked tasks to raise an error, and any
future tasks attempting to park to error. Unpark & repark become no-ops as the
parking lot is empty.
The error raised contains a reference to the task sent as a parameter. The task
is also saved in the parking lot in the ``broken_by`` attribute.
"""
if task is None:
task = _core.current_task()

# if lot is already broken, just mark this as another breaker and return
if self.broken_by:
self.broken_by.append(task)
return

self.broken_by.append(task)

for parked_task in self._parked:
_core.reschedule(
parked_task,
outcome.Error(
_core.BrokenResourceError(f"Parking lot broken by {task}"),
),
)
self._parked.clear()

def statistics(self) -> ParkingLotStatistics:
"""Return an object containing debugging information.
Expand Down
7 changes: 7 additions & 0 deletions src/trio/_core/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._exceptions import Cancelled, RunFinishedError, TrioInternalError
from ._instrumentation import Instruments
from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection
from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER
from ._thread_cache import start_thread_soon
from ._traps import (
Abort,
Expand Down Expand Up @@ -1896,6 +1897,12 @@ async def python_wrapper(orig_coro: Awaitable[RetT]) -> RetT:
return task

def task_exited(self, task: Task, outcome: Outcome[Any]) -> None:
# break parking lots associated with the exiting task
if task in GLOBAL_PARKING_LOT_BREAKER:
for lot in GLOBAL_PARKING_LOT_BREAKER[task]:
lot.break_lot(task)
del GLOBAL_PARKING_LOT_BREAKER[task]

if (
task._cancel_status is not None
and task._cancel_status.abandoned_by_misnesting
Expand Down
167 changes: 167 additions & 0 deletions src/trio/_core/_tests/test_parking_lot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
from __future__ import annotations

import re
from typing import TypeVar

import pytest

import trio
from trio.lowlevel import (
add_parking_lot_breaker,
current_task,
remove_parking_lot_breaker,
)
from trio.testing import Matcher, RaisesGroup

from ... import _core
from ...testing import wait_all_tasks_blocked
from .._parking_lot import ParkingLot
Expand Down Expand Up @@ -215,3 +224,161 @@ async def test_parking_lot_repark_with_count() -> None:
"wake 2",
]
lot1.unpark_all()


async def dummy_task(
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
) -> None:
task_status.started(_core.current_task())
await trio.sleep_forever()


async def test_parking_lot_breaker_basic() -> None:
"""Test basic functionality for breaking lots."""
lot = ParkingLot()
task = current_task()

# defaults to current task
lot.break_lot()
assert lot.broken_by == [task]

# breaking the lot again with the same task appends another copy in `broken_by`
lot.break_lot()
assert lot.broken_by == [task, task]

# trying to park in broken lot errors
broken_by_str = re.escape(str([task, task]))
with pytest.raises(
_core.BrokenResourceError,
match=f"^Attempted to park in parking lot broken by {broken_by_str}$",
):
await lot.park()


async def test_parking_lot_break_parking_tasks() -> None:
"""Checks that tasks currently waiting to park raise an error when the breaker exits."""

async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None:
add_parking_lot_breaker(current_task(), lot)
with scope:
await trio.sleep_forever()

lot = ParkingLot()
cs = _core.CancelScope()

# check that parked task errors
with RaisesGroup(
Matcher(_core.BrokenResourceError, match="^Parking lot broken by"),
):
async with _core.open_nursery() as nursery:
nursery.start_soon(bad_parker, lot, cs)
await wait_all_tasks_blocked()

nursery.start_soon(lot.park)
await wait_all_tasks_blocked()

cs.cancel()


async def test_parking_lot_breaker_registration() -> None:
lot = ParkingLot()
task = current_task()

with pytest.raises(
RuntimeError,
match="Attempted to remove task as breaker for a lot it is not registered for",
):
remove_parking_lot_breaker(task, lot)

# check that a task can be registered as breaker for the same lot multiple times
add_parking_lot_breaker(task, lot)
add_parking_lot_breaker(task, lot)
remove_parking_lot_breaker(task, lot)
remove_parking_lot_breaker(task, lot)

with pytest.raises(
RuntimeError,
match="Attempted to remove task as breaker for a lot it is not registered for",
):
remove_parking_lot_breaker(task, lot)

# registering a task as breaker on an already broken lot is fine
lot.break_lot()
child_task = None
async with trio.open_nursery() as nursery:
child_task = await nursery.start(dummy_task)
add_parking_lot_breaker(child_task, lot)
nursery.cancel_scope.cancel()
assert lot.broken_by == [task, child_task]

# manually breaking a lot with an already exited task is fine
lot = ParkingLot()
lot.break_lot(child_task)
assert lot.broken_by == [child_task]


async def test_parking_lot_breaker_rebreak() -> None:
lot = ParkingLot()
task = current_task()
lot.break_lot()

# breaking an already broken lot with a different task is allowed
# The nursery is only to create a task we can pass to lot.break_lot
async with trio.open_nursery() as nursery:
child_task = await nursery.start(dummy_task)
lot.break_lot(child_task)
nursery.cancel_scope.cancel()

assert lot.broken_by == [task, child_task]


async def test_parking_lot_multiple_breakers_exit() -> None:
# register multiple tasks as lot breakers, then have them all exit
lot = ParkingLot()
async with trio.open_nursery() as nursery:
child_task1 = await nursery.start(dummy_task)
child_task2 = await nursery.start(dummy_task)
child_task3 = await nursery.start(dummy_task)
add_parking_lot_breaker(child_task1, lot)
add_parking_lot_breaker(child_task2, lot)
add_parking_lot_breaker(child_task3, lot)
nursery.cancel_scope.cancel()

# I think the order is guaranteed currently, but doesn't hurt to be safe.
assert set(lot.broken_by) == {child_task1, child_task2, child_task3}


async def test_parking_lot_breaker_register_exited_task() -> None:
lot = ParkingLot()
child_task = None
async with trio.open_nursery() as nursery:
child_task = await nursery.start(dummy_task)
nursery.cancel_scope.cancel()
# trying to register an exited task as lot breaker errors
with pytest.raises(
trio.BrokenResourceError,
match="^Attempted to add already exited task as lot breaker.$",
):
add_parking_lot_breaker(child_task, lot)


async def test_parking_lot_break_itself() -> None:
"""Break a parking lot, where the breakee is parked.
Doing this is weird, but should probably be supported.
"""

async def return_me_and_park(
lot: ParkingLot,
*,
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
) -> None:
task_status.started(_core.current_task())
await lot.park()

lot = ParkingLot()
with RaisesGroup(
Matcher(_core.BrokenResourceError, match="^Parking lot broken by"),
):
async with _core.open_nursery() as nursery:
child_task = await nursery.start(return_me_and_park, lot)
lot.break_lot(child_task)
Loading

0 comments on commit 2a66a0d

Please sign in to comment.