Skip to content

Commit ad8f302

Browse files
codopeisrabbani
andauthored
[core] Use graceful shutdown path when actor OUT_OF_SCOPE (del actor) (#57090)
When actors terminate gracefully, Ray calls the actor's `__ray_shutdown__()` method if defined, allowing for cleanup of resources. But, this is not invoked in case actor goes out of scope due to `del actor`. ### Why `del actor` doesn't invoke `__ray_shutdown__` Traced through the entire code path, and here's what happens: Flow when `del actor` is called: 1. **Python side**: `ActorHandle.__del__()` -> `worker.core_worker.remove_actor_handle_reference(actor_id)` https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/python/ray/actor.py#L2040 2. **C++ ref counting**: `CoreWorker::RemoveActorHandleReference()` -> `reference_counter_->RemoveLocalReference()` - When ref count reaches 0, triggers `OnObjectOutOfScopeOrFreed` callback https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/core_worker/core_worker.cc#L2503-L2506 3. **Actor manager callback**: `MarkActorKilledOrOutOfScope()` -> `AsyncReportActorOutOfScope()` to GCS https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/core_worker/actor_manager.cc#L180-L183 https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/core_worker/task_submission/actor_task_submitter.cc#L44-L51 4. **GCS receives notification**: `HandleReportActorOutOfScope()` - **THE PROBLEM IS HERE** ([line 279 in `src/ray/gcs/gcs_actor_manager.cc`](https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/gcs/gcs_actor_manager.cc#L279)): ```cpp DestroyActor(actor_id, GenActorOutOfScopeCause(actor), /*force_kill=*/true, // <-- HARDCODED TO TRUE! [reply, send_reply_callback]() { ``` 5. **Actor worker receives kill signal**: `HandleKillActor()` in [`src/ray/core_worker/core_worker.cc`](https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/core_worker/core_worker.cc#L3970) ```cpp if (request.force_kill()) { // This is TRUE for OUT_OF_SCOPE ForceExit(...) // Skips __ray_shutdown__ } else { Exit(...) // Would call __ray_shutdown__ } ``` 6. **ForceExit path**: Bypasses graceful shutdown -> No `__ray_shutdown__` callback invoked. This PR simply changes the GCS to use graceful shutdown for OUT_OF_SCOPE actors. Also, updated the docs. --------- Signed-off-by: Sagar Sumit <[email protected]> Co-authored-by: Ibrahim Rabbani <[email protected]>
1 parent 15393ed commit ad8f302

File tree

7 files changed

+333
-19
lines changed

7 files changed

+333
-19
lines changed

doc/source/ray-core/actors/terminating-actors.rst

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ Terminating Actors
33

44
Actor processes will be terminated automatically when all copies of the
55
actor handle have gone out of scope in Python, or if the original creator
6-
process dies.
6+
process dies. When actors terminate gracefully, Ray calls the actor's
7+
``__ray_shutdown__()`` method if defined, allowing for cleanup of resources
8+
(see :ref:`actor-cleanup` for details).
79

810
Note that automatic termination of actors is not yet supported in Java or C++.
911

@@ -33,9 +35,8 @@ manually destroyed.
3335
actor_handle = Actor.remote()
3436

3537
ray.kill(actor_handle)
36-
# This will not go through the normal Python sys.exit
37-
# teardown logic, so any exit handlers installed in
38-
# the actor using ``atexit`` will not be called.
38+
# Force kill: the actor exits immediately without cleanup.
39+
# This will NOT call __ray_shutdown__() or atexit handlers.
3940

4041

4142
.. tab-item:: Java
@@ -191,3 +192,59 @@ You could see the actor is dead as a result of the user's `exit_actor()` call:
191192
is_detached: false
192193
placement_group_id: null
193194
repr_name: ''
195+
196+
197+
.. _actor-cleanup:
198+
199+
Actor cleanup with `__ray_shutdown__`
200+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
201+
202+
When an actor terminates gracefully, Ray calls the ``__ray_shutdown__()`` method
203+
if it exists, allowing cleanup of resources like database connections or file handles.
204+
205+
.. tab-set::
206+
207+
.. tab-item:: Python
208+
209+
.. testcode::
210+
211+
import ray
212+
import tempfile
213+
import os
214+
215+
@ray.remote
216+
class FileProcessorActor:
217+
def __init__(self):
218+
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
219+
self.temp_file.write(b"processing data")
220+
self.temp_file.flush()
221+
222+
def __ray_shutdown__(self):
223+
# Clean up temporary file
224+
if hasattr(self, 'temp_file'):
225+
self.temp_file.close()
226+
os.unlink(self.temp_file.name)
227+
228+
def process(self):
229+
return "done"
230+
231+
actor = FileProcessorActor.remote()
232+
ray.get(actor.process.remote())
233+
del actor # __ray_shutdown__() is called automatically
234+
235+
When ``__ray_shutdown__()`` is called:
236+
237+
- **Automatic termination**: When all actor handles go out of scope (``del actor`` or natural scope exit)
238+
- **Manual graceful termination**: When you call ``actor.__ray_terminate__.remote()``
239+
240+
When ``__ray_shutdown__()`` is **NOT** called:
241+
242+
- **Force kill**: When you use ``ray.kill(actor)`` - the actor is killed immediately without cleanup.
243+
- **Unexpected termination**: When the actor process crashes or exits unexpectedly (such as a segfault or being killed by the OOM killer).
244+
245+
**Important notes:**
246+
247+
- ``__ray_shutdown__()`` runs after all actor tasks complete.
248+
- By default, Ray waits 30 seconds for the graceful shutdown procedure (including ``__ray_shutdown__()``) to complete. If the actor doesn't exit within this timeout, it's force killed. Configure this with ``ray.init(_system_config={"actor_graceful_shutdown_timeout_ms": 60000})``.
249+
- Exceptions in ``__ray_shutdown__()`` are caught and logged but don't prevent actor termination.
250+
- ``__ray_shutdown__()`` must be a synchronous method, including for async actors.

python/ray/tests/test_actor_failures.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ def ray_init_with_task_retry_delay():
3131
ray.shutdown()
3232

3333

34+
@pytest.fixture
35+
def ray_init_with_actor_graceful_shutdown_timeout():
36+
ray.shutdown()
37+
address = ray.init(_system_config={"actor_graceful_shutdown_timeout_ms": 1000})
38+
yield address
39+
ray.shutdown()
40+
41+
3442
@pytest.fixture
3543
def tempfile_factory() -> Generator[Callable[[], str], None, None]:
3644
"""Yields a factory function to generate tempfiles that will be deleted after the test run."""
@@ -1383,5 +1391,171 @@ def sleep_forever(self):
13831391
wait_for_condition(lambda: not check_file_exists_and_not_empty(shutdown_file))
13841392

13851393

1394+
def test_actor_ray_shutdown_called_on_del(ray_start_regular_shared, tempfile_factory):
1395+
"""Test that __ray_shutdown__ is called when actor goes out of scope via del."""
1396+
shutdown_file = tempfile_factory()
1397+
1398+
@ray.remote
1399+
class DelTestActor:
1400+
def __ray_shutdown__(self):
1401+
with open(shutdown_file, "w") as f:
1402+
f.write("shutdown_called_on_del")
1403+
f.flush()
1404+
1405+
def ready(self):
1406+
return "ready"
1407+
1408+
actor = DelTestActor.remote()
1409+
ray.get(actor.ready.remote())
1410+
del actor
1411+
1412+
wait_for_condition(
1413+
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
1414+
)
1415+
1416+
with open(shutdown_file, "r") as f:
1417+
assert f.read() == "shutdown_called_on_del", (
1418+
"Expected __ray_shutdown__ to be called within actor_graceful_shutdown_timeout_ms "
1419+
"after actor handle was deleted with del"
1420+
)
1421+
1422+
1423+
def test_actor_del_with_atexit(ray_start_regular_shared, tempfile_factory):
1424+
"""Test that both __ray_shutdown__ and atexit handlers run on del actor."""
1425+
shutdown_file = tempfile_factory()
1426+
atexit_file = tempfile_factory()
1427+
order_file = tempfile_factory()
1428+
1429+
@ray.remote
1430+
class BothHandlersActor:
1431+
def __init__(self):
1432+
atexit.register(self.cleanup)
1433+
1434+
def __ray_shutdown__(self):
1435+
with open(shutdown_file, "w") as f:
1436+
f.write("ray_shutdown_del")
1437+
f.flush()
1438+
with open(order_file, "a") as f:
1439+
f.write(f"shutdown:{time.time()}\n")
1440+
f.flush()
1441+
1442+
def cleanup(self):
1443+
with open(atexit_file, "w") as f:
1444+
f.write("atexit_del")
1445+
f.flush()
1446+
1447+
with open(order_file, "a") as f:
1448+
f.write(f"atexit:{time.time()}\n")
1449+
f.flush()
1450+
1451+
def ready(self):
1452+
return "ready"
1453+
1454+
actor = BothHandlersActor.remote()
1455+
ray.get(actor.ready.remote())
1456+
del actor
1457+
1458+
wait_for_condition(
1459+
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
1460+
)
1461+
with open(shutdown_file, "r") as f:
1462+
assert (
1463+
f.read() == "ray_shutdown_del"
1464+
), "Expected __ray_shutdown__ to be called when actor deleted"
1465+
1466+
wait_for_condition(lambda: check_file_exists_and_not_empty(atexit_file), timeout=10)
1467+
with open(atexit_file, "r") as f:
1468+
assert f.read() == "atexit_del", "Expected atexit handler to be called"
1469+
1470+
# Verify execution order: __ray_shutdown__ should run before atexit
1471+
wait_for_condition(lambda: check_file_exists_and_not_empty(order_file), timeout=10)
1472+
with open(order_file, "r") as f:
1473+
order = f.read()
1474+
lines = order.strip().split("\n")
1475+
assert len(lines) == 2, f"Expected 2 entries, got: {lines}"
1476+
assert lines[0].startswith(
1477+
"shutdown:"
1478+
), f"Expected __ray_shutdown__ first, got order: {lines}"
1479+
assert lines[1].startswith(
1480+
"atexit:"
1481+
), f"Expected atexit second, got order: {lines}"
1482+
1483+
1484+
def test_actor_ray_shutdown_called_on_scope_exit(
1485+
ray_start_regular_shared, tempfile_factory
1486+
):
1487+
"""Test that __ray_shutdown__ is called when actor goes out of scope."""
1488+
shutdown_file = tempfile_factory()
1489+
1490+
@ray.remote
1491+
class ScopeTestActor:
1492+
def __ray_shutdown__(self):
1493+
with open(shutdown_file, "w") as f:
1494+
f.write("shutdown_called_on_scope_exit")
1495+
f.flush()
1496+
1497+
def ready(self):
1498+
return "ready"
1499+
1500+
def create_and_use_actor():
1501+
actor = ScopeTestActor.remote()
1502+
ray.get(actor.ready.remote())
1503+
# Actor goes out of scope at end of function
1504+
1505+
create_and_use_actor()
1506+
1507+
wait_for_condition(
1508+
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
1509+
)
1510+
1511+
with open(shutdown_file, "r") as f:
1512+
assert f.read() == "shutdown_called_on_scope_exit"
1513+
1514+
1515+
def test_actor_graceful_shutdown_timeout_fallback(
1516+
ray_init_with_actor_graceful_shutdown_timeout, tempfile_factory
1517+
):
1518+
"""Test that actor is force killed if __ray_shutdown__ exceeds timeout."""
1519+
shutdown_started_file = tempfile_factory()
1520+
shutdown_completed_file = tempfile_factory()
1521+
1522+
@ray.remote
1523+
class HangingShutdownActor:
1524+
def __ray_shutdown__(self):
1525+
with open(shutdown_started_file, "w") as f:
1526+
f.write("shutdown_started")
1527+
f.flush()
1528+
1529+
# Hang indefinitely - simulating buggy cleanup code
1530+
time.sleep(5)
1531+
1532+
# This should never be reached due to force kill fallback
1533+
with open(shutdown_completed_file, "w") as f:
1534+
f.write("should_not_reach")
1535+
f.flush()
1536+
1537+
def ready(self):
1538+
return "ready"
1539+
1540+
actor = HangingShutdownActor.remote()
1541+
ray.get(actor.ready.remote())
1542+
del actor
1543+
1544+
# Verify that shutdown started
1545+
wait_for_condition(
1546+
lambda: check_file_exists_and_not_empty(shutdown_started_file), timeout=5
1547+
)
1548+
with open(shutdown_started_file, "r") as f:
1549+
assert (
1550+
f.read() == "shutdown_started"
1551+
), "Expected __ray_shutdown__ to start execution"
1552+
1553+
# Verify that shutdown did NOT complete (force killed before completion)
1554+
assert not check_file_exists_and_not_empty(shutdown_completed_file), (
1555+
"Expected actor to be force-killed before __ray_shutdown__ completed, "
1556+
"but completion file exists. This means force kill fallback did not work."
1557+
)
1558+
1559+
13861560
if __name__ == "__main__":
13871561
sys.exit(pytest.main(["-sv", __file__]))

src/ray/common/ray_config_def.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,12 @@ RAY_CONFIG(int64_t, raylet_fetch_timeout_milliseconds, 1000)
271271
/// the worker SIGKILL.
272272
RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 5000)
273273

274+
/// Timeout for graceful actor shutdown (e.g. when actor goes out of scope).
275+
/// If an actor does not gracefully shut down within this timeout, it will be force
276+
/// killed. Set to -1 for infinite timeout to prevent the actor from being force killed
277+
/// during graceful shutdown.
278+
RAY_CONFIG(int64_t, actor_graceful_shutdown_timeout_ms, 30000)
279+
274280
/// The duration that we wait after the worker is launched before the
275281
/// starting_worker_timeout_callback() is called.
276282
RAY_CONFIG(int64_t, worker_register_timeout_seconds, 60)

0 commit comments

Comments
 (0)