diff --git a/python/ray/actor.py b/python/ray/actor.py
index b33c8c0f575d..bb81dd54b89f 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -1180,7 +1180,8 @@ def _process_option_dict(actor_options, has_tensor_transport_methods):
 
         if _filled_options.get("concurrency_groups", None) is None:
             _filled_options["concurrency_groups"] = {}
         _filled_options["concurrency_groups"]["_ray_system"] = 1
-        _filled_options["concurrency_groups"]["_ray_system_error"] = 1
+        _filled_options["concurrency_groups"]["_ray_system_rdt_metadata"] = 1
+        _filled_options["concurrency_groups"]["_ray_system_rdt_error"] = 1
 
     return _filled_options
diff --git a/python/ray/experimental/collective/collective_tensor_transport.py b/python/ray/experimental/collective/collective_tensor_transport.py
index 78f97d64c87a..290596622a99 100644
--- a/python/ray/experimental/collective/collective_tensor_transport.py
+++ b/python/ray/experimental/collective/collective_tensor_transport.py
@@ -86,9 +86,9 @@ def __ray_get_tensor_transport_metadata__(
 
         # NOTE(swang): We put this task on the background thread to avoid tasks
         # executing on the main thread blocking this task.
-        return src_actor.__ray_call__.options(concurrency_group="_ray_system").remote(
-            __ray_get_tensor_transport_metadata__, obj_id
-        )
+        return src_actor.__ray_call__.options(
+            concurrency_group="_ray_system_rdt_metadata"
+        ).remote(__ray_get_tensor_transport_metadata__, obj_id)
 
     @staticmethod
     def get_communicator_metadata(
diff --git a/python/ray/experimental/collective/nixl_tensor_transport.py b/python/ray/experimental/collective/nixl_tensor_transport.py
index 6ab5b9d03ee3..c788763a5bd4 100644
--- a/python/ray/experimental/collective/nixl_tensor_transport.py
+++ b/python/ray/experimental/collective/nixl_tensor_transport.py
@@ -118,9 +118,9 @@ def __ray_get_tensor_transport_metadata__(
 
         # NOTE(swang): We put this task on the background thread to avoid tasks
         # executing on the main thread blocking this task.
-        return src_actor.__ray_call__.options(concurrency_group="_ray_system").remote(
-            __ray_get_tensor_transport_metadata__, obj_id
-        )
+        return src_actor.__ray_call__.options(
+            concurrency_group="_ray_system_rdt_metadata"
+        ).remote(__ray_get_tensor_transport_metadata__, obj_id)
 
     @staticmethod
     def get_communicator_metadata(
diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py
index 3c2cd2526970..2f8ee2434b07 100644
--- a/python/ray/experimental/gpu_object_manager/gpu_object_manager.py
+++ b/python/ray/experimental/gpu_object_manager/gpu_object_manager.py
@@ -203,14 +203,14 @@ def _abort_transport(
             # This is dead code until we implement a NCCL abort since NIXL
             # is the only abortable transport for now and is one-sided.
             ref_info.src_actor.__ray_call__.options(
-                concurrency_group="_ray_system_error"
+                concurrency_group="_ray_system_rdt_error"
             ).remote(
                 __ray_abort_transport__,
                 ref_info.obj_id,
                 ref_info.communicator_meta,
             )
             ref_info.dst_actor.__ray_call__.options(
-                concurrency_group="_ray_system_error"
+                concurrency_group="_ray_system_rdt_error"
            ).remote(
                 __ray_abort_transport__,
                 ref_info.obj_id,
diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel
index d72c9d21d188..4f32e2ea0d9e 100644
--- a/python/ray/tests/BUILD.bazel
+++ b/python/ray/tests/BUILD.bazel
@@ -616,6 +616,7 @@ py_test_module_list(
     ],
 )
 
+# NO GPU RDT TESTS
 py_test_module_list(
     size = "large",
     files = [
@@ -633,12 +634,14 @@ py_test_module_list(
     ],
 )
 
+# GPU RDT TESTS
 py_test_module_list(
     size = "medium",
     env = {"RAY_PYTEST_USE_GPU": "1"},
     files = [
         "gpu_objects/test_gpu_objects_nccl.py",
         "gpu_objects/test_gpu_objects_nixl.py",
+        "gpu_objects/test_rdt_all_transports.py",
     ],
     tags = [
         "custom_setup",
diff --git a/python/ray/tests/gpu_objects/test_rdt_all_transports.py b/python/ray/tests/gpu_objects/test_rdt_all_transports.py
new file mode 100644
index 000000000000..5dffa55c8eb8
--- /dev/null
+++ b/python/ray/tests/gpu_objects/test_rdt_all_transports.py
@@ -0,0 +1,50 @@
+import os
+import sys
+
+import pytest
+import torch
+
+import ray
+from ray.experimental.collective import create_collective_group
+
+USE_GPU = bool(os.environ.get("RAY_PYTEST_USE_GPU", 0))
+TRANSPORTS_AND_DEVICES = (
+    [("nixl", "cuda"), ("nccl", "cuda"), ("gloo", "cpu")]
+    if USE_GPU
+    else [("gloo", "cpu")]
+)
+
+
+@ray.remote(num_cpus=0, num_gpus=1 if USE_GPU else 0, enable_tensor_transport=True)
+class AsyncActor:
+    async def send(self, data, device):
+        device_data = data.to(device)
+        return device_data
+
+    async def intermediate(self, device_data):
+        return device_data
+
+    async def recv(self, device_data):
+        return device_data
+
+
+@pytest.mark.parametrize(
+    "ray_start_regular_shared", [{"num_gpus": 4} if USE_GPU else {}], indirect=True
+)
+@pytest.mark.parametrize("transport, device", TRANSPORTS_AND_DEVICES)
+def test_rdt_async_chain(ray_start_regular_shared, transport, device):
+    actors = [AsyncActor.remote() for _ in range(3)]
+    if transport == "gloo" or transport == "nccl":
+        create_collective_group(actors, transport)
+    data = torch.randn(100, 100)
+    send_ref = actors[0].send.options(tensor_transport=transport).remote(data, device)
+    int_ref = (
+        actors[1].intermediate.options(tensor_transport=transport).remote(send_ref)
+    )
+    recv_ref = actors[2].recv.remote(int_ref)
+    data = ray.get(recv_ref)
+    assert data.device.type == device
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-sv", __file__]))