From 4cb392761d46b242d64d4b559b868ad2f6b9b486 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Tue, 3 Jan 2023 10:57:43 -0800 Subject: [PATCH] Revert "Improving message for access to unknown objects (#28209)" This reverts commit d57b582d9823c5af5c9be1d4da8bb52e00230024. --- cpp/src/ray/runtime/abstract_ray_runtime.cc | 2 +- cpp/src/ray/test/cluster/cluster_mode_test.cc | 24 ++-- .../io/ray/test/ReferenceCountingTest.java | 62 ++++----- python/ray/_raylet.pyx | 32 ++--- python/ray/includes/common.pxd | 2 - python/ray/includes/libcoreworker.pxd | 9 +- python/ray/tests/BUILD | 2 +- python/ray/tests/test_basic.py | 3 +- python/ray/tests/test_ray_shutdown.py | 93 ------------- src/ray/common/status.cc | 3 - src/ray/common/status.h | 6 - src/ray/core_worker/core_worker.cc | 126 +++--------------- src/ray/core_worker/core_worker.h | 38 +----- ...io_ray_runtime_object_NativeObjectStore.cc | 5 +- 14 files changed, 74 insertions(+), 333 deletions(-) diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 84bb4eb29509..1f263a9c01de 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -141,7 +141,7 @@ std::vector> TransformArgs( auto owner_address = ray::rpc::Address{}; if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); - owner_address = core_worker.GetOwnerAddressOrDie(id); + owner_address = core_worker.GetOwnerAddress(id); } ray_arg = absl::make_unique(id, owner_address, diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 2e6baff3b745..ccc344a2b96d 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -73,7 +73,6 @@ TEST(RayClusterModeTest, FullTest) { config.redis_password_ = password; } ray::Init(config, cmd_argc, cmd_argv); - /// put and get object auto obj = ray::Put(12345); auto get_result = *(ray::Get(obj)); @@ -89,9 +88,9 @@ TEST(RayClusterModeTest, FullTest) { EXPECT_EQ(1, task_result); /// common task with args - auto task_obj1 = ray::Task(Plus1).Remote(5); - auto task_result1 = *(ray::Get(task_obj1)); - EXPECT_EQ(6, task_result1); + task_obj = ray::Task(Plus1).Remote(5); + task_result = *(ray::Get(task_obj)); + EXPECT_EQ(6, task_result); ray::ActorHandle actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate)) .SetMaxRestarts(1) @@ -496,10 +495,6 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) { } TEST(RayClusterModeTest, NamespaceTest) { - if (ray::IsInitialized()) { - ray::Shutdown(); - } - ray::Init(); // Create a named actor in namespace `isolated_ns`. std::string actor_name_in_isolated_ns = "named_actor_in_isolated_ns"; std::string isolated_ns_name = "isolated_ns"; @@ -521,11 +516,11 @@ TEST(RayClusterModeTest, NamespaceTest) { // Create a named actor in job default namespace. std::string actor_name_in_default_ns = "actor_name_in_default_ns"; - auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate)) - .SetName(actor_name_in_default_ns) - .Remote(); - auto initialized_obj1 = actor1.Task(&Counter::Initialized).Remote(); - EXPECT_TRUE(*initialized_obj1.Get()); + actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate)) + .SetName(actor_name_in_default_ns) + .Remote(); + initialized_obj = actor.Task(&Counter::Initialized).Remote(); + EXPECT_TRUE(*initialized_obj.Get()); // It is visible to job default namespace. actor_optional = ray::GetActor(actor_name_in_default_ns); EXPECT_TRUE(actor_optional); @@ -539,9 +534,6 @@ TEST(RayClusterModeTest, GetNamespaceApiTest) { std::string ns = "test_get_current_namespace"; ray::RayConfig config; config.ray_namespace = ns; - if (ray::IsInitialized()) { - ray::Shutdown(); - } ray::Init(config, cmd_argc, cmd_argv); // Get namespace in driver. EXPECT_EQ(ray::GetNamespace(), ns); diff --git a/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java b/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java index f3c5fcb7dad6..14f53febc487 100644 --- a/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java +++ b/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java @@ -6,7 +6,6 @@ import io.ray.api.ActorHandle; import io.ray.api.ObjectRef; import io.ray.api.Ray; -import io.ray.api.exception.RayException; import io.ray.api.id.ObjectId; import io.ray.runtime.object.NativeObjectStore; import io.ray.runtime.object.ObjectRefImpl; @@ -113,14 +112,9 @@ private static void fillObjectStoreAndGet( if (succeed) { TestUtils.getRuntime().getObjectStore().getRaw(ImmutableList.of(objectId), Long.MAX_VALUE); } else { - try { - List result = - TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 0, 100, true); - Assert.fail( - "Ray did not fail when waiting for an object that does not belong in this session"); - } catch (RayException e) { - // This is the expected outcome for succeed=false, as we wait for non-existent objects. - } + List result = + TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100, true); + Assert.assertFalse(result.get(0)); } } @@ -171,7 +165,27 @@ private static void sendSignal(ActorHandle signal) { } /** Based on Python test case `test_dependency_refcounts`. */ - public void testDependencyRefCounts1() { + public void testDependencyRefCounts() { + { + // Test that regular plasma dependency refcounts are decremented once the + // task finishes. + ActorHandle signal = SignalActor.create(); + ObjectRefImpl largeDep = + (ObjectRefImpl) Ray.put(new TestUtils.LargeObject()); + ObjectRefImpl result = + (ObjectRefImpl) + Ray., Object>task( + ReferenceCountingTest::oneDep, largeDep, signal) + .remote(); + checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0); + sendSignal(signal); + // Reference count should be removed once the task finishes. + checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0); + del(largeDep); + del(result); + checkRefCounts(ImmutableMap.of()); + } + { // Test that inlined dependency refcounts are decremented once they are // inlined. @@ -193,9 +207,7 @@ public void testDependencyRefCounts1() { del(result); checkRefCounts(ImmutableMap.of()); } - } - public void testDependencyRefCounts2() { { // Test that spilled plasma dependency refcounts are decremented once // the task finishes. @@ -224,9 +236,7 @@ public void testDependencyRefCounts2() { del(result); checkRefCounts(ImmutableMap.of()); } - } - public void testDependencyRefCounts3() { { // Test that regular plasma dependency refcounts are decremented if a task // fails. @@ -247,9 +257,7 @@ public void testDependencyRefCounts3() { del(result); checkRefCounts(ImmutableMap.of()); } - } - public void testDependencyRefCounts4() { { // Test that spilled plasma dependency refcounts are decremented if a task // fails. @@ -280,28 +288,6 @@ public void testDependencyRefCounts4() { } } - public void testDependencyRefCounts5() { - { - // Test that regular plasma dependency refcounts are decremented once the - // task finishes. - ActorHandle signal = SignalActor.create(); - ObjectRefImpl largeDep = - (ObjectRefImpl) Ray.put(new TestUtils.LargeObject()); - ObjectRefImpl result = - (ObjectRefImpl) - Ray., Object>task( - ReferenceCountingTest::oneDep, largeDep, signal) - .remote(); - checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0); - sendSignal(signal); - // Reference count should be removed once the task finishes. - checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0); - del(largeDep); - del(result); - checkRefCounts(ImmutableMap.of()); - } - } - private static int fooBasicPinning(Object arg) { return 0; } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7ed8dffd4ae2..15a310687afb 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -198,10 +198,6 @@ cdef int check_status(const CRayStatus& status) nogil except -1: raise GetTimeoutError(message) elif status.IsNotFound(): raise ValueError(message) - elif status.IsObjectNotFound(): - raise ValueError(message) - elif status.IsObjectUnknownOwner(): - raise ValueError(message) else: raise RaySystemError(message) @@ -425,8 +421,6 @@ cdef prepare_args_internal( c_vector[CObjectID] inlined_ids c_string put_arg_call_site c_vector[CObjectReference] inlined_refs - CAddress c_owner_address - CRayStatus op_status worker = ray._private.worker.global_worker put_threshold = RayConfig.instance().max_direct_call_object_size() @@ -435,13 +429,11 @@ cdef prepare_args_internal( for arg in args: if isinstance(arg, ObjectRef): c_arg = (arg).native() - op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( - c_arg, &c_owner_address) - check_status(op_status) args_vector.push_back( unique_ptr[CTaskArg](new CTaskArgByReference( c_arg, - c_owner_address, + CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( + c_arg), arg.call_site()))) else: @@ -1573,9 +1565,8 @@ cdef class CoreWorker: CTaskID c_task_id = current_task_id.native() c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs) with nogil: - op_status = CCoreWorkerProcess.GetCoreWorker().Get( - c_object_ids, timeout_ms, &results) - check_status(op_status) + check_status(CCoreWorkerProcess.GetCoreWorker().Get( + c_object_ids, timeout_ms, &results)) return RayObjectsToDataMetadataPairs(results) @@ -1779,9 +1770,8 @@ cdef class CoreWorker: wait_ids = ObjectRefsToVector(object_refs) with nogil: - op_status = CCoreWorkerProcess.GetCoreWorker().Wait( - wait_ids, num_returns, timeout_ms, &results, fetch_local) - check_status(op_status) + check_status(CCoreWorkerProcess.GetCoreWorker().Wait( + wait_ids, num_returns, timeout_ms, &results, fetch_local)) assert len(results) == len(object_refs) @@ -2332,20 +2322,16 @@ cdef class CoreWorker: def get_owner_address(self, ObjectRef object_ref): cdef: CObjectID c_object_id = object_ref.native() - CAddress c_owner_address - op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( - c_object_id, &c_owner_address) - check_status(op_status) - return c_owner_address.SerializeAsString() + return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress( + c_object_id).SerializeAsString() def serialize_object_ref(self, ObjectRef object_ref): cdef: CObjectID c_object_id = object_ref.native() CAddress c_owner_address = CAddress() c_string serialized_object_status - op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo( + CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo( c_object_id, &c_owner_address, &serialized_object_status) - check_status(op_status) return (object_ref, c_owner_address.SerializeAsString(), serialized_object_status) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index c9f26db37678..3f16da71ebce 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -113,9 +113,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_bool IsTimedOut() c_bool IsInterrupted() c_bool ShouldExitWorker() - c_bool IsObjectNotFound() c_bool IsNotFound() - c_bool IsObjectUnknownOwner() c_string ToString() c_string CodeAsString() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 18db418a28ca..033039cf329b 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -177,14 +177,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void PutObjectIntoPlasma(const CRayObject &object, const CObjectID &object_id) const CAddress &GetRpcAddress() const - CRayStatus GetOwnerAddress(const CObjectID &object_id, - CAddress *owner_address) const + CAddress GetOwnerAddress(const CObjectID &object_id) const c_vector[CObjectReference] GetObjectRefs( const c_vector[CObjectID] &object_ids) const - CRayStatus GetOwnershipInfo(const CObjectID &object_id, - CAddress *owner_address, - c_string *object_status) + void GetOwnershipInfo(const CObjectID &object_id, + CAddress *owner_address, + c_string *object_status) void RegisterOwnershipInfoAndResolveFuture( const CObjectID &object_id, const CObjectID &outer_object_id, diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 4fef7a7061ae..586285e5dd94 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -58,6 +58,7 @@ py_test_module_list( "test_get_locations.py", "test_global_state.py", "test_healthcheck.py", + "test_ray_shutdown.py", "test_kill_raylet_signal_log.py", "test_memstat.py", "test_protobuf_compatibility.py" @@ -127,7 +128,6 @@ py_test_module_list( "test_placement_group_failover.py", "test_ray_init.py", "test_ray_init_2.py", - "test_ray_shutdown.py", "test_resource_demand_scheduler.py", "test_resource_metrics.py", "test_runtime_context.py", diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c696a2cbecad..bc13073c12c0 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -650,9 +650,8 @@ def h(): assert ray.get(f.remote()) == (1, 2) - -def test_recursive_remote_call(ray_start_shared_local_modes): # Test a remote function that recursively calls itself. + @ray.remote def factorial(n): if n == 0: diff --git a/python/ray/tests/test_ray_shutdown.py b/python/ray/tests/test_ray_shutdown.py index 9362d1270362..1d2d8da1c351 100644 --- a/python/ray/tests/test_ray_shutdown.py +++ b/python/ray/tests/test_ray_shutdown.py @@ -55,99 +55,6 @@ def f(): ) -@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.") -def test_ray_shutdown_then_call(short_gcs_publish_timeout, shutdown_only): - """Make sure ray will not kill cpython when using unrecognized ObjectId""" - # Set include_dashboard=False to have faster startup. - ray.init(num_cpus=1, include_dashboard=False) - - my_ref = ray.put("anystring") - - @ray.remote - def f(s): - print(s) - - ray.shutdown() - - ray.init(num_cpus=1, include_dashboard=False) - with pytest.raises(ValueError, match="Ray object whose owner is unknown"): - f.remote(my_ref) # This would cause full CPython death. - - ray.shutdown() - wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0) - - -@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.") -def test_ray_shutdown_then_call_list(short_gcs_publish_timeout, shutdown_only): - """Make sure ray will not kill cpython when using unrecognized ObjectId""" - # Set include_dashboard=False to have faster startup. - ray.init(num_cpus=1, include_dashboard=False) - - my_ref = ray.put("anystring") - - @ray.remote - def f(s): - print(s) - - ray.shutdown() - - ray.init(num_cpus=1, include_dashboard=False) - with pytest.raises(ValueError, match="Ray object whose owner is unknown"): - f.remote([my_ref]) # This would cause full CPython death. - - ray.shutdown() - wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0) - - -@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.") -def test_ray_shutdown_then_get(short_gcs_publish_timeout, shutdown_only): - """Make sure ray will not hang when trying to Get an unrecognized Obj.""" - # Set include_dashboard=False to have faster startup. - ray.init(num_cpus=1, include_dashboard=False) - - my_ref = ray.put("anystring") - - ray.shutdown() - - ray.init(num_cpus=1, include_dashboard=False) - with pytest.raises(ValueError, match="Ray objects whose owner is unknown"): - # This used to cause ray to hang indefinitely (without timeout) or - # throw a timeout exception if a timeout was provided. Now it is expected to - # throw an exception reporting the unknown object. - ray.get(my_ref, timeout=30) - - ray.shutdown() - wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0) - - -@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.") -def test_ray_shutdown_then_wait(short_gcs_publish_timeout, shutdown_only): - """Make sure ray will not hang when trying to Get an unrecognized Obj.""" - # Set include_dashboard=False to have faster startup. - ray.init(num_cpus=1, include_dashboard=False) - - my_ref = ray.put("anystring") - - ray.shutdown() - - ray.init(num_cpus=1, include_dashboard=False) - my_new_ref = ray.put("anyotherstring") - - # If we have some known and some unknown references, we allow the - # function to wait for the valid references; however, if all the - # references are unknown, we expect an error. - ready, not_ready = ray.wait([my_new_ref, my_ref]) - with pytest.raises(ValueError, match="Ray object whose owner is unknown"): - # This used to cause ray to hang indefinitely (without timeout) or - # forever return all tasks as not-ready if a timeout was provided. - # Now it is expected to throw an exception reporting if all objects are - # unknown. - ray.wait(not_ready, timeout=30) - - ray.shutdown() - wait_for_condition(lambda: len(get_all_ray_worker_processes()) == 0) - - @pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.") def test_driver_dead(short_gcs_publish_timeout, shutdown_only): """Make sure all ray workers are shutdown when driver is killed.""" diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 18d6b0ada87f..87254c614c52 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -58,7 +58,6 @@ namespace ray { #define STATUS_CODE_OBJECT_EXISTS "ObjectExists" #define STATUS_CODE_OBJECT_NOT_FOUND "ObjectNotFound" #define STATUS_CODE_OBJECT_STORE_ALREADY_SEALED "ObjectAlreadySealed" -#define STATUS_CODE_OBJECT_UNKNOWN_OWNER "ObjectUnknownOwner" #define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull" #define STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL "TransientObjectStoreFull" // grpc status @@ -108,7 +107,6 @@ std::string Status::CodeAsString() const { {StatusCode::ObjectExists, STATUS_CODE_OBJECT_EXISTS}, {StatusCode::ObjectNotFound, STATUS_CODE_OBJECT_NOT_FOUND}, {StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_STORE_ALREADY_SEALED}, - {StatusCode::ObjectUnknownOwner, STATUS_CODE_OBJECT_UNKNOWN_OWNER}, {StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL}, {StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL}, {StatusCode::GrpcUnavailable, STATUS_CODE_GRPC_UNAVAILABLE}, @@ -145,7 +143,6 @@ StatusCode Status::StringToCode(const std::string &str) { {STATUS_CODE_OBJECT_EXISTS, StatusCode::ObjectExists}, {STATUS_CODE_OBJECT_NOT_FOUND, StatusCode::ObjectNotFound}, {STATUS_CODE_OBJECT_STORE_ALREADY_SEALED, StatusCode::ObjectAlreadySealed}, - {STATUS_CODE_OBJECT_UNKNOWN_OWNER, StatusCode::ObjectUnknownOwner}, {STATUS_CODE_OBJECT_STORE_FULL, StatusCode::ObjectStoreFull}, {STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL, StatusCode::TransientObjectStoreFull}, }; diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 528d1e755298..9e396f11bd62 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -112,7 +112,6 @@ enum class StatusCode : char { // Object store is both out of memory and // out of disk. OutOfDisk = 28, - ObjectUnknownOwner = 29, }; #if defined(__clang__) @@ -208,10 +207,6 @@ class RAY_EXPORT Status { return Status(StatusCode::ObjectNotFound, msg); } - static Status ObjectUnknownOwner(const std::string &msg) { - return Status(StatusCode::ObjectUnknownOwner, msg); - } - static Status ObjectAlreadySealed(const std::string &msg) { return Status(StatusCode::ObjectAlreadySealed, msg); } @@ -269,7 +264,6 @@ class RAY_EXPORT Status { bool IsSchedulingCancelled() const { return code() == StatusCode::SchedulingCancelled; } bool IsObjectExists() const { return code() == StatusCode::ObjectExists; } bool IsObjectNotFound() const { return code() == StatusCode::ObjectNotFound; } - bool IsObjectUnknownOwner() const { return code() == StatusCode::ObjectUnknownOwner; } bool IsObjectAlreadySealed() const { return code() == StatusCode::ObjectAlreadySealed; } bool IsObjectStoreFull() const { return code() == StatusCode::ObjectStoreFull; } bool IsTransientObjectStoreFull() const { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9f6a05c2d915..589f339e1375 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -877,33 +877,18 @@ CoreWorker::GetAllReferenceCounts() const { const rpc::Address &CoreWorker::GetRpcAddress() const { return rpc_address_; } -rpc::Address CoreWorker::GetOwnerAddressOrDie(const ObjectID &object_id) const { +rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const { rpc::Address owner_address; - auto status = GetOwnerAddress(object_id, &owner_address); - RAY_CHECK(status.ok()) << status.message(); + auto has_owner = reference_counter_->GetOwner(object_id, &owner_address); + RAY_CHECK(has_owner) + << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " + "(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray " + "does not know which task created them. " + "If this was not how your object ID was generated, please file an issue " + "at https://github.com/ray-project/ray/issues/"; return owner_address; } -Status CoreWorker::GetOwnerAddress(const ObjectID &object_id, - rpc::Address *owner_address) const { - auto has_owner = reference_counter_->GetOwner(object_id, owner_address); - if (!has_owner) { - std::ostringstream stream; - stream << "An application is trying to access a Ray object whose owner is unknown" - << "(" << object_id - << "). " - "Please make sure that all Ray objects you are trying to access are part" - " of the current Ray session. Note that " - "object IDs generated randomly (ObjectID.from_random()) or out-of-band " - "(ObjectID.from_binary(...)) cannot be passed as a task argument because" - " Ray does not know which task created them. " - "If this was not how your object ID was generated, please file an issue " - "at https://github.com/ray-project/ray/issues/"; - return Status::ObjectUnknownOwner(stream.str()); - } - return Status::OK(); -} - std::vector CoreWorker::GetObjectRefs( const std::vector &object_ids) const { std::vector refs; @@ -920,31 +905,17 @@ std::vector CoreWorker::GetObjectRefs( return refs; } -void CoreWorker::GetOwnershipInfoOrDie(const ObjectID &object_id, - rpc::Address *owner_address, - std::string *serialized_object_status) { - auto status = GetOwnershipInfo(object_id, owner_address, serialized_object_status); - RAY_CHECK(status.ok()) << status.message(); -} - -Status CoreWorker::GetOwnershipInfo(const ObjectID &object_id, - rpc::Address *owner_address, - std::string *serialized_object_status) { +void CoreWorker::GetOwnershipInfo(const ObjectID &object_id, + rpc::Address *owner_address, + std::string *serialized_object_status) { auto has_owner = reference_counter_->GetOwner(object_id, owner_address); - if (!has_owner) { - std::ostringstream stream; - stream << "An application is trying to access a Ray object whose owner is unknown" - << "(" << object_id - << "). " - "Please make sure that all Ray objects you are trying to access are part" - " of the current Ray session. Note that " - "object IDs generated randomly (ObjectID.from_random()) or out-of-band " - "(ObjectID.from_binary(...)) cannot be passed as a task argument because" - " Ray does not know which task created them. " - "If this was not how your object ID was generated, please file an issue " - "at https://github.com/ray-project/ray/issues/"; - return Status::ObjectUnknownOwner(stream.str()); - } + RAY_CHECK(has_owner) + << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " + "(ObjectID.from_binary(...)) cannot be serialized because Ray does not know " + "which task will create them. " + "If this was not how your object ID was generated, please file an issue " + "at https://github.com/ray-project/ray/issues/: " + << object_id; rpc::GetObjectStatusReply object_status; // Optimization: if the object exists, serialize and inline its status. This also @@ -956,7 +927,6 @@ Status CoreWorker::GetOwnershipInfo(const ObjectID &object_id, } } *serialized_object_status = object_status.SerializeAsString(); - return Status::OK(); } void CoreWorker::RegisterOwnershipInfoAndResolveFuture( @@ -1207,31 +1177,6 @@ Status CoreWorker::Get(const std::vector &ids, bool got_exception = false; absl::flat_hash_map> result_map; auto start_time = current_time_ms(); - std::ostringstream ids_stream; - - for (size_t i = 0; i < ids.size(); i++) { - rpc::Address unused_owner_address; - auto status = GetOwnerAddress(ids[i], &unused_owner_address); - if (status.IsObjectUnknownOwner()) { - ids_stream << ids[i] << " "; - got_exception = true; - } - } - - if (got_exception) { - std::ostringstream stream; - stream << "An application is trying to access Ray objects whose owner is unknown" - << "(" << ids_stream.str() - << "). " - "Please make sure that all Ray objects you are trying to access are part" - " of the current Ray session. Note that " - "object IDs generated randomly (ObjectID.from_random()) or out-of-band " - "(ObjectID.from_binary(...)) cannot be passed as a task argument because" - " Ray does not know which task created them. " - "If this was not how your object ID was generated, please file an issue " - "at https://github.com/ray-project/ray/issues/"; - return Status::ObjectUnknownOwner(stream.str()); - } if (!memory_object_ids.empty()) { RAY_RETURN_NOT_OK(memory_store_->Get( @@ -1369,35 +1314,6 @@ Status CoreWorker::Wait(const std::vector &ids, return Status::Invalid("Duplicate object IDs not supported in wait."); } - size_t missing_owners = 0; - std::ostringstream ids_stream; - - for (size_t i = 0; i < ids.size(); i++) { - rpc::Address unused_owner_address; - auto status = GetOwnerAddress(ids[i], &unused_owner_address); - if (status.IsObjectUnknownOwner()) { - ids_stream << ids[i] << " "; - missing_owners += 1; - } - } - - int objects_with_known_owners = ids.size() - missing_owners; - // If we are requesting more items than items available, then return a failed status. - if (missing_owners > 0 && num_objects > objects_with_known_owners) { - std::ostringstream stream; - stream << "An application is trying to access a Ray object whose owner is unknown" - << "(" << ids_stream.str() - << "). " - "Please make sure that all Ray objects you are trying to access are part" - " of the current Ray session. Note that " - "object IDs generated randomly (ObjectID.from_random()) or out-of-band " - "(ObjectID.from_binary(...)) cannot be passed as a task argument because" - " Ray does not know which task created them. " - "If this was not how your object ID was generated, please file an issue " - "at https://github.com/ray-project/ray/issues/"; - return Status::ObjectUnknownOwner(stream.str()); - } - absl::flat_hash_set ready; int64_t start_time = current_time_ms(); RAY_RETURN_NOT_OK(memory_store_->Wait( @@ -1482,8 +1398,7 @@ Status CoreWorker::GetLocationFromOwner( std::make_shared>>(); for (const auto &object_id : object_ids) { - rpc::Address owner_address; - RAY_RETURN_NOT_OK(GetOwnerAddress(object_id, &owner_address)); + auto owner_address = GetOwnerAddress(object_id); auto client = core_worker_client_pool_->GetOrConnect(owner_address); rpc::GetObjectLocationsOwnerRequest request; auto object_location_request = request.mutable_object_location_request(); @@ -3583,8 +3498,7 @@ void CoreWorker::PlasmaCallback(SetResultCallback success, // Ask raylet to subscribe to object notification. Raylet will call this core worker // when the object is local (and it will fire the callback immediately if the object // exists). CoreWorker::HandlePlasmaObjectReady handles such request. - auto owner_address = GetOwnerAddressOrDie(object_id); - local_raylet_client_->SubscribeToPlasma(object_id, owner_address); + local_raylet_client_->SubscribeToPlasma(object_id, GetOwnerAddress(object_id)); } void CoreWorker::HandlePlasmaObjectReady(rpc::PlasmaObjectReadyRequest request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 95b16f124a57..351a6b958bcf 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -391,16 +391,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// us, or the caller previously added the ownership information (via /// RegisterOwnershipInfoAndResolveFuture). /// \param[out] The RPC address of the worker that owns this object. - Status GetOwnerAddress(const ObjectID &object_id, rpc::Address *owner_address) const; - - /// Get the RPC address of the worker that owns the given object. If the - /// object has no owner, then we terminate the process. - /// - /// \param[in] object_id The object ID. The object must either be owned by - /// us, or the caller previously added the ownership information (via - /// RegisterOwnershipInfoAndResolveFuture). - /// \param[out] The RPC address of the worker that owns this object. - rpc::Address GetOwnerAddressOrDie(const ObjectID &object_id) const; + rpc::Address GetOwnerAddress(const ObjectID &object_id) const; /// Get the RPC address of the worker that owns the given object. /// @@ -427,30 +418,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[out] owner_address The address of the object's owner. This should /// be appended to the serialized object ID. /// \param[out] serialized_object_status The serialized object status protobuf. - Status GetOwnershipInfo(const ObjectID &object_id, - rpc::Address *owner_address, - std::string *serialized_object_status); - - /// Get the owner information of an object. This should be - /// called when serializing an object ID, and the returned information should - /// be stored with the serialized object ID. If the ownership of the object - /// cannot be established, then we terminate the process. - /// - /// This can only be called on object IDs that we created via task - /// submission, ray.put, or object IDs that we deserialized. It cannot be - /// called on object IDs that were created randomly, e.g., - /// ObjectID::FromRandom. - /// - /// Postcondition: Get(object_id) is valid. - /// - /// \param[in] object_id The object ID to serialize. - /// appended to the serialized object ID. - /// \param[out] owner_address The address of the object's owner. This should - /// be appended to the serialized object ID. - /// \param[out] serialized_object_status The serialized object status protobuf. - void GetOwnershipInfoOrDie(const ObjectID &object_id, - rpc::Address *owner_address, - std::string *serialized_object_status); + void GetOwnershipInfo(const ObjectID &object_id, + rpc::Address *owner_address, + std::string *serialized_object_status); /// Add a reference to an ObjectID that was deserialized by the language /// frontend. This will also start the process to resolve the future. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 955b46f746e9..c46b2d1b251e 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -216,8 +216,7 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetOwnerAddress(JNIEnv *env, jclass, jbyteArray objectId) { auto object_id = JavaByteArrayToId(env, objectId); - const auto &rpc_address = - CoreWorkerProcess::GetCoreWorker().GetOwnerAddressOrDie(object_id); + const auto &rpc_address = CoreWorkerProcess::GetCoreWorker().GetOwnerAddress(object_id); return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString()); } @@ -229,7 +228,7 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetOwnershipInfo(JNIEnv *env, rpc::Address address; // TODO(ekl) send serialized object status to Java land. std::string serialized_object_status; - CoreWorkerProcess::GetCoreWorker().GetOwnershipInfoOrDie( + CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo( object_id, &address, &serialized_object_status); auto address_str = address.SerializeAsString(); auto arr = NativeStringToJavaByteArray(env, address_str);