Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ std::vector<std::unique_ptr<::ray::TaskArg>> TransformArgs(
auto owner_address = ray::rpc::Address{};
if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
owner_address = core_worker.GetOwnerAddressOrDie(id);
owner_address = core_worker.GetOwnerAddress(id);
}
ray_arg = absl::make_unique<ray::TaskArgByReference>(id,
owner_address,
Expand Down
24 changes: 8 additions & 16 deletions cpp/src/ray/test/cluster/cluster_mode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ TEST(RayClusterModeTest, FullTest) {
config.redis_password_ = password;
}
ray::Init(config, cmd_argc, cmd_argv);

/// put and get object
auto obj = ray::Put(12345);
auto get_result = *(ray::Get(obj));
Expand All @@ -89,9 +88,9 @@ TEST(RayClusterModeTest, FullTest) {
EXPECT_EQ(1, task_result);

/// common task with args
auto task_obj1 = ray::Task(Plus1).Remote(5);
auto task_result1 = *(ray::Get(task_obj1));
EXPECT_EQ(6, task_result1);
task_obj = ray::Task(Plus1).Remote(5);
task_result = *(ray::Get(task_obj));
EXPECT_EQ(6, task_result);

ray::ActorHandle<Counter> actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetMaxRestarts(1)
Expand Down Expand Up @@ -496,10 +495,6 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) {
}

TEST(RayClusterModeTest, NamespaceTest) {
if (ray::IsInitialized()) {
ray::Shutdown();
}
ray::Init();
// Create a named actor in namespace `isolated_ns`.
std::string actor_name_in_isolated_ns = "named_actor_in_isolated_ns";
std::string isolated_ns_name = "isolated_ns";
Expand All @@ -521,11 +516,11 @@ TEST(RayClusterModeTest, NamespaceTest) {

// Create a named actor in job default namespace.
std::string actor_name_in_default_ns = "actor_name_in_default_ns";
auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetName(actor_name_in_default_ns)
.Remote();
auto initialized_obj1 = actor1.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj1.Get());
actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetName(actor_name_in_default_ns)
.Remote();
initialized_obj = actor.Task(&Counter::Initialized).Remote();
EXPECT_TRUE(*initialized_obj.Get());
// It is visible to job default namespace.
actor_optional = ray::GetActor<Counter>(actor_name_in_default_ns);
EXPECT_TRUE(actor_optional);
Expand All @@ -539,9 +534,6 @@ TEST(RayClusterModeTest, GetNamespaceApiTest) {
std::string ns = "test_get_current_namespace";
ray::RayConfig config;
config.ray_namespace = ns;
if (ray::IsInitialized()) {
ray::Shutdown();
}
ray::Init(config, cmd_argc, cmd_argv);
// Get namespace in driver.
EXPECT_EQ(ray::GetNamespace(), ns);
Expand Down
62 changes: 24 additions & 38 deletions java/test/src/main/java/io/ray/test/ReferenceCountingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.exception.RayException;
import io.ray.api.id.ObjectId;
import io.ray.runtime.object.NativeObjectStore;
import io.ray.runtime.object.ObjectRefImpl;
Expand Down Expand Up @@ -113,14 +112,9 @@ private static void fillObjectStoreAndGet(
if (succeed) {
TestUtils.getRuntime().getObjectStore().getRaw(ImmutableList.of(objectId), Long.MAX_VALUE);
} else {
try {
List<Boolean> result =
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 0, 100, true);
Assert.fail(
"Ray did not fail when waiting for an object that does not belong in this session");
} catch (RayException e) {
// This is the expected outcome for succeed=false, as we wait for non-existent objects.
}
List<Boolean> result =
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100, true);
Assert.assertFalse(result.get(0));
}
}

Expand Down Expand Up @@ -171,7 +165,27 @@ private static void sendSignal(ActorHandle<SignalActor> signal) {
}

/** Based on Python test case `test_dependency_refcounts`. */
public void testDependencyRefCounts1() {
public void testDependencyRefCounts() {
{
// Test that regular plasma dependency refcounts are decremented once the
// task finishes.
ActorHandle<SignalActor> signal = SignalActor.create();
ObjectRefImpl<TestUtils.LargeObject> largeDep =
(ObjectRefImpl<TestUtils.LargeObject>) Ray.put(new TestUtils.LargeObject());
ObjectRefImpl<Object> result =
(ObjectRefImpl<Object>)
Ray.<TestUtils.LargeObject, ActorHandle<SignalActor>, Object>task(
ReferenceCountingTest::oneDep, largeDep, signal)
.remote();
checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0);
sendSignal(signal);
// Reference count should be removed once the task finishes.
checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0);
del(largeDep);
del(result);
checkRefCounts(ImmutableMap.of());
}

{
// Test that inlined dependency refcounts are decremented once they are
// inlined.
Expand All @@ -193,9 +207,7 @@ public void testDependencyRefCounts1() {
del(result);
checkRefCounts(ImmutableMap.of());
}
}

public void testDependencyRefCounts2() {
{
// Test that spilled plasma dependency refcounts are decremented once
// the task finishes.
Expand Down Expand Up @@ -224,9 +236,7 @@ public void testDependencyRefCounts2() {
del(result);
checkRefCounts(ImmutableMap.of());
}
}

public void testDependencyRefCounts3() {
{
// Test that regular plasma dependency refcounts are decremented if a task
// fails.
Expand All @@ -247,9 +257,7 @@ public void testDependencyRefCounts3() {
del(result);
checkRefCounts(ImmutableMap.of());
}
}

public void testDependencyRefCounts4() {
{
// Test that spilled plasma dependency refcounts are decremented if a task
// fails.
Expand Down Expand Up @@ -280,28 +288,6 @@ public void testDependencyRefCounts4() {
}
}

public void testDependencyRefCounts5() {
{
// Test that regular plasma dependency refcounts are decremented once the
// task finishes.
ActorHandle<SignalActor> signal = SignalActor.create();
ObjectRefImpl<TestUtils.LargeObject> largeDep =
(ObjectRefImpl<TestUtils.LargeObject>) Ray.put(new TestUtils.LargeObject());
ObjectRefImpl<Object> result =
(ObjectRefImpl<Object>)
Ray.<TestUtils.LargeObject, ActorHandle<SignalActor>, Object>task(
ReferenceCountingTest::oneDep, largeDep, signal)
.remote();
checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0);
sendSignal(signal);
// Reference count should be removed once the task finishes.
checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0);
del(largeDep);
del(result);
checkRefCounts(ImmutableMap.of());
}
}

private static int fooBasicPinning(Object arg) {
return 0;
}
Expand Down
32 changes: 9 additions & 23 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,6 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise GetTimeoutError(message)
elif status.IsNotFound():
raise ValueError(message)
elif status.IsObjectNotFound():
raise ValueError(message)
elif status.IsObjectUnknownOwner():
raise ValueError(message)
else:
raise RaySystemError(message)

Expand Down Expand Up @@ -425,8 +421,6 @@ cdef prepare_args_internal(
c_vector[CObjectID] inlined_ids
c_string put_arg_call_site
c_vector[CObjectReference] inlined_refs
CAddress c_owner_address
CRayStatus op_status

worker = ray._private.worker.global_worker
put_threshold = RayConfig.instance().max_direct_call_object_size()
Expand All @@ -435,13 +429,11 @@ cdef prepare_args_internal(
for arg in args:
if isinstance(arg, ObjectRef):
c_arg = (<ObjectRef>arg).native()
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg, &c_owner_address)
check_status(op_status)
args_vector.push_back(
unique_ptr[CTaskArg](new CTaskArgByReference(
c_arg,
c_owner_address,
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_arg),
arg.call_site())))

else:
Expand Down Expand Up @@ -1573,9 +1565,8 @@ cdef class CoreWorker:
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results)
check_status(op_status)
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results))

