Merged

24 commits
bc2d24f
[core] Use graceful shutdown path when actor OUT_OF_SCOPE (`del actor`)
codope Oct 1, 2025
efbe38f
add timeout for graceful actor cleanup with fallback to kill
codope Oct 3, 2025
7b827dc
keep a map of timer by actor, handle actor restart and cancel timer w…
codope Oct 3, 2025
d3e7aca
defer created_actors_ cleanup for grceful shutdown; fix doc lint
codope Oct 3, 2025
fb6b05e
fixed workerID verification and infinite timeout case
codope Oct 3, 2025
35802e1
skip doctest for code snippet
codope Oct 3, 2025
ef7f98e
address test and other minor comments
codope Oct 6, 2025
e454b70
fix test fixture
codope Oct 7, 2025
ef861fc
created_actors_ cleanup eagerly and simplify timer callback
codope Oct 7, 2025
0bfa458
fix new test fix
codope Oct 8, 2025
f56c9d5
Update comment in ray_config_def.h
codope Oct 16, 2025
b320379
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Oct 16, 2025
c33b2f8
Merge branch 'master' into ray-shutdown-del-actor
codope Oct 17, 2025
a45cbaa
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Oct 29, 2025
ac164aa
address timer creation, move seq, erase, shutdownflag
codope Oct 30, 2025
c3d20b4
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Nov 4, 2025
a3c6f3c
resolve conflict
codope Nov 4, 2025
5454dc5
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Nov 7, 2025
da1af4a
clarify comments in actor manager wrt timer
codope Nov 10, 2025
362353a
clarify doc notes and add testable code snippet
codope Nov 10, 2025
62f67e0
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Nov 10, 2025
0803797
use weak_ptr` for gcs actor manager and remove is_shutdown_ flag
codope Nov 10, 2025
3bf9e04
Merge remote-tracking branch 'origin/master' into ray-shutdown-del-actor
codope Nov 12, 2025
27cc79d
revert comments
codope Nov 12, 2025
65 changes: 61 additions & 4 deletions doc/source/ray-core/actors/terminating-actors.rst
@@ -3,7 +3,9 @@ Terminating Actors

Actor processes will be terminated automatically when all copies of the
actor handle have gone out of scope in Python, or if the original creator
process dies.
process dies. When actors terminate gracefully, Ray calls the actor's
``__ray_shutdown__()`` method if defined, allowing for cleanup of resources
(see :ref:`actor-cleanup` for details).

Note that automatic termination of actors is not yet supported in Java or C++.

@@ -33,9 +35,8 @@ manually destroyed.
actor_handle = Actor.remote()

ray.kill(actor_handle)
# This will not go through the normal Python sys.exit
# teardown logic, so any exit handlers installed in
# the actor using ``atexit`` will not be called.
# Force kill: the actor exits immediately without cleanup.
# This will NOT call __ray_shutdown__() or atexit handlers.


.. tab-item:: Java
@@ -191,3 +192,59 @@ You could see the actor is dead as a result of the user's `exit_actor()` call:
is_detached: false
placement_group_id: null
repr_name: ''


.. _actor-cleanup:

Actor cleanup with ``__ray_shutdown__``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When an actor terminates gracefully, Ray calls the ``__ray_shutdown__()`` method
if it exists, allowing cleanup of resources like database connections or file handles.

.. tab-set::

.. tab-item:: Python

.. testcode::

import ray
import tempfile
import os

@ray.remote
class FileProcessorActor:
def __init__(self):
self.temp_file = tempfile.NamedTemporaryFile(delete=False)
self.temp_file.write(b"processing data")
self.temp_file.flush()

def __ray_shutdown__(self):
# Clean up temporary file
if hasattr(self, 'temp_file'):
self.temp_file.close()
os.unlink(self.temp_file.name)

def process(self):
return "done"

actor = FileProcessorActor.remote()
ray.get(actor.process.remote())
del actor # __ray_shutdown__() is called automatically

When ``__ray_shutdown__()`` is called:

- **Automatic termination**: When all actor handles go out of scope (``del actor`` or natural scope exit)
- **Manual graceful termination**: When you call ``actor.__ray_terminate__.remote()`` (see the sketch below)
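
A minimal sketch of manual graceful termination (``GracefulActor`` is an illustrative name):

.. code-block:: python

    import ray

    @ray.remote
    class GracefulActor:
        def __ray_shutdown__(self):
            # Release resources here (close connections, flush buffers, etc.).
            print("cleaning up")

    actor = GracefulActor.remote()
    # Queues graceful termination: previously submitted tasks finish first,
    # then __ray_shutdown__() runs before the actor process exits.
    actor.__ray_terminate__.remote()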

When ``__ray_shutdown__()`` is **NOT** called:

- **Force kill**: When you call ``ray.kill(actor)``, the actor is killed immediately without cleanup (see the sketch after this list).
Collaborator:
In the future we should introduce a graceful flag to this

Contributor Author:
agree; will create a followup
- **Unexpected termination**: When the actor process crashes or exits unexpectedly (such as a segfault or being killed by the OOM killer).
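
By contrast, a minimal sketch of the force-kill path (``NoCleanupActor`` is an illustrative name):

.. code-block:: python

    import ray

    @ray.remote
    class NoCleanupActor:
        def __ray_shutdown__(self):
            print("never runs on force kill")

    actor = NoCleanupActor.remote()
    # The actor process exits immediately; __ray_shutdown__() and any
    # atexit handlers are skipped.
    ray.kill(actor)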

**Important notes:**

- ``__ray_shutdown__()`` runs after all actor tasks complete.
- By default, Ray waits 30 seconds for the graceful shutdown procedure (including ``__ray_shutdown__()``) to complete. If the actor doesn't exit within this timeout, it is force-killed. Configure this with ``ray.init(_system_config={"actor_graceful_shutdown_timeout_ms": 60000})`` (see the sketch after this list).
- Exceptions in ``__ray_shutdown__()`` are caught and logged but don't prevent actor termination.
- ``__ray_shutdown__()`` must be a synchronous method, including for async actors.
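
A minimal sketch of raising the timeout at startup (assumes Ray is not already initialized in this process; ``SlowCleanupActor`` is an illustrative name):

.. code-block:: python

    import ray

    # Allow slow cleanup paths up to 60 seconds before the force-kill fallback.
    ray.init(_system_config={"actor_graceful_shutdown_timeout_ms": 60000})

    @ray.remote
    class SlowCleanupActor:
        def __ray_shutdown__(self):
            # Cleanup that may take a while, e.g. flushing large buffers.
            ...

    actor = SlowCleanupActor.remote()
    del actor  # graceful shutdown; force killed only if it exceeds 60 seconds
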
174 changes: 174 additions & 0 deletions python/ray/tests/test_actor_failures.py
@@ -31,6 +31,14 @@ def ray_init_with_task_retry_delay():
ray.shutdown()


@pytest.fixture
def ray_init_with_actor_graceful_shutdown_timeout():
ray.shutdown()
address = ray.init(_system_config={"actor_graceful_shutdown_timeout_ms": 1000})
yield address
ray.shutdown()


@pytest.fixture
def tempfile_factory() -> Generator[Callable[[], str], None, None]:
"""Yields a factory function to generate tempfiles that will be deleted after the test run."""
@@ -1383,5 +1391,171 @@ def sleep_forever(self):
wait_for_condition(lambda: not check_file_exists_and_not_empty(shutdown_file))


def test_actor_ray_shutdown_called_on_del(ray_start_regular_shared, tempfile_factory):
"""Test that __ray_shutdown__ is called when actor goes out of scope via del."""
shutdown_file = tempfile_factory()

@ray.remote
class DelTestActor:
def __ray_shutdown__(self):
with open(shutdown_file, "w") as f:
f.write("shutdown_called_on_del")
f.flush()

def ready(self):
return "ready"

actor = DelTestActor.remote()
ray.get(actor.ready.remote())
del actor

wait_for_condition(
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
)

with open(shutdown_file, "r") as f:
assert f.read() == "shutdown_called_on_del", (
"Expected __ray_shutdown__ to be called within actor_graceful_shutdown_timeout_ms "
"after actor handle was deleted with del"
)


def test_actor_del_with_atexit(ray_start_regular_shared, tempfile_factory):
"""Test that both __ray_shutdown__ and atexit handlers run on del actor."""
shutdown_file = tempfile_factory()
atexit_file = tempfile_factory()
order_file = tempfile_factory()

@ray.remote
class BothHandlersActor:
def __init__(self):
atexit.register(self.cleanup)

def __ray_shutdown__(self):
with open(shutdown_file, "w") as f:
f.write("ray_shutdown_del")
f.flush()
with open(order_file, "a") as f:
f.write(f"shutdown:{time.time()}\n")
f.flush()

def cleanup(self):
with open(atexit_file, "w") as f:
f.write("atexit_del")
f.flush()

with open(order_file, "a") as f:
f.write(f"atexit:{time.time()}\n")
f.flush()

def ready(self):
return "ready"

actor = BothHandlersActor.remote()
ray.get(actor.ready.remote())
del actor

wait_for_condition(
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
)
with open(shutdown_file, "r") as f:
assert (
f.read() == "ray_shutdown_del"
), "Expected __ray_shutdown__ to be called when actor deleted"

wait_for_condition(lambda: check_file_exists_and_not_empty(atexit_file), timeout=10)
with open(atexit_file, "r") as f:
assert f.read() == "atexit_del", "Expected atexit handler to be called"

# Verify execution order: __ray_shutdown__ should run before atexit
wait_for_condition(lambda: check_file_exists_and_not_empty(order_file), timeout=10)
with open(order_file, "r") as f:
order = f.read()
lines = order.strip().split("\n")
assert len(lines) == 2, f"Expected 2 entries, got: {lines}"
assert lines[0].startswith(
"shutdown:"
), f"Expected __ray_shutdown__ first, got order: {lines}"
assert lines[1].startswith(
"atexit:"
), f"Expected atexit second, got order: {lines}"


def test_actor_ray_shutdown_called_on_scope_exit(
ray_start_regular_shared, tempfile_factory
):
"""Test that __ray_shutdown__ is called when actor goes out of scope."""
shutdown_file = tempfile_factory()

@ray.remote
class ScopeTestActor:
def __ray_shutdown__(self):
with open(shutdown_file, "w") as f:
f.write("shutdown_called_on_scope_exit")
f.flush()

def ready(self):
return "ready"

def create_and_use_actor():
actor = ScopeTestActor.remote()
ray.get(actor.ready.remote())
# Actor goes out of scope at end of function

create_and_use_actor()

wait_for_condition(
lambda: check_file_exists_and_not_empty(shutdown_file), timeout=10
)

with open(shutdown_file, "r") as f:
assert f.read() == "shutdown_called_on_scope_exit"


def test_actor_graceful_shutdown_timeout_fallback(
ray_init_with_actor_graceful_shutdown_timeout, tempfile_factory
):
"""Test that actor is force killed if __ray_shutdown__ exceeds timeout."""
shutdown_started_file = tempfile_factory()
shutdown_completed_file = tempfile_factory()

@ray.remote
class HangingShutdownActor:
def __ray_shutdown__(self):
with open(shutdown_started_file, "w") as f:
f.write("shutdown_started")
f.flush()

# Sleep well past the 1-second graceful shutdown timeout, simulating buggy cleanup code
time.sleep(5)

# This should never be reached due to force kill fallback
with open(shutdown_completed_file, "w") as f:
f.write("should_not_reach")
f.flush()

def ready(self):
return "ready"

actor = HangingShutdownActor.remote()
ray.get(actor.ready.remote())
del actor

# Verify that shutdown started
wait_for_condition(
lambda: check_file_exists_and_not_empty(shutdown_started_file), timeout=5
)
with open(shutdown_started_file, "r") as f:
assert (
f.read() == "shutdown_started"
), "Expected __ray_shutdown__ to start execution"

# Verify that shutdown did NOT complete (force killed before completion)
assert not check_file_exists_and_not_empty(shutdown_completed_file), (
"Expected actor to be force-killed before __ray_shutdown__ completed, "
"but completion file exists. This means force kill fallback did not work."
)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
6 changes: 6 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -271,6 +271,12 @@ RAY_CONFIG(int64_t, raylet_fetch_timeout_milliseconds, 1000)
/// the worker SIGKILL.
RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 5000)

/// Timeout for graceful actor shutdown (e.g. when actor goes out of scope).
/// If an actor does not gracefully shut down within this timeout, it will be force
/// killed. Set to -1 for infinite timeout to prevent the actor from being force killed
/// during graceful shutdown.
RAY_CONFIG(int64_t, actor_graceful_shutdown_timeout_ms, 30000)

/// The duration that we wait after the worker is launched before the
/// starting_worker_timeout_callback() is called.
RAY_CONFIG(int64_t, worker_register_timeout_seconds, 60)