Merged

38 commits
45ff234
error handling with actor deaths
dayshah Sep 15, 2025
a4aaf51
up
dayshah Sep 15, 2025
3be637b
Fixes + nccl+nixl abort
dayshah Sep 16, 2025
d587a12
up
dayshah Sep 16, 2025
3aaf712
fix communicator name
dayshah Sep 16, 2025
82650fd
kill nccl nixl abort + address comments
dayshah Sep 17, 2025
6033f31
Merge branch 'master' into rdt-system-exc
dayshah Sep 18, 2025
95b9ff0
no background concurrency group
dayshah Sep 18, 2025
b31e1ed
join monitor thread and check for log
dayshah Sep 21, 2025
b794b0a
update loop to only wait if there's nothing imminent left
dayshah Sep 22, 2025
888fe7d
Merge branch 'master' into rdt-system-exc
dayshah Sep 22, 2025
1b7f3a9
Merge branch 'master' into rdt-system-exc
dayshah Sep 22, 2025
e7bdf48
add todo for bigger collective groups
dayshah Sep 23, 2025
e3036d0
Merge branch 'master' into rdt-system-exc
dayshah Sep 23, 2025
0c58b7a
[core][rdt] Abort NIXL and allow actor reuse on failed transfers
dayshah Sep 22, 2025
0232a4f
non-obj ref tensor transport
dayshah Sep 22, 2025
629272e
always try to discard from aborted set
dayshah Sep 22, 2025
34b571b
up
dayshah Sep 22, 2025
87f16ad
up
dayshah Sep 23, 2025
4e03bb4
Merge branch 'master' into nixl-abort
dayshah Sep 25, 2025
c78e268
Merge branch 'master' into nixl-abort
dayshah Sep 25, 2025
7b19754
address comments
dayshah Sep 25, 2025
5e4aff8
update test
dayshah Sep 25, 2025
dfa020a
update docs
dayshah Sep 25, 2025
d569b41
add space in log
dayshah Sep 25, 2025
ad6a442
try doc fix
dayshah Sep 27, 2025
469d024
Merge branch 'master' into nixl-abort
dayshah Sep 28, 2025
f97cd95
Merge branch 'master' into nixl-abort
dayshah Oct 16, 2025
4860c73
Merge branch 'master' into nixl-abort
dayshah Oct 22, 2025
74dffd6
comments
dayshah Oct 22, 2025
7f9f1e6
polish doc
dayshah Oct 22, 2025
0d24723
Merge branch 'master' into nixl-abort
dayshah Oct 25, 2025
6c689c7
up
dayshah Oct 25, 2025
4090668
up
dayshah Oct 25, 2025
0049441
Merge branch 'master' into nixl-abort
dayshah Nov 11, 2025
3e9d8c9
up
dayshah Nov 11, 2025
2a07856
fix precommit
dayshah Nov 11, 2025
3982e6c
fix precommit
dayshah Nov 11, 2025
27 changes: 19 additions & 8 deletions doc/source/ray-core/direct-transport.rst
@@ -12,12 +12,12 @@ For example, passing a CUDA ``torch.Tensor`` from one Ray task to another would
*Ray Direct Transport (RDT)* is a new feature that allows Ray to store and pass objects directly between Ray actors.
This feature augments the familiar Ray :class:`ObjectRef <ray.ObjectRef>` API by:

- Keeping GPU data in GPU memory until a transfer is needed
- Keeping GPU data in GPU memory until a transfer is necessary
- Avoiding expensive serialization and copies to and from the Ray object store
- Using efficient data transports like collective communication libraries (`Gloo <https://github.com/pytorch/gloo>`__ or `NCCL <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html>`__) or point-to-point RDMA (via `NVIDIA's NIXL <https://github.com/ai-dynamo/nixl>`__) to transfer data directly between devices, including both CPU and GPUs

.. note::
RDT is currently in **alpha**. Not all Ray Core APIs are supported yet. Future releases may introduce breaking API changes. See the :ref:`limitations <limitations>` section for more details.
RDT is currently in **alpha** and doesn't support all Ray Core APIs yet. Future releases may introduce breaking API changes. See the :ref:`limitations <limitations>` section for more details.

Getting started
===============
@@ -290,12 +290,6 @@ For collective-based tensor transports (Gloo and NCCL):
* Similarly, the process that created the collective group cannot serialize and pass RDT :class:`ray.ObjectRefs <ray.ObjectRef>` to other Ray tasks or actors. Instead, the :class:`ray.ObjectRef`\s can only be passed as direct arguments to other actor tasks, and those actors must be in the same collective group.
* Each actor can only be in one collective group per tensor transport at a time.
* No support for :func:`ray.put <ray.put>`.
* If a system-level error occurs during a collective operation, the collective group will be destroyed and the actors will no longer be able to communicate via the collective group. Note that application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects. System-level errors include:

* Errors internal to the third-party transport, e.g., NCCL network errors
* Actor and node failure
* Tensors returned by the user that are located on an unsupported device, e.g., a CPU tensor when using NCCL
* Any unexpected system bugs


Due to a known issue, for NIXL, we currently do not support storing different GPU objects at the same actor, where the objects contain an overlapping but not equal set of tensors. To support this pattern, ensure that the first `ObjectRef` has gone out of scope before storing the same tensor(s) again in a second object.
@@ -305,6 +299,23 @@ Due to a known issue, for NIXL, we currently do not support storing different GP
:start-after: __nixl_limitations_start__
:end-before: __nixl_limitations_end__

Error handling
==============

* Application-level errors, i.e., exceptions raised by user code, don't destroy the collective group. Instead, Ray propagates them to any dependent task(s), just as for non-RDT Ray objects.

* If a system-level error occurs during a Gloo or NCCL collective operation, Ray destroys the collective group and kills the participating actors to prevent hangs.

* If a system-level error occurs during a NIXL transfer, Ray or NIXL aborts the transfer with an exception, and Ray raises that exception in the dependent task or when you call ``ray.get`` on the NIXL-backed :class:`ObjectRef <ray.ObjectRef>`. See the sketch after this list.

* System-level errors include:

  * Errors internal to the third-party transport, e.g., NCCL network errors
  * Actor or node failures
  * Transport errors due to tensor device / transport mismatches, e.g., a CPU tensor when using NCCL
  * Ray object fetch timeouts (can be overridden by setting the ``RAY_fetch_fail_timeout_milliseconds`` environment variable)
  * Any unexpected system bugs
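
For illustration only, the following minimal sketch shows how a failed NIXL transfer surfaces to the caller. The ``GPUActor`` class and its methods are assumptions for this sketch rather than an excerpt from the documentation's examples; the key point is that the exception is raised by ``ray.get`` and, because NIXL transfers are abortable, the surviving actors remain usable.

.. code-block:: python

    import ray
    import torch


    @ray.remote(num_gpus=1)
    class GPUActor:
        @ray.method(tensor_transport="nixl")
        def produce(self):
            # The returned tensor stays in GPU memory until a transfer is necessary.
            return torch.ones(1000, device="cuda")

        def consume(self, tensor):
            return tensor.sum().item()


    sender, receiver = GPUActor.remote(), GPUActor.remote()
    ref = sender.produce.remote()
    result = receiver.consume.remote(ref)  # Triggers a NIXL transfer.

    try:
        print(ray.get(result))
    except Exception as e:
        # A system-level failure during the transfer is aborted and raised here
        # rather than hanging or killing the actors.
        print("Transfer failed:", e)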


Advanced: RDT Internals
=======================

2 changes: 1 addition & 1 deletion doc/source/ray-core/doc_code/direct_transport_nixl.py
@@ -88,6 +88,6 @@ def sum_dict(self, dict):
result2 = receiver.sum_dict.remote(ref2)
try:
print(ray.get(result2))
except ActorDiedError as e:
except ValueError as e:
print("Error caught:", e)
# __nixl_limitations_end__
1 change: 1 addition & 0 deletions python/ray/actor.py
@@ -1180,6 +1180,7 @@ def _process_option_dict(actor_options, has_tensor_transport_methods):
if _filled_options.get("concurrency_groups", None) is None:
_filled_options["concurrency_groups"] = {}
_filled_options["concurrency_groups"]["_ray_system"] = 1
_filled_options["concurrency_groups"]["_ray_system_error"] = 1

return _filled_options

@@ -26,6 +26,10 @@ def tensor_transport_backend(self) -> Backend:
def is_one_sided() -> bool:
return False

@staticmethod
def can_abort_transport() -> bool:
return False

def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool:
from ray.experimental.collective import get_collective_groups

@@ -137,6 +141,7 @@ def get_communicator_metadata(
@staticmethod
def recv_multiple_tensors(
tensors,
obj_id: str,
tensor_transport_metadata: CollectiveTransportMetadata,
communicator_metadata: CollectiveCommunicatorMetadata,
):
@@ -183,3 +188,12 @@ def garbage_collect(
obj_id: str, tensor_transport_meta: CollectiveTransportMetadata
):
pass

@staticmethod
def abort_transport(
obj_id: str,
communicator_metadata: CollectiveCommunicatorMetadata,
):
raise NotImplementedError(
"Collective transport does not support abort_transport for now."
)
17 changes: 17 additions & 0 deletions python/ray/experimental/collective/nixl_tensor_transport.py
@@ -24,6 +24,10 @@ def tensor_transport_backend(self) -> Backend:
def is_one_sided() -> bool:
return True

@staticmethod
def can_abort_transport() -> bool:
return True

def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool:
def __ray_actor_has_tensor_transport__(
self: "ray.actor.ActorHandle",
@@ -134,6 +138,7 @@ def get_communicator_metadata(
@staticmethod
def recv_multiple_tensors(
tensors,
obj_id: str,
tensor_transport_metadata: NixlTransportMetadata,
communicator_metadata: NixlCommunicatorMetadata,
):
@@ -152,6 +157,7 @@

g.recv(
tensors,
obj_id,
tensor_transport_metadata.nixl_serialized_descs,
tensor_transport_metadata.nixl_agent_meta,
)
@@ -178,3 +184,14 @@ def garbage_collect(obj_id: str, tensor_transport_meta: NixlTransportMetadata):
if descs is not None:
nixl_backend = get_group_handle(NIXL_GROUP_NAME)
nixl_backend.deregister_memory(descs)

@staticmethod
def abort_transport(
obj_id: str,
communicator_metadata: NixlCommunicatorMetadata,
):
from ray.util.collective.collective import get_group_handle

g = get_group_handle(communicator_metadata.communicator_name)
if g:
g.abort(obj_id)
27 changes: 27 additions & 0 deletions python/ray/experimental/collective/tensor_transport_manager.py
@@ -31,6 +31,17 @@ def is_one_sided() -> bool:
bool: True if the backend is one-sided, False otherwise.
"""

@staticmethod
@abstractmethod
def can_abort_transport() -> bool:
"""
Whether the backend can abort the transport.
If this returns False, then Ray will kill the involved actors upon system errors to avoid hangs.

Returns:
bool: True if the backend can abort the transport.
"""

@abstractmethod
def actor_has_tensor_transport(self, actor: "ray.actor.ActorHandle") -> bool:
"""Whether the actor has the tensor transport available.
@@ -102,6 +113,7 @@ def get_communicator_metadata(
@abstractmethod
def recv_multiple_tensors(
tensors: List["torch.Tensor"],
obj_id: str,
tensor_transport_metadata: TensorTransportMetadata,
communicator_metadata: CommunicatorMetadata,
):
@@ -110,6 +122,7 @@

Args:
tensors: The pre-allocated tensor space to receive the tensors.
obj_id: The object ID of the related GPU object.
tensor_transport_metadata: The tensor transport metadata for the GPU object.
communicator_metadata: The communicator metadata for the send/recv operation.

@@ -139,3 +152,17 @@ def garbage_collect(obj_id: str, tensor_transport_meta: TensorTransportMetadata)
obj_id: The ID of the GPU object to garbage collect.
tensor_transport_meta: The tensor transport metadata.
"""

@staticmethod
@abstractmethod
def abort_transport(
obj_id: str,
communicator_metadata: CommunicatorMetadata,
):
"""
Abort the transport.

Args:
obj_id: The object ID of the related GPU object.
communicator_metadata: The communicator metadata for the send/recv operation.
"""
63 changes: 48 additions & 15 deletions python/ray/experimental/gpu_object_manager/gpu_object_manager.py
@@ -47,6 +47,7 @@ class TransferMetadata(NamedTuple):
recv_ref: ObjectRef
communicator_meta: "CommunicatorMetadata"
backend: str
obj_id: str
timeout: float


@@ -179,28 +180,59 @@ def _abort_transport(
Cleans up the ref_info_map, kill the src and dst actors, and destroy the
collective group if necessary.
"""
from ray.experimental.collective import destroy_collective_group
from ray.experimental.collective import (
destroy_collective_group,
get_tensor_transport_manager,
)
from ray.experimental.gpu_object_manager.gpu_object_store import (
__ray_abort_transport__,
)
from ray.util.collective.types import CollectiveCommunicatorMetadata

ref_info = ref_info_map.pop(failed_ref.hex(), None)
if ref_info is None:
return

logger.error(
"RDT transfer with src actor %s and dst actor %s failed. Killing the actors. "
"Transfer failed with exception: %s",
ref_info.src_actor,
ref_info.dst_actor,
exception,
)

if ref_info.send_ref:
ref_info_map.pop(ref_info.send_ref.hex(), None)
ref_info_map.pop(ref_info.recv_ref.hex(), None)

# TODO(#51276): Kill all actors in the collective group when we support more collective operations
ray.kill(ref_info.src_actor)
ray.kill(ref_info.dst_actor)
tensor_transport_manager = get_tensor_transport_manager(ref_info.backend)
if tensor_transport_manager.can_abort_transport():
if not tensor_transport_manager.is_one_sided():
# This is dead code until we implement a NCCL abort since NIXL
# is the only abortable transport for now and is one-sided.
ref_info.src_actor.__ray_call__.options(
concurrency_group="_ray_system_error"
).remote(
__ray_abort_transport__,
ref_info.obj_id,
ref_info.communicator_meta,
)
ref_info.dst_actor.__ray_call__.options(
concurrency_group="_ray_system_error"
).remote(
__ray_abort_transport__,
ref_info.obj_id,
ref_info.communicator_meta,
)
logger.info(
"RDT transfer with src actor %s and dst actor %s failed due to %s.",
ref_info.src_actor,
ref_info.dst_actor,
exception,
)
else:
# TODO(#51276): Kill all actors in the collective group when we support more collective operations
ray.kill(ref_info.src_actor)
ray.kill(ref_info.dst_actor)
logger.error(
"RDT transfer with src actor %s and dst actor %s failed. Killing the actors. "
"Transfer failed with exception: %s",
ref_info.src_actor,
ref_info.dst_actor,
exception,
)

# isinstance does an implicit cast and makes communicator_name inaccessible
# so we have to get communicator_name before the cast.
@@ -336,7 +368,7 @@ def _fetch_object(
__ray_fetch_gpu_object__, obj_id
)
)
self.gpu_object_store.add_object(obj_id, tensors)
self.gpu_object_store.add_object(obj_id, tensors, is_primary=False)
else:
if isinstance(gpu_object_meta.tensor_transport_meta, ObjectRef):
# If the tensor transport meta is an ObjectRef, gpu object manager
@@ -358,7 +390,7 @@
None, None, tensor_transport_backend
)
__ray_recv__(
None, obj_id, gpu_object_meta.tensor_transport_meta, communicator_meta
None, obj_id, [gpu_object_meta.tensor_transport_meta], communicator_meta
)

def trigger_out_of_band_tensor_transfer(
@@ -474,7 +506,7 @@
).remote(
__ray_recv__,
obj_id,
tensor_transport_meta,
[tensor_transport_meta],
communicator_meta,
)

@@ -486,6 +518,7 @@
recv_ref=recv_ref,
communicator_meta=communicator_meta,
backend=gpu_object_meta.tensor_transport_backend,
obj_id=obj_id,
timeout=time.time() + ray_constants.FETCH_FAIL_TIMEOUT_SECONDS,
)
)
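
To make the new control flow in ``_abort_transport`` easier to follow, here is a simplified restatement. It is a sketch only, with logging and bookkeeping stripped out; the function name is invented, but the imports and calls mirror the diff above.

    import ray
    from ray.experimental.collective import get_tensor_transport_manager
    from ray.experimental.gpu_object_manager.gpu_object_store import (
        __ray_abort_transport__,
    )


    def handle_failed_transfer(ref_info):
        """Sketch of the abort-or-kill decision for a failed RDT transfer."""
        manager = get_tensor_transport_manager(ref_info.backend)
        if manager.can_abort_transport():

            def abort_on(actor):
                # Run on the dedicated "_ray_system_error" concurrency group so
                # the abort call can't get stuck behind the failed transfer itself.
                actor.__ray_call__.options(
                    concurrency_group="_ray_system_error"
                ).remote(
                    __ray_abort_transport__, ref_info.obj_id, ref_info.communicator_meta
                )

            if not manager.is_one_sided():
                # Two-sided transports would also abort the sender; today NIXL is
                # the only abortable transport and it's one-sided, so this branch
                # isn't taken.
                abort_on(ref_info.src_actor)
            abort_on(ref_info.dst_actor)
        else:
            # Non-abortable transports (Gloo/NCCL collectives): kill both actors
            # so the hung collective can't block the rest of the application.
            ray.kill(ref_info.src_actor)
            ray.kill(ref_info.dst_actor)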