From 45ff2340ead6c1d46c8179f32e642d336a6e98c7 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 15 Sep 2025 03:05:32 -0700 Subject: [PATCH 01/27] error handling with actor deaths Signed-off-by: dayshah --- python/ray/actor.py | 1 + .../ray/experimental/collective/collective.py | 9 +- .../collective/collective_tensor_transport.py | 2 - .../collective/tensor_transport_manager.py | 64 --------- .../gpu_object_manager/gpu_object_manager.py | 121 +++++++++++++++++- .../gpu_objects/test_gpu_objects_gloo.py | 69 ++++++++++ .../collective_group/nccl_collective_group.py | 2 +- 7 files changed, 195 insertions(+), 73 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index 07f808c90352..f79fa6e40649 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1173,6 +1173,7 @@ def _process_option_dict(actor_options, has_tensor_transport_methods): if _filled_options.get("concurrency_groups", None) is None: _filled_options["concurrency_groups"] = {} _filled_options["concurrency_groups"]["_ray_system"] = 1 + _filled_options["concurrency_groups"]["_ray_system_error"] = 1 return _filled_options diff --git a/python/ray/experimental/collective/collective.py b/python/ray/experimental/collective/collective.py index db60fd500dab..da198dbde7a0 100644 --- a/python/ray/experimental/collective/collective.py +++ b/python/ray/experimental/collective/collective.py @@ -205,10 +205,15 @@ def destroy_collective_group(group_or_name: Union[CommunicatorHandle, str]): group = manager.remove_remote_communicator(name) if group is not None: destroy_tasks = [ - actor.__ray_call__.remote(_do_destroy_collective_group, name) + actor.__ray_call__.options(concurrency_group="_ray_system_error").remote( + _do_destroy_collective_group, name + ) for actor in group.actors ] - ray.get(destroy_tasks) + try: + ray.get(destroy_tasks) + except ray.exceptions.ActorDiedError: + pass else: raise ValueError(f"No group with name {name} found.") diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py index 61996d608d68..499cf55cb1d1 100644 --- a/python/ray/experimental/collective/collective_tensor_transport.py +++ b/python/ray/experimental/collective/collective_tensor_transport.py @@ -168,8 +168,6 @@ def send_multiple_tensors( for tensor in tensors: if tensor.device.type != device.type: - # TODO(swang): Right now there is no way to catch this error - # and the receiving Ray task will hang. raise ValueError( f"tensor device {tensor.device} does not match device {device}" ) diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index c86dc4554f17..47b1e805161e 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -96,70 +96,6 @@ def get_communicator_metadata( CommunicatorMetadata: The communicator metadata. """ - @staticmethod - def send_object( - src_actor: "ray.actor.ActorHandle", - obj_id: str, - tensor_transport_meta: TensorTransportMetadata, - communicator_metadata_ref: CommunicatorMetadata, - ): - """ - Send the GPU object to the destination actor. - - Args: - src_actor: The actor that runs this function. - obj_id: The ID of the GPU object to send. - tensor_transport_meta: The tensor transport metadata for the GPU object. - communicator_metadata_ref: The ObjectRef of communicator metadata for the send/recv operation. 
- """ - from ray.experimental.gpu_object_manager.gpu_object_store import __ray_send__ - - # Send tensors stored in the `src_actor`'s GPU object store to the - # destination rank `dst_rank`. - # NOTE(swang): We put this task on the background thread to avoid tasks - # executing on the main thread blocking the data transfer. - src_actor.__ray_call__.options(concurrency_group="_ray_system").remote( - __ray_send__, - obj_id, - tensor_transport_meta, - communicator_metadata_ref, - ) - - @staticmethod - def recv_object( - dst_actor: "ray.actor.ActorHandle", - obj_id: str, - tensor_transport_metadata_ref: TensorTransportMetadata, - communicator_metadata_ref: CommunicatorMetadata, - ): - """ - Receive the GPU object from the source actor. - This function receives tensors from the source rank and stores them in the - `dst_actor`'s GPU object store. - - Args: - dst_actor: The actor that runs this function. - obj_id: The ID of the GPU object to receive. - tensor_transport_metadata_ref: The ObjectRef of tensor transport metadata for the GPU object. - communicator_metadata_ref: The ObjectRef of communicator metadata for the send/recv operation. - """ - from ray.experimental.gpu_object_manager.gpu_object_store import __ray_recv__ - - # Receive tensors from the source rank and store them in the - # `dst_actor`'s GPU object store. - # - # NOTE(swang): We put this task on the background thread to avoid tasks - # executing on the main thread blocking the data transfer. Technically, - # this is only needed for the sender task, but we put the receiver task - # on the same background thread to ensure that all communication - # operations are executed in a global order. - dst_actor.__ray_call__.options(concurrency_group="_ray_system").remote( - __ray_recv__, - obj_id, - tensor_transport_metadata_ref, - communicator_metadata_ref, - ) - @staticmethod @abstractmethod def recv_multiple_tensors( diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 73ac927e355e..078771b482a7 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -1,4 +1,5 @@ import threading +import time import warnings from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Set, Tuple @@ -33,6 +34,16 @@ class GPUObjectMeta(NamedTuple): sent_to_src_actor_and_others_warned: bool +# This is used to periodically check in on the transfer and abort operations in case of failures / timeouts. +class TransportRefsInfo(NamedTuple): + src_actor: "ray.actor.ActorHandle" + dst_actor: "ray.actor.ActorHandle" + send_ref: ObjectRef + recv_ref: ObjectRef + collective_group_name: Optional[str] + timeout: float + + # TODO(swang): Uncomment and add an API docs page and example usage. # @PublicAPI(stability="alpha") def wait_tensor_freed(tensor: "torch.Tensor", timeout: Optional[float] = None): @@ -73,6 +84,12 @@ def __init__(self): # Lock to ensure we only create the GPU object store once. self.gpu_object_store_lock = threading.Lock() + # List of transport refs that the monitor thread needs to start monitoring + self._unmonitored_transport_refs_info: List[TransportRefsInfo] = [] + self._unmonitored_transport_refs_info_lock = threading.Lock() + # Background thread to poll on the transfer operation. 
+ self._monitor_failures_thread = None + @property def gpu_object_store(self) -> "ray.experimental.GPUObjectStore": with self.gpu_object_store_lock: @@ -84,6 +101,57 @@ def gpu_object_store(self) -> "ray.experimental.GPUObjectStore": self._gpu_object_store = GPUObjectStore() return self._gpu_object_store + def _monitor_failures(self): + not_done = [] + done = [] + ref_info_map = {} + i = 0 + while True: + print(f"running monitor iter {i}") + i += 1 + with self._unmonitored_transport_refs_info_lock: + for ref_info in self._unmonitored_transport_refs_info: + not_done.append(ref_info.send_ref) + not_done.append(ref_info.recv_ref) + ref_info_map[ref_info.send_ref.hex()] = ref_info + ref_info_map[ref_info.recv_ref.hex()] = ref_info + self._unmonitored_transport_refs_info = [] + + if len(not_done) > 0: + done, not_done = ray.wait(not_done, num_returns=1, timeout=1) + if len(done) > 0: + try: + ray.get(done[0]) + except Exception: + self._abort_transport(done[0], ref_info_map) + + # wait returns lists in the same order they were passed in, + # so can just check the timeout of the first + while ( + len(not_done) > 0 + and ref_info_map[not_done[0].hex()].timeout < time.time() + ): + self._abort_transport(not_done[0], ref_info_map) + + time.sleep(1) + + def _abort_transport( + self, failed_ref: ObjectRef, ref_info_map: Dict[str, TransportRefsInfo] + ): + from ray.experimental.collective import destroy_collective_group + + if failed_ref.hex() not in ref_info_map: + return + ref_info = ref_info_map.pop(failed_ref.hex()) + ray.kill(ref_info.src_actor) + ray.kill(ref_info.dst_actor) + if ref_info.collective_group_name: + try: + destroy_collective_group(ref_info.collective_group_name) + except ValueError: + # Already destroyed collective group + pass + def is_managed_object(self, obj_id: str) -> bool: """ Check if the GPU object is managed by this process. @@ -244,6 +312,10 @@ def trigger_out_of_band_tensor_transfer( gpu_object_refs.add(arg) if gpu_object_refs: from ray.experimental.collective import get_tensor_transport_manager + from ray.experimental.gpu_object_manager.gpu_object_store import ( + __ray_recv__, + __ray_send__, + ) # Count the number of readers for each GPU object. for obj_ref in gpu_object_refs: @@ -294,20 +366,61 @@ def trigger_out_of_band_tensor_transfer( dst_actor, gpu_object_meta.tensor_transport_backend, ) + + send_ref = None if not tensor_transport_manager.is_one_sided(): - tensor_transport_manager.send_object( - src_actor, + # Send tensors stored in the `src_actor`'s GPU object store to the + # destination rank `dst_rank`. + # NOTE: We put this task on the background thread to avoid tasks + # executing on the main thread blocking the data transfer. + send_ref = src_actor.__ray_call__.options( + concurrency_group="_ray_system" + ).remote( + __ray_send__, obj_id, tensor_transport_meta, communicator_meta, ) - tensor_transport_manager.recv_object( - dst_actor, + + # Receive tensors from the source rank and store them in the + # `dst_actor`'s GPU object store. + # NOTE: Putting this task on the background thread is technically only + # needed for the sender task, but we put the receiver task on the same + # background thread to ensure that all communication operations are + # executed in a global order. 
+ recv_ref = dst_actor.__ray_call__.options( + concurrency_group="_ray_system" + ).remote( + __ray_recv__, obj_id, tensor_transport_meta, communicator_meta, ) + from ray.util.collective.types import CollectiveCommunicatorMetadata + + collective_group_name = ( + communicator_meta.communicator_name + if isinstance(communicator_meta, CollectiveCommunicatorMetadata) + else None + ) + with self._unmonitored_transport_refs_info_lock: + self._unmonitored_transport_refs_info.append( + TransportRefsInfo( + src_actor=src_actor, + dst_actor=dst_actor, + send_ref=send_ref, + recv_ref=recv_ref, + collective_group_name=collective_group_name, + timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, + ) + ) + if self._monitor_failures_thread is None: + self._monitor_failures_thread = threading.Thread( + target=self._monitor_failures, daemon=True + ) + self._monitor_failures_thread.start() + def get_gpu_object( self, object_id: str, diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index ce723104736a..2c9df6c64484 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -885,5 +885,74 @@ def do_transfer(self, a1, a2): ) +@ray.remote +class ErrorActor: + def clear_gpu_object_store(self): + gpu_object_store = ( + ray._private.worker.global_worker.gpu_object_manager.gpu_object_store + ) + + with gpu_object_store._lock: + assert len(gpu_object_store._gpu_object_store) > 0 + gpu_object_store._gpu_object_store.clear() + + @ray.method(tensor_transport="gloo") + def send(self, tensor): + return tensor + + def recv(self, tensor): + return tensor + + @ray.method(concurrency_group="_ray_system") + def block_background_thread(self): + time.sleep(100) + + +def test_send_fails(ray_start_regular): + actors = [ErrorActor.remote() for _ in range(2)] + create_collective_group(actors, backend="torch_gloo") + + # The gpu object will be gone when we trigger the transfer + # so the send will error out + gpu_obj_ref = actors[0].send.remote(torch.randn((100, 100))) + ray.get(actors[0].clear_gpu_object_store.remote()) + result_ref = actors[1].recv.remote(gpu_obj_ref) + + with pytest.raises(ray.exceptions.ActorDiedError): + ray.get(result_ref) + + +def test_send_actor_dies(ray_start_regular): + actors = [ErrorActor.remote() for _ in range(2)] + create_collective_group(actors, backend="torch_gloo") + + # Do a transfer with the sender's background thread blocked, + # so the send doesn't happen before the actor is killed + gpu_obj_ref = actors[0].send.remote(torch.randn((100, 100))) + actors[0].block_background_thread.remote() + result_ref = actors[1].recv.remote(gpu_obj_ref) + ray.kill(actors[0]) + + with pytest.raises(ray.exceptions.ActorDiedError): + ray.get(result_ref) + + +def test_recv_actor_dies(ray_start_regular): + actors = [ErrorActor.remote() for _ in range(2)] + create_collective_group(actors, backend="torch_gloo") + + # Do a transfer with the receiver's background thread blocked, + # so the recv doesn't happen before the actor is killed + gpu_obj_ref = actors[0].send.remote(torch.randn((100, 100))) + actors[1].block_background_thread.remote() + result_ref = actors[1].recv.remote(gpu_obj_ref) + ray.kill(actors[1]) + + with pytest.raises(ray.exceptions.ActorDiedError): + ray.get(result_ref) + with pytest.raises(ray.exceptions.ActorDiedError): + ray.get(actors[0].send.remote(torch.randn((100, 100)))) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) 
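The `_monitor_failures` loop added in this patch boils down to a small polling pattern: collect the in-flight send/recv ObjectRefs, poll them with `ray.wait`, surface task or actor failures through `ray.get`, and enforce a per-transfer deadline. The sketch below illustrates that pattern in isolation; the `Transfer` record, the `monitor` function, and the `abort` callback are illustrative names invented here (only `ray.wait`, `ray.get`, and `ObjectRef.hex` are real Ray APIs), so treat it as a rough model of the loop rather than the implementation itself.

import time
from dataclasses import dataclass
from typing import Callable, Dict, List

import ray


@dataclass
class Transfer:
    ref: ray.ObjectRef
    deadline: float            # absolute time after which the transfer is abandoned
    abort: Callable[[], None]  # cleanup hook, e.g. killing the participating actors


def monitor(transfers: List[Transfer], poll_interval: float = 1.0) -> None:
    # Index pending transfers by ref hex, mirroring ref_info_map above.
    pending: Dict[str, Transfer] = {t.ref.hex(): t for t in transfers}
    while pending:
        refs = [t.ref for t in pending.values()]
        done, _ = ray.wait(refs, num_returns=1, timeout=poll_interval)
        for ref in done:
            finished = pending.pop(ref.hex())
            try:
                ray.get(ref)  # raises if the send/recv task or its actor failed
            except Exception:
                finished.abort()
        now = time.time()
        for key, transfer in list(pending.items()):
            if transfer.deadline < now:  # deadline passed: treat it as a hang
                transfer.abort()
                pending.pop(key)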
diff --git a/python/ray/util/collective/collective_group/nccl_collective_group.py b/python/ray/util/collective/collective_group/nccl_collective_group.py index f866b70a9c1e..f6851236a7dd 100644 --- a/python/ray/util/collective/collective_group/nccl_collective_group.py +++ b/python/ray/util/collective/collective_group/nccl_collective_group.py @@ -656,7 +656,7 @@ def _point2point(self, tensors, p2p_fn, peer_rank: int, peer_gpu_idx: int): # We have made sure that self.rank != peer_rank during API check. peer_p2p_rank = 0 if self.rank > peer_rank else 1 for i, tensor in enumerate(tensors): - p2p_fn(tensors[i], comms[i], streams[i], peer_p2p_rank) + p2p_fn(tensor, comms[i], streams[i], peer_p2p_rank) def _flatten_for_scatter_gather(tensor_list, copy=False): From a4aaf51a72cea11646802dfa1c5aec0cd7a303da Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 15 Sep 2025 03:53:51 -0700 Subject: [PATCH 02/27] up Signed-off-by: dayshah --- .../gpu_object_manager/gpu_object_manager.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 078771b482a7..3d34086a0095 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -41,6 +41,7 @@ class TransportRefsInfo(NamedTuple): send_ref: ObjectRef recv_ref: ObjectRef collective_group_name: Optional[str] + backend: str timeout: float @@ -105,10 +106,7 @@ def _monitor_failures(self): not_done = [] done = [] ref_info_map = {} - i = 0 while True: - print(f"running monitor iter {i}") - i += 1 with self._unmonitored_transport_refs_info_lock: for ref_info in self._unmonitored_transport_refs_info: not_done.append(ref_info.send_ref) @@ -139,12 +137,21 @@ def _abort_transport( self, failed_ref: ObjectRef, ref_info_map: Dict[str, TransportRefsInfo] ): from ray.experimental.collective import destroy_collective_group + from ray.util.collective.types import Backend if failed_ref.hex() not in ref_info_map: return ref_info = ref_info_map.pop(failed_ref.hex()) - ray.kill(ref_info.src_actor) - ray.kill(ref_info.dst_actor) + if ref_info.backend == Backend.TORCH_GLOO: + ray.kill(ref_info.src_actor) + ray.kill(ref_info.dst_actor) + elif ref_info.backend == Backend.NCCL: + # TODO(dayshah) + pass + elif ref_info.backend == Backend.NIXL: + # TODO(dayshah) + pass + if ref_info.collective_group_name: try: destroy_collective_group(ref_info.collective_group_name) @@ -412,6 +419,7 @@ def trigger_out_of_band_tensor_transfer( send_ref=send_ref, recv_ref=recv_ref, collective_group_name=collective_group_name, + backend=gpu_object_meta.tensor_transport_backend, timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, ) ) From 3be637be3b7f13d44d3abe3f45fe2fed528781af Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 15 Sep 2025 21:11:04 -0700 Subject: [PATCH 03/27] Fixes + nccl+nixl abort Signed-off-by: dayshah --- .../ray/experimental/collective/collective.py | 2 +- .../collective/collective_tensor_transport.py | 6 ++ .../collective/nixl_tensor_transport.py | 5 + .../collective/tensor_transport_manager.py | 10 ++ .../gpu_object_manager/gpu_object_manager.py | 95 ++++++++++++++----- .../gpu_object_manager/gpu_object_store.py | 7 ++ .../gpu_objects/test_gpu_objects_gloo.py | 2 +- python/ray/util/collective/__init__.py | 2 + python/ray/util/collective/collective.py | 13 +++ .../collective_group/nccl_collective_group.py | 5 + 
.../collective_group/nixl_backend.py | 8 ++ .../torch_gloo_collective_group.py | 3 + 12 files changed, 130 insertions(+), 28 deletions(-) diff --git a/python/ray/experimental/collective/collective.py b/python/ray/experimental/collective/collective.py index da198dbde7a0..c0d0196570bc 100644 --- a/python/ray/experimental/collective/collective.py +++ b/python/ray/experimental/collective/collective.py @@ -205,7 +205,7 @@ def destroy_collective_group(group_or_name: Union[CommunicatorHandle, str]): group = manager.remove_remote_communicator(name) if group is not None: destroy_tasks = [ - actor.__ray_call__.options(concurrency_group="_ray_system_error").remote( + actor.__ray_call__.options(concurrency_group="_ray_system").remote( _do_destroy_collective_group, name ) for actor in group.actors diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py index 499cf55cb1d1..5acc1cd6e6dc 100644 --- a/python/ray/experimental/collective/collective_tensor_transport.py +++ b/python/ray/experimental/collective/collective_tensor_transport.py @@ -176,3 +176,9 @@ def send_multiple_tensors( communicator_metadata.dst_rank, communicator_metadata.communicator_name, ) + + @staticmethod + def abort_transport(communicator_metadata: CollectiveCommunicatorMetadata): + import ray.util.collective as collective + + collective.abort(communicator_metadata.communicator_name) diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py index 40701590e9d0..ed7f57711743 100644 --- a/python/ray/experimental/collective/nixl_tensor_transport.py +++ b/python/ray/experimental/collective/nixl_tensor_transport.py @@ -148,3 +148,8 @@ def send_multiple_tensors( raise NotImplementedError( "NIXL transport does not support send_multiple_tensors, since it is a one-sided transport." ) + + @staticmethod + def abort_transport(communicator_metadata: NixlCommunicatorMetadata): + g = get_group_handle(communicator_metadata.communicator_name) + g.abort() diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 47b1e805161e..520c5a6a95c2 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -126,3 +126,13 @@ def send_multiple_tensors( tensors: The tensors to send. communicator_metadata: The communicator metadata for the send/recv operation. """ + + @staticmethod + @abstractmethod + def abort_transport(communicator_metadata: CommunicatorMetadata): + """ + Try to abort an ongoing transport. + + Args: + communicator_metadata: The communicator metadata for the send/recv operation. 
+ """ diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 3d34086a0095..981f82a28542 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -1,3 +1,4 @@ +import logging import threading import time import warnings @@ -14,7 +15,9 @@ from ray.experimental.gpu_object_manager.gpu_object_store import ( GPUObjectStore, ) - from ray.util.collective.types import TensorTransportMetadata + from ray.util.collective.types import CommunicatorMetadata, TensorTransportMetadata + +logger = logging.getLogger(__name__) # GPUObjectMeta is a named tuple containing the source actor, tensor transport # backend, tensor metadata, and other information that needs to be recorded. @@ -38,9 +41,9 @@ class GPUObjectMeta(NamedTuple): class TransportRefsInfo(NamedTuple): src_actor: "ray.actor.ActorHandle" dst_actor: "ray.actor.ActorHandle" - send_ref: ObjectRef + send_ref: Optional[ObjectRef] recv_ref: ObjectRef - collective_group_name: Optional[str] + communicator_meta: "CommunicatorMetadata" backend: str timeout: float @@ -103,15 +106,20 @@ def gpu_object_store(self) -> "ray.experimental.GPUObjectStore": return self._gpu_object_store def _monitor_failures(self): + """ + Monitor the refs from send and recv tasks and abort the transfers + if they error out or timeout to prevent hanging. + """ not_done = [] done = [] ref_info_map = {} while True: with self._unmonitored_transport_refs_info_lock: for ref_info in self._unmonitored_transport_refs_info: - not_done.append(ref_info.send_ref) + if ref_info.send_ref: + not_done.append(ref_info.send_ref) + ref_info_map[ref_info.send_ref.hex()] = ref_info not_done.append(ref_info.recv_ref) - ref_info_map[ref_info.send_ref.hex()] = ref_info ref_info_map[ref_info.recv_ref.hex()] = ref_info self._unmonitored_transport_refs_info = [] @@ -136,27 +144,69 @@ def _monitor_failures(self): def _abort_transport( self, failed_ref: ObjectRef, ref_info_map: Dict[str, TransportRefsInfo] ): + """ + Clean up the ref_info_map, abort the transport based on the backend, + and destroy any associated collective group. + + NOTE: Kills the associated actors to abort torch_gloo transfers. + """ from ray.experimental.collective import destroy_collective_group - from ray.util.collective.types import Backend + from ray.experimental.gpu_object_manager.gpu_object_store import ( + __ray_abort_transport__, + ) + from ray.util.collective.types import Backend, CollectiveCommunicatorMetadata - if failed_ref.hex() not in ref_info_map: + ref_info = ref_info_map.pop(failed_ref.hex(), None) + if ref_info is None: return - ref_info = ref_info_map.pop(failed_ref.hex()) + + if ref_info.send_ref: + ref_info_map.pop(ref_info.send_ref.hex(), None) + ref_info_map.pop(ref_info.recv_ref.hex(), None) + if ref_info.backend == Backend.TORCH_GLOO: + # Don't have a way to abort a torch_gloo transfer right now + logger.error( + "Killing actors due to a hanging/failed torch_gloo transfer. 
" + "Src actor: %s, Dst actor: %s", + ref_info.src_actor, + ref_info.dst_actor, + ) ray.kill(ref_info.src_actor) ray.kill(ref_info.dst_actor) - elif ref_info.backend == Backend.NCCL: - # TODO(dayshah) - pass - elif ref_info.backend == Backend.NIXL: - # TODO(dayshah) - pass - - if ref_info.collective_group_name: + elif ref_info.backend == Backend.NCCL or ref_info.backend == Backend.NIXL: + logger.error( + "Aborting tensor transfers on actors due to a hanging/failed transfer. " + "Src actor: %s, Dst actor: %s", + ref_info.src_actor, + ref_info.dst_actor, + ) + if ref_info.send_ref: + # Only need to abort recv side if communication is one-sided + ref_info.src_actor.__ray_call__.options( + concurrency_group="_ray_system_error" + ).remote( + __ray_abort_transport__, + ref_info.communicator_meta, + ) + ref_info.src_actor.__ray_call__.options( + concurrency_group="_ray_system_error" + ).remote( + __ray_abort_transport__, + ref_info.communicator_meta, + ) + + if isinstance(ref_info.communicator_meta, CollectiveCommunicatorMetadata): try: - destroy_collective_group(ref_info.collective_group_name) + destroy_collective_group( + ref_info.communicator_meta.collective_group_name + ) + logger.error( + "Destroyed collective group %s due to a hanging/failed transfer", + ref_info.communicator_meta.collective_group_name, + ) except ValueError: - # Already destroyed collective group + # Collective group was already destroyed pass def is_managed_object(self, obj_id: str) -> bool: @@ -404,13 +454,6 @@ def trigger_out_of_band_tensor_transfer( communicator_meta, ) - from ray.util.collective.types import CollectiveCommunicatorMetadata - - collective_group_name = ( - communicator_meta.communicator_name - if isinstance(communicator_meta, CollectiveCommunicatorMetadata) - else None - ) with self._unmonitored_transport_refs_info_lock: self._unmonitored_transport_refs_info.append( TransportRefsInfo( @@ -418,7 +461,7 @@ def trigger_out_of_band_tensor_transfer( dst_actor=dst_actor, send_ref=send_ref, recv_ref=recv_ref, - collective_group_name=collective_group_name, + communicator_meta=communicator_meta, backend=gpu_object_meta.tensor_transport_backend, timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, ) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 3bd9f532ad1d..c9a8f86f0cb1 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -116,6 +116,13 @@ def __ray_fetch_gpu_object__(self, obj_id: str): return gpu_object +def __ray_abort_transport__(self, communicator_meta: CommunicatorMetadata): + """Helper function that can run on an actor doing a send or recv to abort the transport.""" + backend = collective.get_group_handle(communicator_meta.communicator_name).backend() + tensor_transport_manager = get_tensor_transport_manager(backend) + tensor_transport_manager.abort_transport(communicator_meta) + + @dataclass class _GPUObject: # A list of tensors representing the GPU object. 
diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index 2c9df6c64484..031f0a423204 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -926,7 +926,7 @@ def test_send_actor_dies(ray_start_regular): actors = [ErrorActor.remote() for _ in range(2)] create_collective_group(actors, backend="torch_gloo") - # Do a transfer with the sender's background thread blocked, + # Try a transfer with the sender's background thread blocked, # so the send doesn't happen before the actor is killed gpu_obj_ref = actors[0].send.remote(torch.randn((100, 100))) actors[0].block_background_thread.remote() diff --git a/python/ray/util/collective/__init__.py b/python/ray/util/collective/__init__.py index 09423ad37c11..289ff3560fd7 100644 --- a/python/ray/util/collective/__init__.py +++ b/python/ray/util/collective/__init__.py @@ -1,4 +1,5 @@ from ray.util.collective.collective import ( + abort, allgather, allgather_multigpu, allreduce, @@ -50,4 +51,5 @@ "recv", "recv_multigpu", "get_group_handle", + "abort", ] diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index 1d92b838d7f6..f0e7b078ee6c 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -602,6 +602,19 @@ def send(tensor, dst_rank: int, group_name: str = "default"): g.send([tensor], opts) +def abort(group_name: str = "default"): + """Abort the transport. + + Args: + group_name: the name of the collective group. + """ + try: + g = get_group_handle(group_name) + g.abort() + except ValueError: + pass + + def send_multigpu( tensor, dst_rank: int, diff --git a/python/ray/util/collective/collective_group/nccl_collective_group.py b/python/ray/util/collective/collective_group/nccl_collective_group.py index f6851236a7dd..47521e009230 100644 --- a/python/ray/util/collective/collective_group/nccl_collective_group.py +++ b/python/ray/util/collective/collective_group/nccl_collective_group.py @@ -391,6 +391,11 @@ def p2p_fn(tensor, comm, stream, peer): tensors, p2p_fn, recv_options.src_rank, recv_options.src_gpu_index ) + def abort(self): + """Abort all p2p transports on this process.""" + for comm in self._dev_comm_map.values(): + comm.abort() + def _get_nccl_collective_communicator(self, comm_key, device_list): """Create or retrieve an NCCL communicator from cache. diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index 1950f952d4ef..4bb3e5183b70 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -1,3 +1,4 @@ +import threading import time from typing import TYPE_CHECKING, List, Tuple @@ -26,6 +27,7 @@ def __init__(self): ctx = ray.get_runtime_context() actor_id = ctx.get_actor_id() self._nixl_agent = nixl_agent(actor_id, agent_config) + self._stop_event = threading.Event() @classmethod def backend(cls): @@ -73,6 +75,9 @@ def recv( # Since current nixl does not provide a better way, we need to check the state of # the transfer continuously. 
while True: + if self._stop_event.is_set(): + self._stop_event.clear() + break state = nixl_agent.check_xfer_state(xfer_handle) if state == "ERR": raise RuntimeError("NIXL transfer got to Error state.") @@ -102,3 +107,6 @@ def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes nixl_agent.get_serialized_descs(xfer_descs), nixl_agent.get_agent_metadata(), ) + + def abort(self): + self._stop_event.set() diff --git a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py index 51e7f6482b6f..400851f7aa7a 100644 --- a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py +++ b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py @@ -173,3 +173,6 @@ def send(self, tensor: List["torch.Tensor"], send_options: SendOptions) -> None: def recv(self, tensor: List["torch.Tensor"], recv_options: RecvOptions) -> None: tensor = self._check_tensor_input(tensor) dist.recv(tensor, src=recv_options.src_rank) + + def abort(self): + raise NotImplementedError("TorchGLOO does not support abort.") From d587a12ebf5e2387dcde58c7675c0f1a2cd5edbb Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 16 Sep 2025 06:17:20 +0000 Subject: [PATCH 04/27] up Signed-off-by: dayshah --- .../ray/experimental/gpu_object_manager/gpu_object_manager.py | 2 +- .../util/collective/collective_group/base_collective_group.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 981f82a28542..ee7a5e3ca4fb 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -189,7 +189,7 @@ def _abort_transport( __ray_abort_transport__, ref_info.communicator_meta, ) - ref_info.src_actor.__ray_call__.options( + ref_info.dst_actor.__ray_call__.options( concurrency_group="_ray_system_error" ).remote( __ray_abort_transport__, diff --git a/python/ray/util/collective/collective_group/base_collective_group.py b/python/ray/util/collective/collective_group/base_collective_group.py index eff07fb16c67..03b356725de0 100644 --- a/python/ray/util/collective/collective_group/base_collective_group.py +++ b/python/ray/util/collective/collective_group/base_collective_group.py @@ -83,3 +83,7 @@ def send(self, tensor, send_options: SendOptions): @abstractmethod def recv(self, tensor, recv_options: RecvOptions): raise NotImplementedError() + + @abstractmethod + def abort(self): + raise NotImplementedError() From 3aaf712da1b5161d2e7e4c4615c0b38632fec978 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 16 Sep 2025 16:32:07 +0000 Subject: [PATCH 05/27] fix communicator name Signed-off-by: dayshah --- .../gpu_object_manager/gpu_object_manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index ee7a5e3ca4fb..80625ccbe2dd 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -196,14 +196,15 @@ def _abort_transport( ref_info.communicator_meta, ) + # Have to get this name before communicator_meta turns into CollectiveCommunicatorMetadata + # without communicator_name + collective_group_name = 
ref_info.communicator_meta.communicator_name if isinstance(ref_info.communicator_meta, CollectiveCommunicatorMetadata): try: - destroy_collective_group( - ref_info.communicator_meta.collective_group_name - ) + destroy_collective_group(collective_group_name) logger.error( "Destroyed collective group %s due to a hanging/failed transfer", - ref_info.communicator_meta.collective_group_name, + collective_group_name, ) except ValueError: # Collective group was already destroyed From 82650fdde6503a70962d31c83139df2e9525baf1 Mon Sep 17 00:00:00 2001 From: dayshah Date: Wed, 17 Sep 2025 00:01:27 +0000 Subject: [PATCH 06/27] kill nccl nixl abort + address comments Signed-off-by: dayshah --- .../collective/collective_tensor_transport.py | 6 - .../collective/nixl_tensor_transport.py | 5 - .../collective/tensor_transport_manager.py | 10 -- .../gpu_object_manager/gpu_object_manager.py | 123 ++++++++---------- .../gpu_object_manager/gpu_object_store.py | 7 - .../gpu_objects/test_gpu_objects_gloo.py | 46 +++---- python/ray/util/collective/__init__.py | 2 - python/ray/util/collective/collective.py | 13 -- .../collective_group/base_collective_group.py | 4 - .../collective_group/nccl_collective_group.py | 5 - .../collective_group/nixl_backend.py | 8 -- .../torch_gloo_collective_group.py | 3 - 12 files changed, 76 insertions(+), 156 deletions(-) diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py index 5acc1cd6e6dc..499cf55cb1d1 100644 --- a/python/ray/experimental/collective/collective_tensor_transport.py +++ b/python/ray/experimental/collective/collective_tensor_transport.py @@ -176,9 +176,3 @@ def send_multiple_tensors( communicator_metadata.dst_rank, communicator_metadata.communicator_name, ) - - @staticmethod - def abort_transport(communicator_metadata: CollectiveCommunicatorMetadata): - import ray.util.collective as collective - - collective.abort(communicator_metadata.communicator_name) diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py index ed7f57711743..40701590e9d0 100644 --- a/python/ray/experimental/collective/nixl_tensor_transport.py +++ b/python/ray/experimental/collective/nixl_tensor_transport.py @@ -148,8 +148,3 @@ def send_multiple_tensors( raise NotImplementedError( "NIXL transport does not support send_multiple_tensors, since it is a one-sided transport." ) - - @staticmethod - def abort_transport(communicator_metadata: NixlCommunicatorMetadata): - g = get_group_handle(communicator_metadata.communicator_name) - g.abort() diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 520c5a6a95c2..47b1e805161e 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -126,13 +126,3 @@ def send_multiple_tensors( tensors: The tensors to send. communicator_metadata: The communicator metadata for the send/recv operation. """ - - @staticmethod - @abstractmethod - def abort_transport(communicator_metadata: CommunicatorMetadata): - """ - Try to abort an ongoing transport. - - Args: - communicator_metadata: The communicator metadata for the send/recv operation. 
- """ diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 80625ccbe2dd..dc1fe116b290 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -2,6 +2,7 @@ import threading import time import warnings +from queue import Queue from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Set, Tuple import ray @@ -37,8 +38,9 @@ class GPUObjectMeta(NamedTuple): sent_to_src_actor_and_others_warned: bool -# This is used to periodically check in on the transfer and abort operations in case of failures / timeouts. -class TransportRefsInfo(NamedTuple): +# This is used to periodically check in on the RDT transfer through the refs from +# __ray_send__ and __ray_recv__ and abort operations in case of failures / timeouts. +class TransferMetadata(NamedTuple): src_actor: "ray.actor.ActorHandle" dst_actor: "ray.actor.ActorHandle" send_ref: Optional[ObjectRef] @@ -88,9 +90,8 @@ def __init__(self): # Lock to ensure we only create the GPU object store once. self.gpu_object_store_lock = threading.Lock() - # List of transport refs that the monitor thread needs to start monitoring - self._unmonitored_transport_refs_info: List[TransportRefsInfo] = [] - self._unmonitored_transport_refs_info_lock = threading.Lock() + # Thread safe queue of transport refs that the monitor thread needs to start monitoring + self._unmonitored_transfers: Queue[TransferMetadata] = Queue() # Background thread to poll on the transfer operation. self._monitor_failures_thread = None @@ -114,22 +115,22 @@ def _monitor_failures(self): done = [] ref_info_map = {} while True: - with self._unmonitored_transport_refs_info_lock: - for ref_info in self._unmonitored_transport_refs_info: - if ref_info.send_ref: - not_done.append(ref_info.send_ref) - ref_info_map[ref_info.send_ref.hex()] = ref_info - not_done.append(ref_info.recv_ref) - ref_info_map[ref_info.recv_ref.hex()] = ref_info - self._unmonitored_transport_refs_info = [] + while not self._unmonitored_transfers.empty(): + ref_info = self._unmonitored_transfers.get() + if ref_info.send_ref: + not_done.append(ref_info.send_ref) + ref_info_map[ref_info.send_ref.hex()] = ref_info + not_done.append(ref_info.recv_ref) + ref_info_map[ref_info.recv_ref.hex()] = ref_info if len(not_done) > 0: done, not_done = ray.wait(not_done, num_returns=1, timeout=1) if len(done) > 0: try: ray.get(done[0]) - except Exception: - self._abort_transport(done[0], ref_info_map) + ref_info_map.pop(done[0].hex(), None) + except Exception as e: + self._abort_transport(done[0], ref_info_map, e) # wait returns lists in the same order they were passed in, # so can just check the timeout of the first @@ -137,73 +138,56 @@ def _monitor_failures(self): len(not_done) > 0 and ref_info_map[not_done[0].hex()].timeout < time.time() ): - self._abort_transport(not_done[0], ref_info_map) + self._abort_transport( + not_done[0], + ref_info_map, + TimeoutError( + f"RDT transfer failed after {ray.constants.FETCH_FAIL_TIMEOUT_SECONDS}s." + ), + ) time.sleep(1) def _abort_transport( - self, failed_ref: ObjectRef, ref_info_map: Dict[str, TransportRefsInfo] + self, + failed_ref: ObjectRef, + ref_info_map: Dict[str, TransferMetadata], + exception: Exception, ): """ - Clean up the ref_info_map, abort the transport based on the backend, - and destroy any associated collective group. 
- - NOTE: Kills the associated actors to abort torch_gloo transfers. + Cleans up the ref_info_map, kill the src and dst actors, and destroy the + collective group if necessary. """ from ray.experimental.collective import destroy_collective_group - from ray.experimental.gpu_object_manager.gpu_object_store import ( - __ray_abort_transport__, - ) - from ray.util.collective.types import Backend, CollectiveCommunicatorMetadata + from ray.util.collective.types import CollectiveCommunicatorMetadata ref_info = ref_info_map.pop(failed_ref.hex(), None) if ref_info is None: return + logger.error( + "RDT transfer with src actor %s and dst actor %s failed. Killing the actors. " + "Transfer failed with exception: %s", + ref_info.src_actor, + ref_info.dst_actor, + exception, + ) + if ref_info.send_ref: ref_info_map.pop(ref_info.send_ref.hex(), None) ref_info_map.pop(ref_info.recv_ref.hex(), None) - if ref_info.backend == Backend.TORCH_GLOO: - # Don't have a way to abort a torch_gloo transfer right now - logger.error( - "Killing actors due to a hanging/failed torch_gloo transfer. " - "Src actor: %s, Dst actor: %s", - ref_info.src_actor, - ref_info.dst_actor, - ) - ray.kill(ref_info.src_actor) - ray.kill(ref_info.dst_actor) - elif ref_info.backend == Backend.NCCL or ref_info.backend == Backend.NIXL: - logger.error( - "Aborting tensor transfers on actors due to a hanging/failed transfer. " - "Src actor: %s, Dst actor: %s", - ref_info.src_actor, - ref_info.dst_actor, - ) - if ref_info.send_ref: - # Only need to abort recv side if communication is one-sided - ref_info.src_actor.__ray_call__.options( - concurrency_group="_ray_system_error" - ).remote( - __ray_abort_transport__, - ref_info.communicator_meta, - ) - ref_info.dst_actor.__ray_call__.options( - concurrency_group="_ray_system_error" - ).remote( - __ray_abort_transport__, - ref_info.communicator_meta, - ) + ray.kill(ref_info.src_actor) + ray.kill(ref_info.dst_actor) - # Have to get this name before communicator_meta turns into CollectiveCommunicatorMetadata - # without communicator_name + # isinstance does an implicit cast and makes communicator_name inaccessible + # so we have to get communicator_name before the cast. 
collective_group_name = ref_info.communicator_meta.communicator_name if isinstance(ref_info.communicator_meta, CollectiveCommunicatorMetadata): try: destroy_collective_group(collective_group_name) logger.error( - "Destroyed collective group %s due to a hanging/failed transfer", + "Destroyed collective group %s due to a hanging/failed RDT transfer", collective_group_name, ) except ValueError: @@ -455,18 +439,17 @@ def trigger_out_of_band_tensor_transfer( communicator_meta, ) - with self._unmonitored_transport_refs_info_lock: - self._unmonitored_transport_refs_info.append( - TransportRefsInfo( - src_actor=src_actor, - dst_actor=dst_actor, - send_ref=send_ref, - recv_ref=recv_ref, - communicator_meta=communicator_meta, - backend=gpu_object_meta.tensor_transport_backend, - timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, - ) + self._unmonitored_transfers.put( + TransferMetadata( + src_actor=src_actor, + dst_actor=dst_actor, + send_ref=send_ref, + recv_ref=recv_ref, + communicator_meta=communicator_meta, + backend=gpu_object_meta.tensor_transport_backend, + timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, ) + ) if self._monitor_failures_thread is None: self._monitor_failures_thread = threading.Thread( target=self._monitor_failures, daemon=True diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index c9a8f86f0cb1..3bd9f532ad1d 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -116,13 +116,6 @@ def __ray_fetch_gpu_object__(self, obj_id: str): return gpu_object -def __ray_abort_transport__(self, communicator_meta: CommunicatorMetadata): - """Helper function that can run on an actor doing a send or recv to abort the transport.""" - backend = collective.get_group_handle(communicator_meta.communicator_name).backend() - tensor_transport_manager = get_tensor_transport_manager(backend) - tensor_transport_manager.abort_transport(communicator_meta) - - @dataclass class _GPUObject: # A list of tensors representing the GPU object. 
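With the per-backend abort helper removed here, the failure signal the monitor relies on is the one the tests below exercise: after `ray.kill` is called on a transfer participant, `ray.get` on any task outstanding on that actor raises `ray.exceptions.ActorDiedError`, which the monitor loop's `ray.get` then catches before cleaning up. A minimal reproduction of just that signal, with a hypothetical `Peer` actor and `hang` method standing in for a stuck `__ray_send__`/`__ray_recv__`:

import time

import ray


@ray.remote
class Peer:
    def hang(self):
        time.sleep(60)  # stands in for a transfer that never completes


if __name__ == "__main__":
    ray.init()
    peer = Peer.remote()
    ref = peer.hang.remote()
    ray.kill(peer)  # what _abort_transport now does to the src and dst actors
    try:
        ray.get(ref)
    except ray.exceptions.ActorDiedError as err:
        print("transfer task failed:", err)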
diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index 031f0a423204..059b056d2e29 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -58,6 +58,29 @@ def fail(self, error_message): raise Exception(error_message) +@ray.remote +class ErrorActor: + @ray.method(tensor_transport="gloo") + def send(self, tensor): + return tensor + + def recv(self, tensor): + return tensor + + def clear_gpu_object_store(self): + gpu_object_store = ( + ray._private.worker.global_worker.gpu_object_manager.gpu_object_store + ) + + with gpu_object_store._lock: + assert len(gpu_object_store._gpu_object_store) > 0 + gpu_object_store._gpu_object_store.clear() + + @ray.method(concurrency_group="_ray_system") + def block_background_thread(self): + time.sleep(100) + + @pytest.mark.parametrize("data_size_bytes", [100]) def test_gc_gpu_object(ray_start_regular, data_size_bytes): """ @@ -885,29 +908,6 @@ def do_transfer(self, a1, a2): ) -@ray.remote -class ErrorActor: - def clear_gpu_object_store(self): - gpu_object_store = ( - ray._private.worker.global_worker.gpu_object_manager.gpu_object_store - ) - - with gpu_object_store._lock: - assert len(gpu_object_store._gpu_object_store) > 0 - gpu_object_store._gpu_object_store.clear() - - @ray.method(tensor_transport="gloo") - def send(self, tensor): - return tensor - - def recv(self, tensor): - return tensor - - @ray.method(concurrency_group="_ray_system") - def block_background_thread(self): - time.sleep(100) - - def test_send_fails(ray_start_regular): actors = [ErrorActor.remote() for _ in range(2)] create_collective_group(actors, backend="torch_gloo") diff --git a/python/ray/util/collective/__init__.py b/python/ray/util/collective/__init__.py index 289ff3560fd7..09423ad37c11 100644 --- a/python/ray/util/collective/__init__.py +++ b/python/ray/util/collective/__init__.py @@ -1,5 +1,4 @@ from ray.util.collective.collective import ( - abort, allgather, allgather_multigpu, allreduce, @@ -51,5 +50,4 @@ "recv", "recv_multigpu", "get_group_handle", - "abort", ] diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index f0e7b078ee6c..1d92b838d7f6 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -602,19 +602,6 @@ def send(tensor, dst_rank: int, group_name: str = "default"): g.send([tensor], opts) -def abort(group_name: str = "default"): - """Abort the transport. - - Args: - group_name: the name of the collective group. 
- """ - try: - g = get_group_handle(group_name) - g.abort() - except ValueError: - pass - - def send_multigpu( tensor, dst_rank: int, diff --git a/python/ray/util/collective/collective_group/base_collective_group.py b/python/ray/util/collective/collective_group/base_collective_group.py index 03b356725de0..eff07fb16c67 100644 --- a/python/ray/util/collective/collective_group/base_collective_group.py +++ b/python/ray/util/collective/collective_group/base_collective_group.py @@ -83,7 +83,3 @@ def send(self, tensor, send_options: SendOptions): @abstractmethod def recv(self, tensor, recv_options: RecvOptions): raise NotImplementedError() - - @abstractmethod - def abort(self): - raise NotImplementedError() diff --git a/python/ray/util/collective/collective_group/nccl_collective_group.py b/python/ray/util/collective/collective_group/nccl_collective_group.py index 47521e009230..f6851236a7dd 100644 --- a/python/ray/util/collective/collective_group/nccl_collective_group.py +++ b/python/ray/util/collective/collective_group/nccl_collective_group.py @@ -391,11 +391,6 @@ def p2p_fn(tensor, comm, stream, peer): tensors, p2p_fn, recv_options.src_rank, recv_options.src_gpu_index ) - def abort(self): - """Abort all p2p transports on this process.""" - for comm in self._dev_comm_map.values(): - comm.abort() - def _get_nccl_collective_communicator(self, comm_key, device_list): """Create or retrieve an NCCL communicator from cache. diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index 4bb3e5183b70..1950f952d4ef 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -1,4 +1,3 @@ -import threading import time from typing import TYPE_CHECKING, List, Tuple @@ -27,7 +26,6 @@ def __init__(self): ctx = ray.get_runtime_context() actor_id = ctx.get_actor_id() self._nixl_agent = nixl_agent(actor_id, agent_config) - self._stop_event = threading.Event() @classmethod def backend(cls): @@ -75,9 +73,6 @@ def recv( # Since current nixl does not provide a better way, we need to check the state of # the transfer continuously. 
while True: - if self._stop_event.is_set(): - self._stop_event.clear() - break state = nixl_agent.check_xfer_state(xfer_handle) if state == "ERR": raise RuntimeError("NIXL transfer got to Error state.") @@ -107,6 +102,3 @@ def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes nixl_agent.get_serialized_descs(xfer_descs), nixl_agent.get_agent_metadata(), ) - - def abort(self): - self._stop_event.set() diff --git a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py index 400851f7aa7a..51e7f6482b6f 100644 --- a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py +++ b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py @@ -173,6 +173,3 @@ def send(self, tensor: List["torch.Tensor"], send_options: SendOptions) -> None: def recv(self, tensor: List["torch.Tensor"], recv_options: RecvOptions) -> None: tensor = self._check_tensor_input(tensor) dist.recv(tensor, src=recv_options.src_rank) - - def abort(self): - raise NotImplementedError("TorchGLOO does not support abort.") From 95b9ff0eb0b284cb433f51a19cb172fadc7086bc Mon Sep 17 00:00:00 2001 From: dayshah Date: Wed, 17 Sep 2025 22:54:37 -0700 Subject: [PATCH 07/27] no background concurrency group Signed-off-by: dayshah --- python/ray/actor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index f79fa6e40649..07f808c90352 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1173,7 +1173,6 @@ def _process_option_dict(actor_options, has_tensor_transport_methods): if _filled_options.get("concurrency_groups", None) is None: _filled_options["concurrency_groups"] = {} _filled_options["concurrency_groups"]["_ray_system"] = 1 - _filled_options["concurrency_groups"]["_ray_system_error"] = 1 return _filled_options From b31e1ed405927696fff4d151fef257e3c422c68c Mon Sep 17 00:00:00 2001 From: dayshah Date: Sun, 21 Sep 2025 02:30:33 +0000 Subject: [PATCH 08/27] join monitor thread and check for log Signed-off-by: dayshah --- python/ray/_private/worker.py | 5 +++++ .../gpu_object_manager/gpu_object_manager.py | 19 ++++++++++++++---- .../gpu_objects/test_gpu_objects_gloo.py | 20 +++++++++++++++++-- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index f40a8c4f0464..87a8c014abcc 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1142,6 +1142,10 @@ def get_accelerator_ids_for_accelerator_resource( assigned_ids = original_ids[:max_accelerators] return list(assigned_ids) + def shutdown_gpu_object_manager(self): + if self._gpu_object_manager: + self._gpu_object_manager.shutdown() + _connect_or_shutdown_lock = threading.RLock() @@ -2086,6 +2090,7 @@ def shutdown(_exiting_interpreter: bool = False): from ray.dag.compiled_dag_node import _shutdown_all_compiled_dags _shutdown_all_compiled_dags() + global_worker.shutdown_gpu_object_manager() if _exiting_interpreter and global_worker.mode == SCRIPT_MODE: # This is a duration to sleep before shutting down everything in order diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index dc1fe116b290..6d51206bed3c 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -94,6 +94,8 @@ def __init__(self): 
self._unmonitored_transfers: Queue[TransferMetadata] = Queue() # Background thread to poll on the transfer operation. self._monitor_failures_thread = None + # Event to signal the monitor_failures thread to shutdown + self._monitor_failures_shutdown_event = threading.Event() @property def gpu_object_store(self) -> "ray.experimental.GPUObjectStore": @@ -106,6 +108,16 @@ def gpu_object_store(self) -> "ray.experimental.GPUObjectStore": self._gpu_object_store = GPUObjectStore() return self._gpu_object_store + def shutdown(self): + """ + Interrupt and join the monitor_failures thread. + """ + if self._monitor_failures_thread: + self._monitor_failures_shutdown_event.set() + self._monitor_failures_thread.join() + self._monitor_failures_shutdown_event.clear() + self._monitor_failures_thread = None + def _monitor_failures(self): """ Monitor the refs from send and recv tasks and abort the transfers @@ -114,7 +126,7 @@ def _monitor_failures(self): not_done = [] done = [] ref_info_map = {} - while True: + while not self._monitor_failures_shutdown_event.is_set(): while not self._unmonitored_transfers.empty(): ref_info = self._unmonitored_transfers.get() if ref_info.send_ref: @@ -122,7 +134,6 @@ def _monitor_failures(self): ref_info_map[ref_info.send_ref.hex()] = ref_info not_done.append(ref_info.recv_ref) ref_info_map[ref_info.recv_ref.hex()] = ref_info - if len(not_done) > 0: done, not_done = ray.wait(not_done, num_returns=1, timeout=1) if len(done) > 0: @@ -136,6 +147,7 @@ def _monitor_failures(self): # so can just check the timeout of the first while ( len(not_done) > 0 + and not_done[0].hex() in ref_info_map and ref_info_map[not_done[0].hex()].timeout < time.time() ): self._abort_transport( @@ -145,8 +157,7 @@ def _monitor_failures(self): f"RDT transfer failed after {ray.constants.FETCH_FAIL_TIMEOUT_SECONDS}s." 
), ) - - time.sleep(1) + self._monitor_failures_shutdown_event.wait(1) def _abort_transport( self, diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index 059b056d2e29..16da3191afa6 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -1,4 +1,6 @@ +import logging import random +import re import sys import threading import time @@ -937,7 +939,7 @@ def test_send_actor_dies(ray_start_regular): ray.get(result_ref) -def test_recv_actor_dies(ray_start_regular): +def test_recv_actor_dies(ray_start_regular, caplog, propagate_logs): actors = [ErrorActor.remote() for _ in range(2)] create_collective_group(actors, backend="torch_gloo") @@ -951,7 +953,21 @@ def test_recv_actor_dies(ray_start_regular): with pytest.raises(ray.exceptions.ActorDiedError): ray.get(result_ref) with pytest.raises(ray.exceptions.ActorDiedError): - ray.get(actors[0].send.remote(torch.randn((100, 100)))) + ray.get(actors[0].recv.remote(1)) + + def check_logs(): + records = caplog.records + return any( + record.levelno == logging.ERROR + and re.search(r"RDT transfer with.*failed", record.message) + for record in records + ) and any( + record.levelno == logging.ERROR + and "Destroyed collective group" in record.message + for record in records + ) + + wait_for_condition(check_logs) if __name__ == "__main__": From b794b0a22379cd5d1c8dac43bd81cdbaa30dca16 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 22 Sep 2025 07:52:59 +0000 Subject: [PATCH 09/27] update loop to only wait if there's nothing imminent left Signed-off-by: dayshah --- .../gpu_object_manager/gpu_object_manager.py | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 6d51206bed3c..dd274383efff 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -143,21 +143,25 @@ def _monitor_failures(self): except Exception as e: self._abort_transport(done[0], ref_info_map, e) - # wait returns lists in the same order they were passed in, - # so can just check the timeout of the first - while ( - len(not_done) > 0 - and not_done[0].hex() in ref_info_map - and ref_info_map[not_done[0].hex()].timeout < time.time() - ): - self._abort_transport( - not_done[0], - ref_info_map, - TimeoutError( - f"RDT transfer failed after {ray.constants.FETCH_FAIL_TIMEOUT_SECONDS}s." - ), - ) - self._monitor_failures_shutdown_event.wait(1) + while len(not_done) > 0: + if not_done[0].hex() not in ref_info_map: + # The associated transfer was already aborted. + not_done.pop(0) + elif ref_info_map[not_done[0].hex()].timeout < time.time(): + self._abort_transport( + not_done[0], + ref_info_map, + TimeoutError( + f"RDT transfer failed after {ray.constants.FETCH_FAIL_TIMEOUT_SECONDS}s." + ), + ) + else: + # wait returns lists in the same order they were passed in, so if + # the timeout of first hasn't been reached, neither have the others. + break + if len(not_done) == 0: + # If we emptied out _unmonitored_transfers on this iteration, wait for a bit. 
+ self._monitor_failures_shutdown_event.wait(1) def _abort_transport( self, From e7bdf48bd0f418c6f59a51e1a1527cb51ea29e72 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 23 Sep 2025 18:14:43 +0000 Subject: [PATCH 10/27] add todo for bigger collective groups Signed-off-by: dayshah --- python/ray/experimental/gpu_object_manager/gpu_object_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 4d78370378d9..6fdd22e745e3 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -192,6 +192,7 @@ def _abort_transport( ref_info_map.pop(ref_info.send_ref.hex(), None) ref_info_map.pop(ref_info.recv_ref.hex(), None) + # TODO(#51276): Kill all actors in the collective group when we support more collective operations ray.kill(ref_info.src_actor) ray.kill(ref_info.dst_actor) From 0c58b7a89397e9c1313b43f6e38b058cb0e8d178 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 22 Sep 2025 07:41:39 +0000 Subject: [PATCH 11/27] [core][rdt] Abort NIXL and allow actor reuse on failed transfers Signed-off-by: dayshah --- python/ray/actor.py | 1 + .../collective/collective_tensor_transport.py | 10 ++ .../collective/nixl_tensor_transport.py | 11 ++ .../collective/tensor_transport_manager.py | 16 ++ .../gpu_object_manager/gpu_object_manager.py | 45 ++++-- .../gpu_object_manager/gpu_object_store.py | 94 ++++++++---- .../gpu_objects/test_gpu_objects_nixl.py | 27 ++++ python/ray/util/collective/collective.py | 142 ++++++++++-------- .../collective_group/nixl_backend.py | 88 +++++++---- 9 files changed, 297 insertions(+), 137 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index 07f808c90352..f79fa6e40649 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1173,6 +1173,7 @@ def _process_option_dict(actor_options, has_tensor_transport_methods): if _filled_options.get("concurrency_groups", None) is None: _filled_options["concurrency_groups"] = {} _filled_options["concurrency_groups"]["_ray_system"] = 1 + _filled_options["concurrency_groups"]["_ray_system_error"] = 1 return _filled_options diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py index 499cf55cb1d1..18cc55f30253 100644 --- a/python/ray/experimental/collective/collective_tensor_transport.py +++ b/python/ray/experimental/collective/collective_tensor_transport.py @@ -136,6 +136,7 @@ def get_communicator_metadata( @staticmethod def recv_multiple_tensors( tensors, + obj_id: str, tensor_transport_metadata: CollectiveTransportMetadata, communicator_metadata: CollectiveCommunicatorMetadata, ): @@ -176,3 +177,12 @@ def send_multiple_tensors( communicator_metadata.dst_rank, communicator_metadata.communicator_name, ) + + @staticmethod + def abort_transport( + obj_id: str, + communicator_metadata: CollectiveCommunicatorMetadata, + ): + raise NotImplementedError( + "Collective transport does not support abort_transport for now." 
+ ) diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py index 28a6f2519df4..5e734ae31c74 100644 --- a/python/ray/experimental/collective/nixl_tensor_transport.py +++ b/python/ray/experimental/collective/nixl_tensor_transport.py @@ -119,6 +119,7 @@ def get_communicator_metadata( @staticmethod def recv_multiple_tensors( tensors, + obj_id: str, tensor_transport_metadata: NixlTransportMetadata, communicator_metadata: NixlCommunicatorMetadata, ): @@ -137,6 +138,7 @@ def recv_multiple_tensors( g.recv( tensors, + obj_id, tensor_transport_metadata.nixl_serialized_descs, tensor_transport_metadata.nixl_agent_meta, ) @@ -150,3 +152,12 @@ def send_multiple_tensors( raise NotImplementedError( "NIXL transport does not support send_multiple_tensors, since it is a one-sided transport." ) + + @staticmethod + def abort_transport( + obj_id: str, + communicator_metadata: NixlCommunicatorMetadata, + ): + g = get_group_handle(communicator_metadata.communicator_name) + if g: + g.abort(obj_id) diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 47b1e805161e..76396bc9cc90 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -100,6 +100,7 @@ def get_communicator_metadata( @abstractmethod def recv_multiple_tensors( tensors: List["torch.Tensor"], + obj_id: str, tensor_transport_metadata: TensorTransportMetadata, communicator_metadata: CommunicatorMetadata, ): @@ -108,6 +109,7 @@ def recv_multiple_tensors( Args: tensors: The pre-allocated tensor space to receive the tensors. + obj_id: The object ID for related GPU object. tensor_transport_metadata: The tensor transport metadata for the GPU object. communicator_metadata: The communicator metadata for the send/recv operation. @@ -126,3 +128,17 @@ def send_multiple_tensors( tensors: The tensors to send. communicator_metadata: The communicator metadata for the send/recv operation. """ + + @staticmethod + @abstractmethod + def abort_transport( + obj_id: str, + communicator_metadata: CommunicatorMetadata, + ): + """ + Abort the transport. + + Args: + obj_id: The object ID for related GPU object. + communicator_metadata: The communicator metadata for the send/recv operation. + """ diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 6fdd22e745e3..e6415e743547 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -47,6 +47,7 @@ class TransferMetadata(NamedTuple): recv_ref: ObjectRef communicator_meta: "CommunicatorMetadata" backend: str + obj_id: str timeout: float @@ -174,28 +175,45 @@ def _abort_transport( collective group if necessary. 
""" from ray.experimental.collective import destroy_collective_group - from ray.util.collective.types import CollectiveCommunicatorMetadata + from ray.experimental.gpu_object_manager.gpu_object_store import ( + __ray_abort_transport__, + ) + from ray.util.collective.types import Backend, CollectiveCommunicatorMetadata ref_info = ref_info_map.pop(failed_ref.hex(), None) if ref_info is None: return + if ref_info.send_ref: + ref_info_map.pop(ref_info.send_ref.hex(), None) + ref_info_map.pop(ref_info.recv_ref.hex(), None) + + error_string = "" + if ref_info.backend == Backend.NIXL: + # Only need to abort on receiver side for NIXL since it's one-sided. + ref_info.dst_actor.__ray_call__.options( + concurrency_group="_ray_system_error" + ).remote( + __ray_abort_transport__, + ref_info.obj_id, + ref_info.communicator_meta, + ) + error_string = "Aborting the transfer." + else: + # TODO(#51276): Kill all actors in the collective group when we support more collective operations + ray.kill(ref_info.src_actor) + ray.kill(ref_info.dst_actor) + error_string = "Killing the actors." + logger.error( - "RDT transfer with src actor %s and dst actor %s failed. Killing the actors. " + "RDT transfer with src actor %s and dst actor %s failed. %s " "Transfer failed with exception: %s", ref_info.src_actor, ref_info.dst_actor, + error_string, exception, ) - if ref_info.send_ref: - ref_info_map.pop(ref_info.send_ref.hex(), None) - ref_info_map.pop(ref_info.recv_ref.hex(), None) - - # TODO(#51276): Kill all actors in the collective group when we support more collective operations - ray.kill(ref_info.src_actor) - ray.kill(ref_info.dst_actor) - # isinstance does an implicit cast and makes communicator_name inaccessible # so we have to get communicator_name before the cast. collective_group_name = ref_info.communicator_meta.communicator_name @@ -330,7 +348,7 @@ def fetch_object( __ray_fetch_gpu_object__, obj_id ) ) - self.gpu_object_store.add_object(obj_id, tensors) + self.gpu_object_store.add_object(obj_id, tensors, is_primary=False) else: if isinstance(gpu_object_meta.tensor_transport_meta, ObjectRef): # If the tensor transport meta is an ObjectRef, gpu object manager @@ -453,7 +471,7 @@ def trigger_out_of_band_tensor_transfer( ).remote( __ray_send__, obj_id, - tensor_transport_meta, + [tensor_transport_meta], communicator_meta, ) @@ -468,7 +486,7 @@ def trigger_out_of_band_tensor_transfer( ).remote( __ray_recv__, obj_id, - tensor_transport_meta, + [tensor_transport_meta], communicator_meta, ) @@ -480,6 +498,7 @@ def trigger_out_of_band_tensor_transfer( recv_ref=recv_ref, communicator_meta=communicator_meta, backend=gpu_object_meta.tensor_transport_backend, + obj_id=obj_id, timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS, ) ) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 3bd9f532ad1d..3cf467fd9569 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -1,10 +1,12 @@ import threading from collections import defaultdict, deque from dataclasses import dataclass -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional, Set, Union +import ray import ray.util.collective as collective from ray._private.custom_types import TensorTransportEnum +from ray._raylet import ObjectRef from ray.experimental.collective import get_tensor_transport_manager from ray.experimental.collective.util import 
device_match_transport from ray.util.collective.types import ( @@ -42,12 +44,14 @@ def _tensor_transport_to_collective_backend( def __ray_send__( self, obj_id: str, - tensor_transport_meta: TensorTransportMetadata, + tensor_transport_meta: List[ObjectRef], communicator_meta: CommunicatorMetadata, ): """Helper function that runs on the src actor to send tensors to the dst actor.""" from ray._private.worker import global_worker + tensor_transport_meta: TensorTransportMetadata = ray.get(tensor_transport_meta[0]) + gpu_object_store = global_worker.gpu_object_manager._gpu_object_store assert gpu_object_store.has_object( obj_id @@ -72,36 +76,54 @@ def __ray_send__( def __ray_recv__( self, obj_id: str, - tensor_transport_meta: TensorTransportMetadata, + tensor_transport_meta: List[ObjectRef], communicator_meta: CommunicatorMetadata, ): """Helper function that runs on the dst actor to receive tensors from the src actor.""" from ray._private.worker import global_worker - backend = collective.get_group_handle(communicator_meta.communicator_name).backend() + gpu_object_store = global_worker.gpu_object_manager.gpu_object_store + try: + tensor_transport_meta: TensorTransportMetadata = ray.get( + tensor_transport_meta[0] + ) + device = tensor_transport_meta.tensor_device + tensor_meta = tensor_transport_meta.tensor_meta - device = tensor_transport_meta.tensor_device - tensor_meta = tensor_transport_meta.tensor_meta + backend = collective.get_group_handle( + communicator_meta.communicator_name + ).backend() - gpu_object_store = global_worker.gpu_object_manager.gpu_object_store - if tensor_meta and not device_match_transport(device, backend): - raise ValueError( - f"Tensor transport backend {backend} does not support tensor transfer on device {device}." + if tensor_meta and not device_match_transport(device, backend): + raise ValueError( + f"Tensor transport backend {backend} does not support tensor transfer on device {device}." + ) + + tensors = [] + for meta in tensor_meta: + shape, dtype = meta + tensor = torch.zeros(shape, dtype=dtype, device=device) + tensors.append(tensor) + + tensor_transport_manager = get_tensor_transport_manager(backend) + tensor_transport_manager.recv_multiple_tensors( + tensors, + obj_id, + tensor_transport_meta, + communicator_meta, ) - tensors = [] - for meta in tensor_meta: - shape, dtype = meta - tensor = torch.zeros(shape, dtype=dtype, device=device) - tensors.append(tensor) + gpu_object_store.add_object(obj_id, tensors, is_primary=False) + except Exception as e: + # Store the error as a gpu object if the recv fails, + # so waiters will raise the error. + gpu_object_store.add_object(obj_id, e, is_primary=False) - tensor_transport_manager = get_tensor_transport_manager(backend) - tensor_transport_manager.recv_multiple_tensors( - tensors, - tensor_transport_meta, - communicator_meta, - ) - gpu_object_store.add_object(obj_id, tensors) +def __ray_abort_transport__(self, obj_id: str, communicator_meta: CommunicatorMetadata): + """Helper function that can run on an actor doing a send or recv to abort the transport.""" + backend = collective.get_group_handle(communicator_meta.communicator_name).backend() + tensor_transport_manager = get_tensor_transport_manager(backend) + tensor_transport_manager.abort_transport(obj_id, communicator_meta) def __ray_fetch_gpu_object__(self, obj_id: str): @@ -122,6 +144,8 @@ class _GPUObject: data: List["torch.Tensor"] # Whether the GPU object is the primary copy. is_primary: bool + # If a recv failed, we store the error here. 
+ error: Optional[Exception] = None class GPUObjectStore: @@ -163,13 +187,15 @@ def has_tensor(self, tensor: "torch.Tensor") -> bool: def get_object(self, obj_id: str) -> Optional[List["torch.Tensor"]]: with self._lock: + if self._gpu_object_store[obj_id][0].error: + raise self._gpu_object_store[obj_id][0].error return self._gpu_object_store[obj_id][0].data def add_object( self, obj_id: str, - gpu_object: List["torch.Tensor"], - is_primary: bool = False, + gpu_object: Union[List["torch.Tensor"], Exception], + is_primary: bool, ): """ Add a GPU object to the GPU object store. @@ -180,15 +206,17 @@ def add_object( is_primary: Whether the GPU object is the primary copy. """ with self._object_present_cv: - for tensor in gpu_object: - self._tensor_to_object_ids[tensor].add(obj_id) - # Append to the queue instead of overwriting - self._gpu_object_store[obj_id].append( - _GPUObject( - gpu_object, - is_primary, + if isinstance(gpu_object, Exception): + self._gpu_object_store[obj_id].append( + _GPUObject([], is_primary, error=gpu_object) + ) + else: + for tensor in gpu_object: + self._tensor_to_object_ids[tensor].add(obj_id) + # Append to the queue instead of overwriting + self._gpu_object_store[obj_id].append( + _GPUObject(gpu_object, is_primary) ) - ) self._object_present_cv.notify_all() def is_primary_copy(self, obj_id: str) -> bool: @@ -265,6 +293,8 @@ def pop_object(self, obj_id: str) -> List["torch.Tensor"]: gpu_object = queue.popleft() if len(queue) == 0: del self._gpu_object_store[obj_id] + if gpu_object.error: + raise gpu_object.error for tensor in gpu_object.data: self._tensor_to_object_ids[tensor].remove(obj_id) if len(self._tensor_to_object_ids[tensor]) == 0: diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py index efee07f4c373..14b971b055a1 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py @@ -4,6 +4,7 @@ import torch import ray +from ray._common.test_utils import SignalActor @ray.remote(num_gpus=1, num_cpus=0, enable_tensor_transport=True) @@ -48,6 +49,10 @@ def gc(self): assert not gpu_manager.gpu_object_store.has_tensor(tensor) return "Success" + @ray.method(concurrency_group="_ray_system") + def block_background_thread(self, signal_actor): + ray.get(signal_actor.wait.remote()) + @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 1}], indirect=True) def test_ray_get_gpu_ref_created_by_actor_task(ray_start_regular): @@ -140,5 +145,27 @@ def test_put_gc(ray_start_regular): assert ray.get(ref) == "Success" +@pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 2}], indirect=True) +def test_nixl_abort(ray_start_regular): + actors = [GPUTestActor.remote() for _ in range(2)] + + # Trigger transfer and kill sender before the receiver starts receiving + signal_actor = SignalActor.remote() + actors[1].block_background_thread.remote(signal_actor) + ref = actors[0].echo.remote(torch.randn((100, 100)), "cuda") + result = actors[1].sum.remote(ref, "cuda") + ray.kill(actors[0]) + signal_actor.send.remote() + + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(result) + + # Try a transfer with actor[1] receiving again + new_actor = GPUTestActor.remote() + ref = new_actor.echo.remote(torch.tensor([4, 5, 6]), "cuda") + result = actors[1].sum.remote(ref, "cuda") + assert ray.get(result) == 15 + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/util/collective/collective.py 
b/python/ray/util/collective/collective.py index e06f2bb57d2d..5e1681015c25 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -2,6 +2,7 @@ import logging import os +import threading import time from typing import List @@ -82,7 +83,6 @@ class GroupManager(object): def __init__(self): self._name_group_map = {} - self._group_name_map = {} def create_collective_group( self, backend, world_size, rank, group_name, gloo_timeout @@ -132,8 +132,6 @@ def create_collective_group( raise RuntimeError(f"Unexpected backend: {backend}") self._name_group_map[group_name] = g - self._group_name_map[g] = group_name - return self._name_group_map[group_name] def is_group_exist(self, group_name): @@ -155,7 +153,6 @@ def destroy_collective_group(self, group_name): # release the collective group resource g = self._name_group_map[group_name] # clean up the dicts - del self._group_name_map[g] del self._name_group_map[group_name] # Release the communicator resources g.destroy_group() @@ -170,11 +167,16 @@ def destroy_collective_group(self, group_name): _group_mgr = GroupManager() +# This lock is used to make external calls to the _group_mgr thread-safe. +_group_mgr_lock = threading.Lock() def is_group_initialized(group_name): """Check if the group is initialized in this process by the group name.""" - return _group_mgr.is_group_exist(group_name) + global _group_mgr + global _group_mgr_lock + with _group_mgr_lock: + return _group_mgr.is_group_exist(group_name) def init_collective_group( @@ -199,19 +201,22 @@ def init_collective_group( backend = types.Backend(backend) _check_backend_availability(backend) global _group_mgr + global _group_mgr_lock + # TODO(Hao): implement a group auto-counter. if not group_name: raise ValueError("group_name '{}' needs to be a string.".format(group_name)) - if _group_mgr.is_group_exist(group_name): - raise RuntimeError("Trying to initialize a group twice.") + with _group_mgr_lock: + if _group_mgr.is_group_exist(group_name): + raise RuntimeError("Trying to initialize a group twice.") - assert world_size > 0 - assert rank >= 0 - assert rank < world_size - _group_mgr.create_collective_group( - backend, world_size, rank, group_name, gloo_timeout - ) + assert world_size > 0 + assert rank >= 0 + assert rank < world_size + _group_mgr.create_collective_group( + backend, world_size, rank, group_name, gloo_timeout + ) def create_collective_group( @@ -284,7 +289,9 @@ def destroy_collective_group(group_name: str = "default") -> None: """Destroy a collective group given its group name.""" _check_inside_actor() global _group_mgr - _group_mgr.destroy_collective_group(group_name) + global _group_mgr_lock + with _group_mgr_lock: + _group_mgr.destroy_collective_group(group_name) def get_rank(group_name: str = "default") -> int: @@ -299,10 +306,14 @@ def get_rank(group_name: str = "default") -> int: not belong to the group. """ _check_inside_actor() - if not is_group_initialized(group_name): - return -1 - g = _group_mgr.get_group_by_name(group_name) - return g.rank + + global _group_mgr + global _group_mgr_lock + with _group_mgr_lock: + if not _group_mgr.is_group_exist(group_name): + return -1 + g = _group_mgr.get_group_by_name(group_name) + return g.rank def get_collective_group_size(group_name: str = "default") -> int: @@ -316,10 +327,13 @@ def get_collective_group_size(group_name: str = "default") -> int: not exist or the process does not belong to the group. 
""" _check_inside_actor() - if not is_group_initialized(group_name): - return -1 - g = _group_mgr.get_group_by_name(group_name) - return g.world_size + global _group_mgr + global _group_mgr_lock + with _group_mgr_lock: + if not _group_mgr.is_group_exist(group_name): + return -1 + g = _group_mgr.get_group_by_name(group_name) + return g.world_size def allreduce(tensor, group_name: str = "default", op=types.ReduceOp.SUM): @@ -747,47 +761,49 @@ def get_group_handle(group_name: str = "default"): if group_name != types.NIXL_GROUP_NAME: _check_inside_actor() global _group_mgr - if not is_group_initialized(group_name): - # try loading from remote info store - try: - if group_name == types.NIXL_GROUP_NAME: - _group_mgr.create_collective_group( - types.Backend.NIXL, None, None, group_name, None - ) - else: - # if the information is stored in an Info object, - # get and create the group. - name = "info_" + group_name - mgr = ray.get_actor(name=name) - ids, world_size, rank, backend, gloo_timeout = ray.get( - mgr.get_info.remote() - ) - worker = ray._private.worker.global_worker - id_ = worker.core_worker.get_actor_id() - r = rank[ids.index(id_)] - _group_mgr.create_collective_group( - backend, world_size, r, group_name, gloo_timeout - ) - except ValueError as exc: - # check if this group is initialized using options() - if ( - "collective_group_name" in os.environ - and os.environ["collective_group_name"] == group_name - ): - rank = int(os.environ["collective_rank"]) - world_size = int(os.environ["collective_world_size"]) - backend = os.environ["collective_backend"] - gloo_timeout = os.getenv("collective_gloo_timeout", 30000) - _group_mgr.create_collective_group( - backend, world_size, rank, group_name, gloo_timeout - ) - else: - raise RuntimeError( - "The collective group '{}' is not " - "initialized in the process.".format(group_name) - ) from exc - g = _group_mgr.get_group_by_name(group_name) - return g + global _group_mgr_lock + with _group_mgr_lock: + if not _group_mgr.is_group_exist(group_name): + # try loading from remote info store + try: + if group_name == types.NIXL_GROUP_NAME: + _group_mgr.create_collective_group( + types.Backend.NIXL, None, None, group_name, None + ) + else: + # if the information is stored in an Info object, + # get and create the group. 
+ name = "info_" + group_name + mgr = ray.get_actor(name=name) + ids, world_size, rank, backend, gloo_timeout = ray.get( + mgr.get_info.remote() + ) + worker = ray._private.worker.global_worker + id_ = worker.core_worker.get_actor_id() + r = rank[ids.index(id_)] + _group_mgr.create_collective_group( + backend, world_size, r, group_name, gloo_timeout + ) + except ValueError as exc: + # check if this group is initialized using options() + if ( + "collective_group_name" in os.environ + and os.environ["collective_group_name"] == group_name + ): + rank = int(os.environ["collective_rank"]) + world_size = int(os.environ["collective_world_size"]) + backend = os.environ["collective_backend"] + gloo_timeout = os.getenv("collective_gloo_timeout", 30000) + _group_mgr.create_collective_group( + backend, world_size, rank, group_name, gloo_timeout + ) + else: + raise RuntimeError( + "The collective group '{}' is not " + "initialized in the process.".format(group_name) + ) from exc + g = _group_mgr.get_group_by_name(group_name) + return g def _check_single_tensor_input(tensor): diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index 1f8fa858d47a..70b8a63e506d 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -1,3 +1,4 @@ +import threading import time from typing import TYPE_CHECKING, List, Tuple @@ -31,6 +32,8 @@ def __init__(self): actor_id = f"RAY-DRIVER-{uuid.uuid4()}" self._nixl_agent = nixl_agent(actor_id, agent_config) + self._aborted_transfer_obj_ids = set() + self._aborted_transfer_obj_ids_lock = threading.Lock() @classmethod def backend(cls): @@ -44,6 +47,7 @@ def backend(cls): def recv( self, tensors: List["torch.Tensor"], + obj_id: str, nixl_serialized_descs: bytes, remote_nixl_agent_meta: bytes, ): @@ -51,43 +55,65 @@ def recv( Args: tensors: List of tensors to receive into. + obj_id: The object ID for related GPU object. nixl_serialized_descs: Serialized NIXL descriptors for the remote tensors. remote_nixl_agent_meta: Metadata about the remote NIXL agent. Raises: RuntimeError: If the NIXL transfer enters an error state. """ - nixl_agent = self._nixl_agent - remote_descs = nixl_agent.deserialize_descs(nixl_serialized_descs) - local_descs = nixl_agent.register_memory(tensors) - remote_name = nixl_agent.add_remote_agent(remote_nixl_agent_meta) - - xfer_handle = nixl_agent.initialize_xfer( - # "UUID" here is just a placeholder, can be any bytes, but without it, - # nixl will fail to transfer multiple times. - "READ", - local_descs.trim(), - remote_descs, - remote_name, - "UUID", - ) - - state = nixl_agent.transfer(xfer_handle) - if state == "ERR": - raise RuntimeError("NIXL transfer got to Error state.") - # Since current nixl does not provide a better way, we need to check the state of - # the transfer continuously. 
- while True: - state = nixl_agent.check_xfer_state(xfer_handle) + with self._aborted_transfer_obj_ids_lock: + if obj_id in self._aborted_transfer_obj_ids: + self._aborted_transfer_obj_ids.remove(obj_id) + raise RuntimeError(f"NIXL transfer aborted for object id: {obj_id}") + + remote_name = None + local_descs = None + xfer_handle = None + try: + nixl_agent = self._nixl_agent + remote_descs = nixl_agent.deserialize_descs(nixl_serialized_descs) + local_descs = nixl_agent.register_memory(tensors) + remote_name = nixl_agent.add_remote_agent(remote_nixl_agent_meta) + + xfer_handle = nixl_agent.initialize_xfer( + # "UUID" here is just a placeholder, can be any bytes, but without it, + # nixl will fail to transfer multiple times. + "READ", + local_descs.trim(), + remote_descs, + remote_name, + "UUID", + ) + + state = nixl_agent.transfer(xfer_handle) if state == "ERR": raise RuntimeError("NIXL transfer got to Error state.") - if state == "PROC": - time.sleep(0.001) # Avoid busy waiting - elif state == "DONE": - break - - nixl_agent.release_xfer_handle(xfer_handle) - nixl_agent.deregister_memory(local_descs) + # Since current nixl does not provide a better way, we need to check the state of + # the transfer continuously. + while True: + state = nixl_agent.check_xfer_state(xfer_handle) + if state == "ERR": + raise RuntimeError("NIXL transfer got to Error state.") + if state == "PROC": + with self._aborted_transfer_obj_ids_lock: + if obj_id in self._aborted_transfer_obj_ids: + self._aborted_transfer_obj_ids.remove(obj_id) + raise RuntimeError( + f"NIXL transfer aborted for object id: {obj_id}" + ) + time.sleep(0.001) # Avoid busy waiting + elif state == "DONE": + break + finally: + # We could raise errors or NIXL could raise errors like NIXL_ERR_REMOTE_DISCONNECT, + # so doing best effort cleanup. + if xfer_handle: + nixl_agent.release_xfer_handle(xfer_handle) + if local_descs: + nixl_agent.deregister_memory(local_descs) + if remote_name: + nixl_agent.remove_remote_agent(remote_name) def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes]: """Get NIXL metadata for a set of tensors. 
@@ -107,3 +133,7 @@ def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes nixl_agent.get_serialized_descs(xfer_descs), nixl_agent.get_agent_metadata(), ) + + def abort(self, obj_id: str): + with self._aborted_transfer_obj_ids_lock: + self._aborted_transfer_obj_ids.add(obj_id) From 0232a4f6e6bc03ae03dd297da4fa7047aab9a3ea Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 22 Sep 2025 08:07:47 +0000 Subject: [PATCH 12/27] non-obj ref tensor transport Signed-off-by: dayshah --- .../experimental/gpu_object_manager/gpu_object_manager.py | 2 +- .../experimental/gpu_object_manager/gpu_object_store.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index e6415e743547..e1eedc880554 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -370,7 +370,7 @@ def fetch_object( None, None, tensor_transport_backend ) __ray_recv__( - None, obj_id, gpu_object_meta.tensor_transport_meta, communicator_meta + None, obj_id, [gpu_object_meta.tensor_transport_meta], communicator_meta ) def trigger_out_of_band_tensor_transfer( diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 3cf467fd9569..58c0dc867646 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -76,7 +76,7 @@ def __ray_send__( def __ray_recv__( self, obj_id: str, - tensor_transport_meta: List[ObjectRef], + tensor_transport_meta: List[Union[ObjectRef, TensorTransportMetadata]], communicator_meta: CommunicatorMetadata, ): """Helper function that runs on the dst actor to receive tensors from the src actor.""" @@ -84,8 +84,10 @@ def __ray_recv__( gpu_object_store = global_worker.gpu_object_manager.gpu_object_store try: - tensor_transport_meta: TensorTransportMetadata = ray.get( - tensor_transport_meta[0] + tensor_transport_meta: TensorTransportMetadata = ( + ray.get(tensor_transport_meta[0]) + if isinstance(tensor_transport_meta[0], ObjectRef) + else tensor_transport_meta[0] ) device = tensor_transport_meta.tensor_device tensor_meta = tensor_transport_meta.tensor_meta From 629272ece7a69a94830bbd9c58a74a76bf1d3a39 Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 22 Sep 2025 22:24:33 +0000 Subject: [PATCH 13/27] always try to discard from aborted set Signed-off-by: dayshah --- python/ray/util/collective/collective_group/nixl_backend.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index 70b8a63e506d..4051cbdd5d55 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -114,6 +114,8 @@ def recv( nixl_agent.deregister_memory(local_descs) if remote_name: nixl_agent.remove_remote_agent(remote_name) + with self._aborted_transfer_obj_ids_lock: + self._aborted_transfer_obj_ids.discard(obj_id) def get_nixl_metadata(self, tensors: List["torch.Tensor"]) -> Tuple[bytes, bytes]: """Get NIXL metadata for a set of tensors. 
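
The abort support added in the patches above reduces to a cooperative-cancellation
pattern: abort(obj_id) only records the object ID in a lock-protected set from
another thread, while the receive loop re-checks that set on every polling
iteration and always discards the marker in a finally block so the ID can be
reused later. A minimal, self-contained sketch of that pattern, using
illustrative names (AbortableReceiver, poll_state) that are not Ray or NIXL
APIs::

    import threading
    import time


    class AbortableReceiver:
        """Sketch: recv() polls a transfer while honoring out-of-band aborts."""

        def __init__(self):
            self._aborted_ids = set()
            self._lock = threading.Lock()

        def abort(self, obj_id: str) -> None:
            # Runs on a separate thread (e.g. an error-handling concurrency
            # group); it only records the ID, the polling loop does the bail-out.
            with self._lock:
                self._aborted_ids.add(obj_id)

        def recv(self, obj_id: str, poll_state) -> None:
            try:
                while True:
                    with self._lock:
                        if obj_id in self._aborted_ids:
                            raise RuntimeError(
                                f"transfer aborted for object id: {obj_id}"
                            )
                    state = poll_state()
                    if state == "ERR":
                        raise RuntimeError("transfer entered error state")
                    if state == "DONE":
                        return
                    time.sleep(0.001)  # avoid busy waiting
            finally:
                # Best-effort cleanup: always drop the abort marker so the same
                # object ID can be transferred again later.
                with self._lock:
                    self._aborted_ids.discard(obj_id)


    if __name__ == "__main__":
        receiver = AbortableReceiver()
        # Abort from another thread while recv() is still polling.
        threading.Timer(0.05, receiver.abort, args=("obj-1",)).start()
        try:
            receiver.recv("obj-1", poll_state=lambda: "PROC")
        except RuntimeError as exc:
            print(exc)
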
From 34b571b028224ca8e5851cfd51c900e78105a59d Mon Sep 17 00:00:00 2001 From: dayshah Date: Mon, 22 Sep 2025 23:05:56 +0000 Subject: [PATCH 14/27] up Signed-off-by: dayshah --- python/ray/experimental/collective/nixl_tensor_transport.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py index 5e734ae31c74..eaf05a9d2b60 100644 --- a/python/ray/experimental/collective/nixl_tensor_transport.py +++ b/python/ray/experimental/collective/nixl_tensor_transport.py @@ -158,6 +158,8 @@ def abort_transport( obj_id: str, communicator_metadata: NixlCommunicatorMetadata, ): + from ray.util.collective.collective import get_group_handle + g = get_group_handle(communicator_metadata.communicator_name) if g: g.abort(obj_id) From 87f16adb3d1dba4b2a70c8e2dcaf4ed787d722d3 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 23 Sep 2025 22:15:05 +0000 Subject: [PATCH 15/27] up Signed-off-by: dayshah --- .../experimental/gpu_object_manager/gpu_object_store.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 58c0dc867646..3ab34de98f24 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -44,13 +44,17 @@ def _tensor_transport_to_collective_backend( def __ray_send__( self, obj_id: str, - tensor_transport_meta: List[ObjectRef], + tensor_transport_meta: List[Union[ObjectRef, TensorTransportMetadata]], communicator_meta: CommunicatorMetadata, ): """Helper function that runs on the src actor to send tensors to the dst actor.""" from ray._private.worker import global_worker - tensor_transport_meta: TensorTransportMetadata = ray.get(tensor_transport_meta[0]) + tensor_transport_meta: TensorTransportMetadata = ( + ray.get(tensor_transport_meta[0]) + if isinstance(tensor_transport_meta[0], ObjectRef) + else tensor_transport_meta[0] + ) gpu_object_store = global_worker.gpu_object_manager._gpu_object_store assert gpu_object_store.has_object( From 7b197547a448c8feb0a4e7e97f706ac016dc0651 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 25 Sep 2025 07:04:53 +0000 Subject: [PATCH 16/27] address comments Signed-off-by: dayshah --- .../collective/collective_tensor_transport.py | 4 ++ .../collective/nixl_tensor_transport.py | 4 ++ .../collective/tensor_transport_manager.py | 9 ++++ .../gpu_object_manager/gpu_object_manager.py | 48 ++++++++++++------- .../gpu_object_manager/gpu_object_store.py | 8 +--- 5 files changed, 49 insertions(+), 24 deletions(-) diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py index 18cc55f30253..3c9f5de3f60f 100644 --- a/python/ray/experimental/collective/collective_tensor_transport.py +++ b/python/ray/experimental/collective/collective_tensor_transport.py @@ -26,6 +26,10 @@ def tensor_transport_backend(self) -> Backend: def is_one_sided() -> bool: return False + @staticmethod + def can_abort_transport() -> bool: + return False + def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool: from ray.experimental.collective import get_collective_groups diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py index eaf05a9d2b60..56c90199655c 100644 --- 
a/python/ray/experimental/collective/nixl_tensor_transport.py +++ b/python/ray/experimental/collective/nixl_tensor_transport.py @@ -24,6 +24,10 @@ def tensor_transport_backend(self) -> Backend: def is_one_sided() -> bool: return True + @staticmethod + def can_abort_transport() -> bool: + return True + def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool: def __ray_actor_has_tensor_transport__( self: "ray.actor.ActorHandle", diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 76396bc9cc90..bb854b5baa4c 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -31,6 +31,15 @@ def is_one_sided() -> bool: bool: True if the backend is one-sided, False otherwise. """ + @abstractmethod + @abstractmethod + def can_abort_transport(self) -> bool: + """Whether the backend can abort the transport. + + Returns: + bool: True if the backend can abort the transport. + """ + @abstractmethod def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool: """Whether the actor has the tensor transport available. diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index 46206d9091d9..ea0dd1d4fe44 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -174,11 +174,14 @@ def _abort_transport( Cleans up the ref_info_map, kill the src and dst actors, and destroy the collective group if necessary. """ - from ray.experimental.collective import destroy_collective_group + from ray.experimental.collective import ( + destroy_collective_group, + get_tensor_transport_manager, + ) from ray.experimental.gpu_object_manager.gpu_object_store import ( __ray_abort_transport__, ) - from ray.util.collective.types import Backend, CollectiveCommunicatorMetadata + from ray.util.collective.types import CollectiveCommunicatorMetadata ref_info = ref_info_map.pop(failed_ref.hex(), None) if ref_info is None: @@ -188,9 +191,18 @@ def _abort_transport( ref_info_map.pop(ref_info.send_ref.hex(), None) ref_info_map.pop(ref_info.recv_ref.hex(), None) - error_string = "" - if ref_info.backend == Backend.NIXL: - # Only need to abort on receiver side for NIXL since it's one-sided. + tensor_transport_manager = get_tensor_transport_manager(ref_info.backend) + if tensor_transport_manager.can_abort_transport(): + if not tensor_transport_manager.is_one_sided(): + # This is dead code until we implement a NCCL abort since NIXL + # is the only abortable transport for now and is one-sided. + ref_info.src_actor.__ray_call__.options( + concurrency_group="_ray_system_error" + ).remote( + __ray_abort_transport__, + ref_info.obj_id, + ref_info.communicator_meta, + ) ref_info.dst_actor.__ray_call__.options( concurrency_group="_ray_system_error" ).remote( @@ -198,21 +210,23 @@ def _abort_transport( ref_info.obj_id, ref_info.communicator_meta, ) - error_string = "Aborting the transfer." + logger.info( + "RDT transfer with src actor %s and dst actor %s failed due to %s.", + ref_info.src_actor, + ref_info.dst_actor, + exception, + ) else: # TODO(#51276): Kill all actors in the collective group when we support more collective operations ray.kill(ref_info.src_actor) ray.kill(ref_info.dst_actor) - error_string = "Killing the actors." 
- - logger.error( - "RDT transfer with src actor %s and dst actor %s failed. %s " - "Transfer failed with exception: %s", - ref_info.src_actor, - ref_info.dst_actor, - error_string, - exception, - ) + logger.error( + "RDT transfer with src actor %s and dst actor %s failed. Killing the actors." + "Transfer failed with exception: %s", + ref_info.src_actor, + ref_info.dst_actor, + exception, + ) # isinstance does an implicit cast and makes communicator_name inaccessible # so we have to get communicator_name before the cast. @@ -471,7 +485,7 @@ def trigger_out_of_band_tensor_transfer( ).remote( __ray_send__, obj_id, - [tensor_transport_meta], + tensor_transport_meta, communicator_meta, ) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 0bec339e961a..485bf8c59ef5 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -44,18 +44,12 @@ def _tensor_transport_to_collective_backend( def __ray_send__( self, obj_id: str, - tensor_transport_meta: List[Union[ObjectRef, TensorTransportMetadata]], + tensor_transport_meta: TensorTransportMetadata, communicator_meta: CommunicatorMetadata, ): """Helper function that runs on the src actor to send tensors to the dst actor.""" from ray._private.worker import global_worker - tensor_transport_meta: TensorTransportMetadata = ( - ray.get(tensor_transport_meta[0]) - if isinstance(tensor_transport_meta[0], ObjectRef) - else tensor_transport_meta[0] - ) - gpu_object_store = global_worker.gpu_object_manager._gpu_object_store assert gpu_object_store.has_object( obj_id From 5e4aff8578560445d5475d339f9d2677c718229e Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 25 Sep 2025 07:17:51 +0000 Subject: [PATCH 17/27] update test Signed-off-by: dayshah --- python/ray/tests/gpu_objects/test_gpu_objects_nixl.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py index 14b971b055a1..c569b58f621b 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py @@ -157,9 +157,11 @@ def test_nixl_abort(ray_start_regular): ray.kill(actors[0]) signal_actor.send.remote() - with pytest.raises(ray.exceptions.RayTaskError): + with pytest.raises(ray.exceptions.RayTaskError) as excinfo: ray.get(result) + assert "ActorDiedError" in str(excinfo.value) + # Try a transfer with actor[1] receiving again new_actor = GPUTestActor.remote() ref = new_actor.echo.remote(torch.tensor([4, 5, 6]), "cuda") From dfa020a4d403a6a1b184d8988c28d44c0285204c Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 25 Sep 2025 07:30:10 +0000 Subject: [PATCH 18/27] update docs Signed-off-by: dayshah --- doc/source/ray-core/direct-transport.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index f6d4cd70b642..75a0b489e2b1 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -249,7 +249,10 @@ For collective-based tensor transports (Gloo and NCCL): * Similarly, the process that created the collective group cannot serialize and pass RDT :class:`ray.ObjectRefs ` to other Ray tasks or actors. 
Instead, the :class:`ray.ObjectRef`\s can only be passed as direct arguments to other actor tasks, and those actors must be in the same collective group. * Each actor can only be in one collective group per tensor transport at a time. * No support for :func:`ray.put `. -* If a system-level error occurs during a collective operation, the collective group will be destroyed and the actors will no longer be able to communicate via the collective group. Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. System-level errors include: +* If a system-level error occurs during a GLOO or NCCL collective operation, the collective group will be destroyed and the actors will be killed. +If the error occurs during a NIXL transfer, Ray will attempt to abort the transfer and raise an exception inside the task using the object ref. +Note that it's more likely NIXL will abort the transport before Ray does, but Ray will still propagate the NIXL error in the task using the object ref. +Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. System-level errors include: * Errors internal to the third-party transport, e.g., NCCL network errors * Actor and node failure From d569b41869c0b71e048ecb794ebc1bb970466698 Mon Sep 17 00:00:00 2001 From: dayshah Date: Thu, 25 Sep 2025 07:47:39 +0000 Subject: [PATCH 19/27] add space in log Signed-off-by: dayshah --- .../ray/experimental/gpu_object_manager/gpu_object_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py index ea0dd1d4fe44..9de8ed5f233a 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py @@ -221,7 +221,7 @@ def _abort_transport( ray.kill(ref_info.src_actor) ray.kill(ref_info.dst_actor) logger.error( - "RDT transfer with src actor %s and dst actor %s failed. Killing the actors." + "RDT transfer with src actor %s and dst actor %s failed. Killing the actors. " "Transfer failed with exception: %s", ref_info.src_actor, ref_info.dst_actor, From ad6a442cc5032e33d9d1aa47c37ba1d1fa28d7cc Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 26 Sep 2025 21:15:39 -0700 Subject: [PATCH 20/27] try doc fix Signed-off-by: dayshah --- doc/source/ray-core/direct-transport.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index 75a0b489e2b1..1ec4f8a8e33d 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -14,12 +14,12 @@ For example, passing a CUDA ``torch.Tensor`` from one Ray task to another would *Ray Direct Transport (RDT)* is a new feature that allows Ray to store and pass objects directly between Ray actors. 
This feature augments the familiar Ray :class:`ObjectRef ` API by: -- Keeping GPU data in GPU memory until a transfer is needed +- Keeping GPU data in GPU memory until a transfer is necessary - Avoiding expensive serialization and copies to and from the Ray object store - Using efficient data transports like collective communication libraries (`Gloo `__ or `NCCL `__) or point-to-point RDMA (via `NVIDIA's NIXL `__) to transfer data directly between devices, including both CPU and GPUs .. note:: - RDT is currently in **alpha**. Not all Ray Core APIs are supported yet. Future releases may introduce breaking API changes. See the :ref:`limitations ` section for more details. + RDT is currently in **alpha**. All Ray Core APIs aren't supported yet. Future releases may introduce breaking API changes. See the :ref:`limitations ` section for more details. Getting started =============== @@ -250,9 +250,9 @@ For collective-based tensor transports (Gloo and NCCL): * Each actor can only be in one collective group per tensor transport at a time. * No support for :func:`ray.put `. * If a system-level error occurs during a GLOO or NCCL collective operation, the collective group will be destroyed and the actors will be killed. -If the error occurs during a NIXL transfer, Ray will attempt to abort the transfer and raise an exception inside the task using the object ref. -Note that it's more likely NIXL will abort the transport before Ray does, but Ray will still propagate the NIXL error in the task using the object ref. -Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. System-level errors include: + If a system-level error occurs during a NIXL transfer, Ray or NIXL will abort the transfer with an exception and Ray will raise the exception in the task using the object ref. + Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. + System-level errors include: * Errors internal to the third-party transport, e.g., NCCL network errors * Actor and node failure From 74dffd6938d540776715b85bd3c120164b65e731 Mon Sep 17 00:00:00 2001 From: dayshah Date: Wed, 22 Oct 2025 00:49:44 -0700 Subject: [PATCH 21/27] comments Signed-off-by: dayshah --- doc/source/ray-core/direct-transport.rst | 18 +++++++++++++----- .../collective/tensor_transport_manager.py | 4 +++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index 6bf166a3297f..3c74e76160c9 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -17,7 +17,7 @@ This feature augments the familiar Ray :class:`ObjectRef ` API by - Using efficient data transports like collective communication libraries (`Gloo `__ or `NCCL `__) or point-to-point RDMA (via `NVIDIA's NIXL `__) to transfer data directly between devices, including both CPU and GPUs .. note:: - RDT is currently in **alpha**. All Ray Core APIs aren't supported yet. Future releases may introduce breaking API changes. See the :ref:`limitations ` section for more details. + RDT is currently in **alpha** and doesn't support all Ray Core APIs yet. Future releases may introduce breaking API changes. See the :ref:`limitations ` section for more details. 
Getting started =============== @@ -290,14 +290,22 @@ For collective-based tensor transports (Gloo and NCCL): * Similarly, the process that created the collective group cannot serialize and pass RDT :class:`ray.ObjectRefs ` to other Ray tasks or actors. Instead, the :class:`ray.ObjectRef`\s can only be passed as direct arguments to other actor tasks, and those actors must be in the same collective group. * Each actor can only be in one collective group per tensor transport at a time. * No support for :func:`ray.put `. -* If a system-level error occurs during a GLOO or NCCL collective operation, the collective group will be destroyed and the actors will be killed. - If a system-level error occurs during a NIXL transfer, Ray or NIXL will abort the transfer with an exception and Ray will raise the exception in the task using the object ref. - Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. - System-level errors include: + +System-level Error-Handling +=========================== + +* If a system-level error occurs during a GLOO or NCCL collective operation, the collective group will be destroyed and the actors will be killed to prevent any hanging. + +* If a system-level error occurs during a NIXL transfer, Ray or NIXL will abort the transfer with an exception and Ray will raise the exception in the dependent task or on the ray.get on the NIXL ref. + +* Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. + +* System-level errors include: * Errors internal to the third-party transport, e.g., NCCL network errors * Actor and node failure * Tensors returned by the user that are located on an unsupported device, e.g., a CPU tensor when using NCCL + * Ray object fetch timeouts (can be overridden by setting the ``RAY_fetch_fail_timeout_milliseconds`` environment variable) * Any unexpected system bugs diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 1b4242d402cd..98653f926fe8 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -34,7 +34,9 @@ def is_one_sided() -> bool: @abstractmethod @abstractmethod def can_abort_transport(self) -> bool: - """Whether the backend can abort the transport. + """ + Whether the backend can abort the transport. + If this returns False, then Ray will kill involved actors upon system errors to avoid hanging. Returns: bool: True if the backend can abort the transport. From 7f9f1e698e3d685b7db5e4a6d144d63f79fe8ed7 Mon Sep 17 00:00:00 2001 From: dayshah Date: Wed, 22 Oct 2025 14:24:14 -0700 Subject: [PATCH 22/27] polish doc Signed-off-by: dayshah --- doc/source/ray-core/direct-transport.rst | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index 3c74e76160c9..75c2dbbbe15d 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -291,20 +291,19 @@ For collective-based tensor transports (Gloo and NCCL): * Each actor can only be in one collective group per tensor transport at a time. * No support for :func:`ray.put `. 
-System-level Error-Handling +Error handling =========================== +* Application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. + * If a system-level error occurs during a GLOO or NCCL collective operation, the collective group will be destroyed and the actors will be killed to prevent any hanging. * If a system-level error occurs during a NIXL transfer, Ray or NIXL will abort the transfer with an exception and Ray will raise the exception in the dependent task or on the ray.get on the NIXL ref. - -* Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. * System-level errors include: - * Errors internal to the third-party transport, e.g., NCCL network errors - * Actor and node failure - * Tensors returned by the user that are located on an unsupported device, e.g., a CPU tensor when using NCCL + * Actor or node failures + * Transport errors due to tensor device / transport mismatches, e.g., a CPU tensor when using NCCL * Ray object fetch timeouts (can be overridden by setting the ``RAY_fetch_fail_timeout_milliseconds`` environment variable) * Any unexpected system bugs From 6c689c73160555aeb136c918ed27797f12f73548 Mon Sep 17 00:00:00 2001 From: dayshah Date: Sat, 25 Oct 2025 15:36:10 -0700 Subject: [PATCH 23/27] up Signed-off-by: dayshah --- doc/source/ray-core/direct-transport.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index 742f99246506..e871e8475b73 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -299,7 +299,7 @@ Due to a known issue, we currently do not support repeated transfers of tensors :end-before: __nixl_limitations_end__ Error handling -=========================== +============== * Application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. From 40906686ce7771979c96bd82ceaa98bf66f8921c Mon Sep 17 00:00:00 2001 From: dayshah Date: Sat, 25 Oct 2025 15:46:06 -0700 Subject: [PATCH 24/27] up Signed-off-by: dayshah --- .../ray/experimental/collective/tensor_transport_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/experimental/collective/tensor_transport_manager.py b/python/ray/experimental/collective/tensor_transport_manager.py index 98653f926fe8..5ba0434346ae 100644 --- a/python/ray/experimental/collective/tensor_transport_manager.py +++ b/python/ray/experimental/collective/tensor_transport_manager.py @@ -31,9 +31,9 @@ def is_one_sided() -> bool: bool: True if the backend is one-sided, False otherwise. """ + @staticmethod @abstractmethod - @abstractmethod - def can_abort_transport(self) -> bool: + def can_abort_transport() -> bool: """ Whether the backend can abort the transport. If this returns False, then Ray will kill involved actors upon system errors to avoid hanging. 
From 3e9d8c9cd85a358e14f2a9e9fa0a09bcf558a122 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 11 Nov 2025 19:06:40 +0000 Subject: [PATCH 25/27] up Signed-off-by: dayshah --- .../doc_code/direct_transport_nixl.py | 2 +- .../collective_group/nixl_backend.py | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index dd4cd3925524..774fcd6bab7a 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -88,6 +88,6 @@ def sum_dict(self, dict): result2 = receiver.sum_dict.remote(ref2) try: print(ray.get(result2)) -except ActorDiedError as e: +except ValueError as e: print("Error caught:", e) # __nixl_limitations_end__ diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index cd08ca1ae63b..e3ac6debad2c 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -105,17 +105,17 @@ def recv( time.sleep(0.001) # Avoid busy waiting elif state == "DONE": break - finally: - # We could raise errors or NIXL could raise errors like NIXL_ERR_REMOTE_DISCONNECT, - # so doing best effort cleanup. - with self._aborted_transfer_obj_ids_lock: - self._aborted_transfer_obj_ids.discard(obj_id) - if xfer_handle: - nixl_agent.release_xfer_handle(xfer_handle) - if remote_name: - nixl_agent.remove_remote_agent(remote_name) - if local_descs: - nixl_agent.deregister_memory(local_descs) + finally: + # We could raise errors or NIXL could raise errors like NIXL_ERR_REMOTE_DISCONNECT, + # so doing best effort cleanup. 
+ with self._aborted_transfer_obj_ids_lock: + self._aborted_transfer_obj_ids.discard(obj_id) + if xfer_handle: + nixl_agent.release_xfer_handle(xfer_handle) + if remote_name: + nixl_agent.remove_remote_agent(remote_name) + if local_descs: + nixl_agent.deregister_memory(local_descs) def get_nixl_metadata( From 2a0785638873757115cacb9a687e34340dfc86a2 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 11 Nov 2025 11:27:51 -0800 Subject: [PATCH 26/27] fix precommit Signed-off-by: dayshah --- python/ray/tests/gpu_objects/test_gpu_objects_nixl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py index 7f8e34dbf93f..f38199611390 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_nixl.py @@ -4,7 +4,7 @@ import torch import ray -from ray._common.test_utils import wait_for_condition, SignalActor +from ray._common.test_utils import SignalActor, wait_for_condition @ray.remote(num_gpus=1, num_cpus=0, enable_tensor_transport=True) From 3982e6cb92c7df8e7967be8133ccddd5ab4e32e3 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 11 Nov 2025 11:28:07 -0800 Subject: [PATCH 27/27] fix precommit Signed-off-by: dayshah --- python/ray/util/collective/collective_group/nixl_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/util/collective/collective_group/nixl_backend.py b/python/ray/util/collective/collective_group/nixl_backend.py index e3ac6debad2c..beff753b055a 100644 --- a/python/ray/util/collective/collective_group/nixl_backend.py +++ b/python/ray/util/collective/collective_group/nixl_backend.py @@ -117,7 +117,6 @@ def recv( if local_descs: nixl_agent.deregister_memory(local_descs) - def get_nixl_metadata( self, tensors: List["torch.Tensor"] ) -> Tuple[Any, bytes, bytes]:
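
Taken together, the series leaves callers with two failure modes to handle:
with the collective transports (Gloo and NCCL) a system-level error kills the
actors in the group, so a dependent ray.get raises ActorDiedError, while with
NIXL the transfer is aborted and the error surfaces as a RayTaskError and the
actors remain usable. A hedged caller-side sketch, assuming only the behavior
documented above; the Worker actor and its methods are illustrative, and a real
RDT actor would opt in with enable_tensor_transport=True and a tensor
transport, as in the tests in this series::

    import ray
    import torch


    @ray.remote
    class Worker:
        # Illustrative only: a real RDT actor is declared with
        # enable_tensor_transport=True and returns tensors over a tensor
        # transport, as in the tests in this series.
        def produce(self):
            return torch.ones(2, 2)

        def consume(self, tensor):
            return tensor.sum().item()


    def get_with_rdt_error_handling(result_ref):
        """Fetch a result that may depend on an RDT transfer."""
        try:
            return ray.get(result_ref)
        except ray.exceptions.ActorDiedError:
            # Gloo/NCCL path: a system error killed the actors in the
            # collective group, so the group must be recreated with new actors.
            raise
        except ray.exceptions.RayTaskError as e:
            # NIXL path: the transfer was aborted and the error is raised in
            # the dependent task; the surviving actors can be reused.
            raise RuntimeError("RDT transfer failed") from e


    if __name__ == "__main__":
        ray.init()
        sender, receiver = Worker.remote(), Worker.remote()
        ref = sender.produce.remote()
        print(get_with_rdt_error_handling(receiver.consume.remote(ref)))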