return RayObjectsToDataMetadataPairs(results)

Expand Down Expand Up @@ -1779,9 +1770,8 @@ cdef class CoreWorker:

wait_ids = ObjectRefsToVector(object_refs)
with nogil:
op_status = CCoreWorkerProcess.GetCoreWorker().Wait(
wait_ids, num_returns, timeout_ms, &results, fetch_local)
check_status(op_status)
check_status(CCoreWorkerProcess.GetCoreWorker().Wait(
wait_ids, num_returns, timeout_ms, &results, fetch_local))

assert len(results) == len(object_refs)

Expand Down Expand Up @@ -2332,20 +2322,16 @@ cdef class CoreWorker:
def get_owner_address(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id, &c_owner_address)
check_status(op_status)
return c_owner_address.SerializeAsString()
return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id).SerializeAsString()

def serialize_object_ref(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address = CAddress()
c_string serialized_object_status
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
c_object_id, &c_owner_address, &serialized_object_status)
check_status(op_status)
return (object_ref,
c_owner_address.SerializeAsString(),
serialized_object_status)
Expand Down
2 changes: 0 additions & 2 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsTimedOut()
c_bool IsInterrupted()
c_bool ShouldExitWorker()
c_bool IsObjectNotFound()
c_bool IsNotFound()
c_bool IsObjectUnknownOwner()

c_string ToString()
c_string CodeAsString()
Expand Down
9 changes: 4 additions & 5 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void PutObjectIntoPlasma(const CRayObject &object,
const CObjectID &object_id)
const CAddress &GetRpcAddress() const
CRayStatus GetOwnerAddress(const CObjectID &object_id,
CAddress *owner_address) const
CAddress GetOwnerAddress(const CObjectID &object_id) const
c_vector[CObjectReference] GetObjectRefs(
const c_vector[CObjectID] &object_ids) const

CRayStatus GetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address,
c_string *object_status)
void GetOwnershipInfo(const CObjectID &object_id,
CAddress *owner_address,
c_string *object_status)
void RegisterOwnershipInfoAndResolveFuture(
const CObjectID &object_id,
const CObjectID &outer_object_id,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ py_test_module_list(
"test_get_locations.py",
"test_global_state.py",
"test_healthcheck.py",
"test_ray_shutdown.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_protobuf_compatibility.py"
Expand Down Expand Up @@ -127,7 +128,6 @@ py_test_module_list(
"test_placement_group_failover.py",
"test_ray_init.py",
"test_ray_init_2.py",
"test_ray_shutdown.py",
"test_resource_demand_scheduler.py",
"test_resource_metrics.py",
"test_runtime_context.py",
Expand Down
3 changes: 1 addition & 2 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,8 @@ def h():

assert ray.get(f.remote()) == (1, 2)


def test_recursive_remote_call(ray_start_shared_local_modes):
# Test a remote function that recursively calls itself.

@ray.remote
def factorial(n):
if n == 0:
Expand Down
Loading