diff --git a/java/runtime/src/main/java/org/ray/runtime/util/IdUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/IdUtil.java index 62c56d17ceed..04f75500b29d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/IdUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/IdUtil.java @@ -161,7 +161,7 @@ public static long murmurHashCode(BaseId id) { } /** - * This method is the same as `hash()` method of `ID` class in ray/src/ray/id.h + * This method is the same as `Hash()` method of `ID` class in ray/src/ray/id.h */ private static long murmurHash64A(byte[] data, int length, int seed) { final long m = 0xc6a4a7935bd1e995L; diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a5f106f1e911..1cea8354dada 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -80,7 +80,7 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids): cdef VectorToObjectIDs(c_vector[CObjectID] object_ids): result = [] for i in range(object_ids.size()): - result.append(ObjectID(object_ids[i].binary())) + result.append(ObjectID(object_ids[i].Binary())) return result @@ -88,11 +88,11 @@ def compute_put_id(TaskID task_id, int64_t put_index): if put_index < 1 or put_index > kMaxTaskPuts: raise ValueError("The range of 'put_index' should be [1, %d]" % kMaxTaskPuts) - return ObjectID(CObjectID.for_put(task_id.native(), put_index).binary()) + return ObjectID(CObjectID.ForPut(task_id.native(), put_index).Binary()) def compute_task_id(ObjectID object_id): - return TaskID(object_id.native().task_id().binary()) + return TaskID(object_id.native().TaskId().Binary()) cdef c_bool is_simple_value(value, int *num_elements_contained): @@ -362,7 +362,7 @@ cdef class RayletClient: with nogil: check_status(self.client.get().PrepareActorCheckpoint( c_actor_id, checkpoint_id)) - return ActorCheckpointID(checkpoint_id.binary()) + return ActorCheckpointID(checkpoint_id.Binary()) def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, ActorCheckpointID checkpoint_id): @@ -370,7 +370,7 @@ cdef class RayletClient: actor_id.native(), checkpoint_id.native())) def set_resource(self, basestring resource_name, double capacity, ClientID client_id): - self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.from_binary(client_id.binary())) + self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.FromBinary(client_id.binary())) @property def language(self): @@ -378,11 +378,11 @@ cdef class RayletClient: @property def client_id(self): - return ClientID(self.client.get().GetClientID().binary()) + return ClientID(self.client.get().GetClientID().Binary()) @property def driver_id(self): - return DriverID(self.client.get().GetDriverID().binary()) + return DriverID(self.client.get().GetDriverID().Binary()) @property def is_worker(self): diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 70e7e584d457..f8258e0a6e65 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -124,15 +124,15 @@ cdef class Task: def driver_id(self): """Return the driver ID for this task.""" - return DriverID(self.task_spec.get().DriverId().binary()) + return DriverID(self.task_spec.get().DriverId().Binary()) def task_id(self): """Return the task ID for this task.""" - return TaskID(self.task_spec.get().TaskId().binary()) + return TaskID(self.task_spec.get().TaskId().Binary()) def parent_task_id(self): """Return the task ID of the parent task.""" - return TaskID(self.task_spec.get().ParentTaskId().binary()) + return 
TaskID(self.task_spec.get().ParentTaskId().Binary()) def parent_counter(self): """Return the parent counter of this task.""" @@ -162,7 +162,7 @@ cdef class Task: if count > 0: assert count == 1 arg_list.append( - ObjectID(task_spec.ArgId(i, 0).binary())) + ObjectID(task_spec.ArgId(i, 0).Binary())) else: serialized_str = ( task_spec.ArgVal(i)[:task_spec.ArgValLength(i)]) @@ -178,7 +178,7 @@ cdef class Task: cdef CTaskSpecification *task_spec = self.task_spec.get() return_id_list = [] for i in range(task_spec.NumReturns()): - return_id_list.append(ObjectID(task_spec.ReturnId(i).binary())) + return_id_list.append(ObjectID(task_spec.ReturnId(i).Binary())) return return_id_list def required_resources(self): @@ -207,16 +207,16 @@ cdef class Task: def actor_creation_id(self): """Return the actor creation ID for the task.""" - return ActorID(self.task_spec.get().ActorCreationId().binary()) + return ActorID(self.task_spec.get().ActorCreationId().Binary()) def actor_creation_dummy_object_id(self): """Return the actor creation dummy object ID for the task.""" return ObjectID( - self.task_spec.get().ActorCreationDummyObjectId().binary()) + self.task_spec.get().ActorCreationDummyObjectId().Binary()) def actor_id(self): """Return the actor ID for this task.""" - return ActorID(self.task_spec.get().ActorId().binary()) + return ActorID(self.task_spec.get().ActorId().Binary()) def actor_counter(self): """Return the actor counter for this task.""" diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index fbe793cc023b..8bf369c649b7 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -8,116 +8,116 @@ cdef extern from "ray/id.h" namespace "ray" nogil: T from_random() @staticmethod - T from_binary(const c_string &binary) + T FromBinary(const c_string &binary) @staticmethod - const T nil() + const T Nil() @staticmethod - size_t size() + size_t Size() - size_t hash() const - c_bool is_nil() const + size_t Hash() const + c_bool IsNil() const c_bool operator==(const CBaseID &rhs) const c_bool operator!=(const CBaseID &rhs) const const uint8_t *data() const; - c_string binary() const; - c_string hex() const; + c_string Binary() const; + c_string Hex() const; cdef cppclass CUniqueID "ray::UniqueID"(CBaseID): CUniqueID() @staticmethod - size_t size() + size_t Size() @staticmethod CUniqueID from_random() @staticmethod - CUniqueID from_binary(const c_string &binary) + CUniqueID FromBinary(const c_string &binary) @staticmethod - const CUniqueID nil() + const CUniqueID Nil() @staticmethod - size_t size() + size_t Size() cdef cppclass CActorCheckpointID "ray::ActorCheckpointID"(CUniqueID): @staticmethod - CActorCheckpointID from_binary(const c_string &binary) + CActorCheckpointID FromBinary(const c_string &binary) cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID): @staticmethod - CActorClassID from_binary(const c_string &binary) + CActorClassID FromBinary(const c_string &binary) cdef cppclass CActorID "ray::ActorID"(CUniqueID): @staticmethod - CActorID from_binary(const c_string &binary) + CActorID FromBinary(const c_string &binary) cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID): @staticmethod - CActorHandleID from_binary(const c_string &binary) + CActorHandleID FromBinary(const c_string &binary) cdef cppclass CClientID "ray::ClientID"(CUniqueID): @staticmethod - CClientID from_binary(const c_string &binary) + CClientID FromBinary(const c_string &binary) cdef cppclass CConfigID "ray::ConfigID"(CUniqueID): @staticmethod - 
CConfigID from_binary(const c_string &binary) + CConfigID FromBinary(const c_string &binary) cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID): @staticmethod - CFunctionID from_binary(const c_string &binary) + CFunctionID FromBinary(const c_string &binary) cdef cppclass CDriverID "ray::DriverID"(CUniqueID): @staticmethod - CDriverID from_binary(const c_string &binary) + CDriverID FromBinary(const c_string &binary) cdef cppclass CTaskID "ray::TaskID"(CBaseID[CTaskID]): @staticmethod - CTaskID from_binary(const c_string &binary) + CTaskID FromBinary(const c_string &binary) @staticmethod - const CTaskID nil() + const CTaskID Nil() @staticmethod - size_t size() + size_t Size() cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]): @staticmethod - CObjectID from_binary(const c_string &binary) + CObjectID FromBinary(const c_string &binary) @staticmethod - const CObjectID nil() + const CObjectID Nil() @staticmethod - CObjectID for_put(const CTaskID &task_id, int64_t index); + CObjectID ForPut(const CTaskID &task_id, int64_t index); @staticmethod - CObjectID for_task_return(const CTaskID &task_id, int64_t index); + CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index); @staticmethod - size_t size() + size_t Size() c_bool is_put() - int64_t object_index() const + int64_t ObjectIndex() const - CTaskID task_id() const + CTaskID TaskId() const cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID): @staticmethod - CWorkerID from_binary(const c_string &binary) + CWorkerID FromBinary(const c_string &binary) diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index b9773d56fb20..cd3c58003fed 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -97,7 +97,7 @@ cdef class UniqueID(BaseID): def __init__(self, id): check_id(id) - self.data = CUniqueID.from_binary(id) + self.data = CUniqueID.FromBinary(id) @classmethod def from_binary(cls, id_bytes): @@ -107,27 +107,27 @@ cdef class UniqueID(BaseID): @classmethod def nil(cls): - return cls(CUniqueID.nil().binary()) + return cls(CUniqueID.Nil().Binary()) @classmethod def from_random(cls): - return cls(os.urandom(CUniqueID.size())) + return cls(os.urandom(CUniqueID.Size())) def size(self): - return CUniqueID.size() + return CUniqueID.Size() def binary(self): - return self.data.binary() + return self.data.Binary() def hex(self): - return decode(self.data.hex()) + return decode(self.data.Hex()) def is_nil(self): - return self.data.is_nil() + return self.data.IsNil() cdef size_t hash(self): - return self.data.hash() + return self.data.Hash() cdef class ObjectID(BaseID): @@ -135,78 +135,78 @@ cdef class ObjectID(BaseID): def __init__(self, id): check_id(id) - self.data = CObjectID.from_binary(id) + self.data = CObjectID.FromBinary(id) cdef CObjectID native(self): return self.data def size(self): - return CObjectID.size() + return CObjectID.Size() def binary(self): - return self.data.binary() + return self.data.Binary() def hex(self): - return decode(self.data.hex()) + return decode(self.data.Hex()) def is_nil(self): - return self.data.is_nil() + return self.data.IsNil() cdef size_t hash(self): - return self.data.hash() + return self.data.Hash() @classmethod def nil(cls): - return cls(CObjectID.nil().binary()) + return cls(CObjectID.Nil().Binary()) @classmethod def from_random(cls): - return cls(os.urandom(CObjectID.size())) + return cls(os.urandom(CObjectID.Size())) cdef class TaskID(BaseID): cdef CTaskID data def __init__(self, id): - check_id(id, CTaskID.size()) - self.data = 
CTaskID.from_binary(id) + check_id(id, CTaskID.Size()) + self.data = CTaskID.FromBinary(id) cdef CTaskID native(self): return self.data def size(self): - return CTaskID.size() + return CTaskID.Size() def binary(self): - return self.data.binary() + return self.data.Binary() def hex(self): - return decode(self.data.hex()) + return decode(self.data.Hex()) def is_nil(self): - return self.data.is_nil() + return self.data.IsNil() cdef size_t hash(self): - return self.data.hash() + return self.data.Hash() @classmethod def nil(cls): - return cls(CTaskID.nil().binary()) + return cls(CTaskID.Nil().Binary()) @classmethod def size(cla): - return CTaskID.size() + return CTaskID.Size() @classmethod def from_random(cls): - return cls(os.urandom(CTaskID.size())) + return cls(os.urandom(CTaskID.Size())) cdef class ClientID(UniqueID): def __init__(self, id): check_id(id) - self.data = CClientID.from_binary(id) + self.data = CClientID.FromBinary(id) cdef CClientID native(self): return self.data @@ -216,7 +216,7 @@ cdef class DriverID(UniqueID): def __init__(self, id): check_id(id) - self.data = CDriverID.from_binary(id) + self.data = CDriverID.FromBinary(id) cdef CDriverID native(self): return self.data @@ -226,7 +226,7 @@ cdef class ActorID(UniqueID): def __init__(self, id): check_id(id) - self.data = CActorID.from_binary(id) + self.data = CActorID.FromBinary(id) cdef CActorID native(self): return self.data @@ -236,7 +236,7 @@ cdef class ActorHandleID(UniqueID): def __init__(self, id): check_id(id) - self.data = CActorHandleID.from_binary(id) + self.data = CActorHandleID.FromBinary(id) cdef CActorHandleID native(self): return self.data @@ -246,7 +246,7 @@ cdef class ActorCheckpointID(UniqueID): def __init__(self, id): check_id(id) - self.data = CActorCheckpointID.from_binary(id) + self.data = CActorCheckpointID.FromBinary(id) cdef CActorCheckpointID native(self): return self.data @@ -256,7 +256,7 @@ cdef class FunctionID(UniqueID): def __init__(self, id): check_id(id) - self.data = CFunctionID.from_binary(id) + self.data = CFunctionID.FromBinary(id) cdef CFunctionID native(self): return self.data @@ -266,7 +266,7 @@ cdef class ActorClassID(UniqueID): def __init__(self, id): check_id(id) - self.data = CActorClassID.from_binary(id) + self.data = CActorClassID.FromBinary(id) cdef CActorClassID native(self): return self.data diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index 4ad961008e91..b36382bbb99a 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -231,7 +231,7 @@ ClientConnection::ClientConnection( const std::string &debug_label, const std::vector &message_type_enum_names, int64_t error_message_type) : ServerConnection(std::move(socket)), - client_id_(ClientID::nil()), + client_id_(ClientID::Nil()), message_handler_(message_handler), debug_label_(debug_label), message_type_enum_names_(message_type_enum_names), @@ -307,7 +307,7 @@ bool ClientConnection::CheckRayCookie() { ss << ", remote endpoint info: " << remote_endpoint_info; } - if (!client_id_.is_nil()) { + if (!client_id_.IsNil()) { // This is from a known client, which indicates a bug. 
RAY_LOG(FATAL) << ss.str(); } else { diff --git a/src/ray/common/common_protocol.h b/src/ray/common/common_protocol.h index 792bb173b105..63a0bf8c259c 100644 --- a/src/ray/common/common_protocol.h +++ b/src/ray/common/common_protocol.h @@ -104,13 +104,13 @@ string_vec_to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, template flatbuffers::Offset to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, ID id) { - return fbb.CreateString(reinterpret_cast(id.data()), id.size()); + return fbb.CreateString(reinterpret_cast(id.Data()), id.Size()); } template ID from_flatbuf(const flatbuffers::String &string) { - RAY_CHECK(string.size() == ID::size()); - return ID::from_binary(string.str()); + RAY_CHECK(string.size() == ID::Size()); + return ID::FromBinary(string.str()); } template @@ -127,14 +127,14 @@ template const std::vector ids_from_flatbuf(const flatbuffers::String &string) { const auto &ids = string_from_flatbuf(string); std::vector ret; - size_t id_size = ID::size(); + size_t id_size = ID::Size(); RAY_CHECK(ids.size() % id_size == 0); auto count = ids.size() / id_size; for (size_t i = 0; i < count; ++i) { auto pos = static_cast(id_size * i); const auto &id = ids.substr(pos, id_size); - ret.push_back(ID::from_binary(id)); + ret.push_back(ID::FromBinary(id)); } return ret; @@ -145,7 +145,7 @@ flatbuffers::Offset ids_to_flatbuf( flatbuffers::FlatBufferBuilder &fbb, const std::vector &ids) { std::string result; for (const auto &id : ids) { - result += id.binary(); + result += id.Binary(); } return fbb.CreateString(result); diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index b51421e10a14..642f5f2cf156 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -146,19 +146,19 @@ AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, CommandType command_type) - : AsyncGcsClient(address, port, ClientID::from_random(), command_type) {} + : AsyncGcsClient(address, port, ClientID::FromRandom(), command_type) {} AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, CommandType command_type, bool is_test_client) - : AsyncGcsClient(address, port, ClientID::from_random(), command_type, + : AsyncGcsClient(address, port, ClientID::FromRandom(), command_type, is_test_client) {} AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, const std::string &password = "") - : AsyncGcsClient(address, port, ClientID::from_random(), false, password) {} + : AsyncGcsClient(address, port, ClientID::FromRandom(), false, password) {} AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, bool is_test_client) - : AsyncGcsClient(address, port, ClientID::from_random(), is_test_client) {} + : AsyncGcsClient(address, port, ClientID::FromRandom(), is_test_client) {} Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) { // Take care of sharding contexts. 
diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 7f69c482e5eb..c203a4a9482a 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -29,7 +29,7 @@ class TestGcs : public ::testing::Test { TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) { client_ = std::make_shared("127.0.0.1", 6379, command_type_, /*is_test_client=*/true); - driver_id_ = DriverID::from_random(); + driver_id_ = DriverID::FromRandom(); } virtual ~TestGcs() { @@ -84,7 +84,7 @@ class TestGcsWithChainAsio : public TestGcsWithAsio { void TestTableLookup(const DriverID &driver_id, std::shared_ptr client) { - TaskID task_id = TaskID::from_random(); + TaskID task_id = TaskID::FromRandom(); auto data = std::make_shared(); data->task_specification = "123"; @@ -133,7 +133,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableLookup); void TestLogLookup(const DriverID &driver_id, std::shared_ptr client) { // Append some entries to the log at an object ID. - TaskID task_id = TaskID::from_random(); + TaskID task_id = TaskID::FromRandom(); std::vector node_manager_ids = {"abc", "def", "ghi"}; for (auto &node_manager_id : node_manager_ids) { auto data = std::make_shared(); @@ -178,7 +178,7 @@ TEST_F(TestGcsWithAsio, TestLogLookup) { void TestTableLookupFailure(const DriverID &driver_id, std::shared_ptr client) { - TaskID task_id = TaskID::from_random(); + TaskID task_id = TaskID::FromRandom(); // Check that the lookup does not return data. auto lookup_callback = [](gcs::AsyncGcsClient *client, const TaskID &id, @@ -205,7 +205,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure); void TestLogAppendAt(const DriverID &driver_id, std::shared_ptr client) { - TaskID task_id = TaskID::from_random(); + TaskID task_id = TaskID::FromRandom(); std::vector node_manager_ids = {"A", "B"}; std::vector> data_log; for (const auto &node_manager_id : node_manager_ids) { @@ -265,7 +265,7 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) { void TestSet(const DriverID &driver_id, std::shared_ptr client) { // Add some entries to the set at an object ID. - ObjectID object_id = ObjectID::from_random(); + ObjectID object_id = ObjectID::FromRandom(); std::vector managers = {"abc", "def", "ghi"}; for (auto &manager : managers) { auto data = std::make_shared(); @@ -335,7 +335,7 @@ void TestDeleteKeysFromLog( std::vector ids; TaskID task_id; for (auto &data : data_vector) { - task_id = TaskID::from_random(); + task_id = TaskID::FromRandom(); ids.push_back(task_id); // Check that we added the correct object entries. auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id, @@ -383,7 +383,7 @@ void TestDeleteKeysFromTable(const DriverID &driver_id, std::vector ids; TaskID task_id; for (auto &data : data_vector) { - task_id = TaskID::from_random(); + task_id = TaskID::FromRandom(); ids.push_back(task_id); // Check that we added the correct object entries. auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id, @@ -431,7 +431,7 @@ void TestDeleteKeysFromSet(const DriverID &driver_id, std::vector ids; ObjectID object_id; for (auto &data : data_vector) { - object_id = ObjectID::from_random(); + object_id = ObjectID::FromRandom(); ids.push_back(object_id); // Check that we added the correct object entries. 
auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const ObjectID &id, @@ -477,7 +477,7 @@ void TestDeleteKeys(const DriverID &driver_id, auto AppendTaskReconstructionData = [&task_reconstruction_vector](size_t add_count) { for (size_t i = 0; i < add_count; ++i) { auto data = std::make_shared(); - data->node_manager_id = ObjectID::from_random().hex(); + data->node_manager_id = ObjectID::FromRandom().Hex(); task_reconstruction_vector.push_back(data); } }; @@ -506,7 +506,7 @@ void TestDeleteKeys(const DriverID &driver_id, auto AppendTaskData = [&task_vector](size_t add_count) { for (size_t i = 0; i < add_count; ++i) { auto task_data = std::make_shared(); - task_data->task_specification = ObjectID::from_random().hex(); + task_data->task_specification = ObjectID::FromRandom().Hex(); task_vector.push_back(task_data); } }; @@ -532,7 +532,7 @@ void TestDeleteKeys(const DriverID &driver_id, auto AppendObjectData = [&object_vector](size_t add_count) { for (size_t i = 0; i < add_count; ++i) { auto data = std::make_shared(); - data->manager = ObjectID::from_random().hex(); + data->manager = ObjectID::FromRandom().Hex(); object_vector.push_back(data); } }; @@ -603,7 +603,7 @@ void TestLogSubscribeAll(const DriverID &driver_id, std::shared_ptr client) { std::vector driver_ids; for (int i = 0; i < 3; i++) { - driver_ids.emplace_back(DriverID::from_random()); + driver_ids.emplace_back(DriverID::FromRandom()); } // Callback for a notification. auto notification_callback = [driver_ids](gcs::AsyncGcsClient *client, @@ -612,7 +612,7 @@ void TestLogSubscribeAll(const DriverID &driver_id, ASSERT_EQ(id, driver_ids[test->NumCallbacks()]); // Check that we get notifications in the same order as the writes. for (const auto &entry : data) { - ASSERT_EQ(entry.driver_id, driver_ids[test->NumCallbacks()].binary()); + ASSERT_EQ(entry.driver_id, driver_ids[test->NumCallbacks()].Binary()); test->IncrementNumCallbacks(); } if (test->NumCallbacks() == driver_ids.size()) { @@ -633,7 +633,7 @@ void TestLogSubscribeAll(const DriverID &driver_id, // subscribed, we will append to the key several times and check that we get // notified for each. RAY_CHECK_OK(client->driver_table().Subscribe( - driver_id, ClientID::nil(), notification_callback, subscribe_callback)); + driver_id, ClientID::Nil(), notification_callback, subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called (or an assertion failure). @@ -651,7 +651,7 @@ void TestSetSubscribeAll(const DriverID &driver_id, std::shared_ptr client) { std::vector object_ids; for (int i = 0; i < 3; i++) { - object_ids.emplace_back(ObjectID::from_random()); + object_ids.emplace_back(ObjectID::FromRandom()); } std::vector managers = {"abc", "def", "ghi"}; @@ -711,7 +711,7 @@ void TestSetSubscribeAll(const DriverID &driver_id, // subscribed, we will append to the key several times and check that we get // notified for each. RAY_CHECK_OK(client->object_table().Subscribe( - driver_id, ClientID::nil(), notification_callback, subscribe_callback)); + driver_id, ClientID::Nil(), notification_callback, subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called (or an assertion failure). @@ -728,11 +728,11 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeAll) { void TestTableSubscribeId(const DriverID &driver_id, std::shared_ptr client) { // Add a table entry. 
- TaskID task_id1 = TaskID::from_random(); + TaskID task_id1 = TaskID::FromRandom(); std::vector task_specs1 = {"abc", "def", "ghi"}; // Add a table entry at a second key. - TaskID task_id2 = TaskID::from_random(); + TaskID task_id2 = TaskID::FromRandom(); std::vector task_specs2 = {"jkl", "mno", "pqr"}; // The callback for a notification from the table. This should only be @@ -804,14 +804,14 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId); void TestLogSubscribeId(const DriverID &driver_id, std::shared_ptr client) { // Add a log entry. - DriverID driver_id1 = DriverID::from_random(); + DriverID driver_id1 = DriverID::FromRandom(); std::vector driver_ids1 = {"abc", "def", "ghi"}; auto data1 = std::make_shared(); data1->driver_id = driver_ids1[0]; RAY_CHECK_OK(client->driver_table().Append(driver_id, driver_id1, data1, nullptr)); // Add a log entry at a second key. - DriverID driver_id2 = DriverID::from_random(); + DriverID driver_id2 = DriverID::FromRandom(); std::vector driver_ids2 = {"jkl", "mno", "pqr"}; auto data2 = std::make_shared(); data2->driver_id = driver_ids2[0]; @@ -878,14 +878,14 @@ TEST_F(TestGcsWithAsio, TestLogSubscribeId) { void TestSetSubscribeId(const DriverID &driver_id, std::shared_ptr client) { // Add a set entry. - ObjectID object_id1 = ObjectID::from_random(); + ObjectID object_id1 = ObjectID::FromRandom(); std::vector managers1 = {"abc", "def", "ghi"}; auto data1 = std::make_shared(); data1->manager = managers1[0]; RAY_CHECK_OK(client->object_table().Add(driver_id, object_id1, data1, nullptr)); // Add a set entry at a second key. - ObjectID object_id2 = ObjectID::from_random(); + ObjectID object_id2 = ObjectID::FromRandom(); std::vector managers2 = {"jkl", "mno", "pqr"}; auto data2 = std::make_shared(); data2->manager = managers2[0]; @@ -954,7 +954,7 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeId) { void TestTableSubscribeCancel(const DriverID &driver_id, std::shared_ptr client) { // Add a table entry. - TaskID task_id = TaskID::from_random(); + TaskID task_id = TaskID::FromRandom(); std::vector task_specs = {"jkl", "mno", "pqr"}; auto data = std::make_shared(); data->task_specification = task_specs[0]; @@ -1029,7 +1029,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel); void TestLogSubscribeCancel(const DriverID &driver_id, std::shared_ptr client) { // Add a log entry. - DriverID random_driver_id = DriverID::from_random(); + DriverID random_driver_id = DriverID::FromRandom(); std::vector driver_ids = {"jkl", "mno", "pqr"}; auto data = std::make_shared(); data->driver_id = driver_ids[0]; @@ -1102,7 +1102,7 @@ TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) { void TestSetSubscribeCancel(const DriverID &driver_id, std::shared_ptr client) { // Add a set entry. 
- ObjectID object_id = ObjectID::from_random(); + ObjectID object_id = ObjectID::FromRandom(); std::vector managers = {"jkl", "mno", "pqr"}; auto data = std::make_shared(); data->manager = managers[0]; @@ -1186,13 +1186,13 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client const ClientTableDataT &data, bool is_insertion) { ClientID added_id = client->client_table().GetLocalClientId(); ASSERT_EQ(client_id, added_id); - ASSERT_EQ(ClientID::from_binary(data.client_id), added_id); - ASSERT_EQ(ClientID::from_binary(data.client_id), added_id); + ASSERT_EQ(ClientID::FromBinary(data.client_id), added_id); + ASSERT_EQ(ClientID::FromBinary(data.client_id), added_id); ASSERT_EQ(data.entry_type == EntryType::INSERTION, is_insertion); ClientTableDataT cached_client; client->client_table().GetClient(added_id, cached_client); - ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id); + ASSERT_EQ(ClientID::FromBinary(cached_client.client_id), added_id); ASSERT_EQ(cached_client.entry_type == EntryType::INSERTION, is_insertion); } @@ -1290,13 +1290,13 @@ void TestClientTableMarkDisconnected(const DriverID &driver_id, // Connect to the client table to start receiving notifications. RAY_CHECK_OK(client->client_table().Connect(local_client_info)); // Mark a different client as dead. - ClientID dead_client_id = ClientID::from_random(); + ClientID dead_client_id = ClientID::FromRandom(); RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id)); // Make sure we only get a notification for the removal of the client we // marked as dead. client->client_table().RegisterClientRemovedCallback([dead_client_id]( gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { - ASSERT_EQ(ClientID::from_binary(data.client_id), dead_client_id); + ASSERT_EQ(ClientID::FromBinary(data.client_id), dead_client_id); test->Stop(); }); test->Start(); diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 6b03fa735007..6ff4af38bce3 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -255,7 +255,7 @@ Status RedisContext::SubscribeAsync(const ClientID &client_id, RAY_CHECK(out_callback_index != nullptr); *out_callback_index = callback_index; int status = 0; - if (client_id.is_nil()) { + if (client_id.IsNil()) { // Subscribe to all messages. 
std::string redis_command = "SUBSCRIBE %d"; status = redisAsyncCommand( @@ -267,7 +267,7 @@ Status RedisContext::SubscribeAsync(const ClientID &client_id, status = redisAsyncCommand( subscribe_context_, reinterpret_cast(&SubscribeRedisCallback), reinterpret_cast(callback_index), redis_command.c_str(), pubsub_channel, - client_id.data(), client_id.size()); + client_id.Data(), client_id.Size()); } if (status == REDIS_ERR) { diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index 93a343464892..c2d9a6a79262 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -134,7 +134,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id, int status = redisAsyncCommand( async_context_, reinterpret_cast(&GlobalRedisCallback), reinterpret_cast(callback_index), redis_command.c_str(), prefix, - pubsub_channel, id.data(), id.size(), data, length, log_length); + pubsub_channel, id.Data(), id.Size(), data, length, log_length); if (status == REDIS_ERR) { return Status::RedisError(std::string(async_context_->errstr)); } @@ -143,7 +143,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id, int status = redisAsyncCommand( async_context_, reinterpret_cast(&GlobalRedisCallback), reinterpret_cast(callback_index), redis_command.c_str(), prefix, - pubsub_channel, id.data(), id.size(), data, length); + pubsub_channel, id.Data(), id.Size(), data, length); if (status == REDIS_ERR) { return Status::RedisError(std::string(async_context_->errstr)); } @@ -154,7 +154,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id, int status = redisAsyncCommand( async_context_, reinterpret_cast(&GlobalRedisCallback), reinterpret_cast(callback_index), redis_command.c_str(), prefix, - pubsub_channel, id.data(), id.size()); + pubsub_channel, id.Data(), id.Size()); if (status == REDIS_ERR) { return Status::RedisError(std::string(async_context_->errstr)); } diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc index b9891e8cae32..503f55780007 100644 --- a/src/ray/gcs/redis_module/ray_redis_module.cc +++ b/src/ray/gcs/redis_module/ray_redis_module.cc @@ -648,7 +648,7 @@ static Status DeleteKeyHelper(RedisModuleCtx *ctx, RedisModuleString *prefix_str const char *redis_string_str = RedisModule_StringPtrLen(id_data, &redis_string_size); auto id_binary = std::string(redis_string_str, redis_string_size); ostream << "Undesired type for RAY.TableDelete: " << key_type - << " id:" << ray::UniqueID::from_binary(id_binary); + << " id:" << ray::UniqueID::FromBinary(id_binary); RAY_LOG(ERROR) << ostream.str(); return Status::RedisError(ostream.str()); } diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 3d4708940d1a..245cbdf891ea 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -167,7 +167,7 @@ Status Log::RequestNotifications(const DriverID &driver_id, const ID & RAY_CHECK(subscribe_callback_index_ >= 0) << "Client requested notifications on a key before Subscribe completed"; return GetRedisContext(id)->RunAsync("RAY.TABLE_REQUEST_NOTIFICATIONS", id, - client_id.data(), client_id.size(), prefix_, + client_id.Data(), client_id.Size(), prefix_, pubsub_channel_, nullptr); } @@ -177,7 +177,7 @@ Status Log::CancelNotifications(const DriverID &driver_id, const ID &i RAY_CHECK(subscribe_callback_index_ >= 0) << "Client canceled notifications on a key before Subscribe completed"; return GetRedisContext(id)->RunAsync("RAY.TABLE_CANCEL_NOTIFICATIONS", id, - client_id.data(), 
client_id.size(), prefix_, + client_id.Data(), client_id.Size(), prefix_, pubsub_channel_, nullptr); } @@ -188,16 +188,16 @@ void Log::Delete(const DriverID &driver_id, const std::vector &ids } std::unordered_map sharded_data; for (const auto &id : ids) { - sharded_data[GetRedisContext(id).get()] << id.binary(); + sharded_data[GetRedisContext(id).get()] << id.Binary(); } // Breaking really large deletion commands into batches of smaller size. const size_t batch_size = - RayConfig::instance().maximum_gcs_deletion_batch_size() * ID::size(); + RayConfig::instance().maximum_gcs_deletion_batch_size() * ID::Size(); for (const auto &pair : sharded_data) { std::string current_data = pair.second.str(); for (size_t cur = 0; cur < pair.second.str().size(); cur += batch_size) { size_t data_field_size = std::min(batch_size, current_data.size() - cur); - uint16_t id_count = data_field_size / ID::size(); + uint16_t id_count = data_field_size / ID::Size(); // Send data contains id count and all the id data. std::string send_data(data_field_size + sizeof(id_count), 0); uint8_t *buffer = reinterpret_cast(&send_data[0]); @@ -207,7 +207,7 @@ void Log::Delete(const DriverID &driver_id, const std::vector &ids data_field_size, buffer + sizeof(uint16_t))); RAY_IGNORE_EXPR( - pair.first->RunAsync("RAY.TABLE_DELETE", UniqueID::nil(), + pair.first->RunAsync("RAY.TABLE_DELETE", UniqueID::Nil(), reinterpret_cast(send_data.c_str()), send_data.size(), prefix_, pubsub_channel_, /*redisCallback=*/nullptr)); @@ -337,7 +337,7 @@ std::string Set::DebugString() const { Status ErrorTable::PushErrorToDriver(const DriverID &driver_id, const std::string &type, const std::string &error_message, double timestamp) { auto data = std::make_shared(); - data->driver_id = driver_id.binary(); + data->driver_id = driver_id.Binary(); data->type = type; data->error_message = error_message; data->timestamp = timestamp; @@ -354,7 +354,7 @@ Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events // call "Pack" and undo the "UnPack". profile_events.UnPackTo(data.get()); - return Append(DriverID::nil(), UniqueID::from_random(), data, + return Append(DriverID::Nil(), UniqueID::FromRandom(), data, /*done_callback=*/nullptr); } @@ -364,7 +364,7 @@ std::string ProfileTable::DebugString() const { Status DriverTable::AppendDriverData(const DriverID &driver_id, bool is_dead) { auto data = std::make_shared(); - data->driver_id = driver_id.binary(); + data->driver_id = driver_id.Binary(); data->is_dead = is_dead; return Append(DriverID(driver_id), driver_id, data, /*done_callback=*/nullptr); } @@ -373,7 +373,7 @@ void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callbac client_added_callback_ = callback; // Call the callback for any added clients that are cached. for (const auto &entry : client_cache_) { - if (!entry.first.is_nil() && (entry.second.entry_type == EntryType::INSERTION)) { + if (!entry.first.IsNil() && (entry.second.entry_type == EntryType::INSERTION)) { client_added_callback_(client_, entry.first, entry.second); } } @@ -383,7 +383,7 @@ void ClientTable::RegisterClientRemovedCallback(const ClientTableCallback &callb client_removed_callback_ = callback; // Call the callback for any removed clients that are cached. 
for (const auto &entry : client_cache_) { - if (!entry.first.is_nil() && entry.second.entry_type == EntryType::DELETION) { + if (!entry.first.IsNil() && entry.second.entry_type == EntryType::DELETION) { client_removed_callback_(client_, entry.first, entry.second); } } @@ -394,7 +394,7 @@ void ClientTable::RegisterResourceCreateUpdatedCallback( resource_createupdated_callback_ = callback; // Call the callback for any clients that are cached. for (const auto &entry : client_cache_) { - if (!entry.first.is_nil() && + if (!entry.first.IsNil() && (entry.second.entry_type == EntryType::RES_CREATEUPDATE)) { resource_createupdated_callback_(client_, entry.first, entry.second); } @@ -405,7 +405,7 @@ void ClientTable::RegisterResourceDeletedCallback(const ClientTableCallback &cal resource_deleted_callback_ = callback; // Call the callback for any clients that are cached. for (const auto &entry : client_cache_) { - if (!entry.first.is_nil() && entry.second.entry_type == EntryType::RES_DELETE) { + if (!entry.first.IsNil() && entry.second.entry_type == EntryType::RES_DELETE) { resource_deleted_callback_(client_, entry.first, entry.second); } } @@ -413,7 +413,7 @@ void ClientTable::RegisterResourceDeletedCallback(const ClientTableCallback &cal void ClientTable::HandleNotification(AsyncGcsClient *client, const ClientTableDataT &data) { - ClientID client_id = ClientID::from_binary(data.client_id); + ClientID client_id = ClientID::FromBinary(data.client_id); // It's possible to get duplicate notifications from the client table, so // check whether this notification is new. auto entry = client_cache_.find(client_id); @@ -519,7 +519,7 @@ void ClientTable::HandleNotification(AsyncGcsClient *client, } void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT &data) { - auto connected_client_id = ClientID::from_binary(data.client_id); + auto connected_client_id = ClientID::FromBinary(data.client_id); RAY_CHECK(client_id_ == connected_client_id) << connected_client_id << " " << client_id_; } @@ -578,13 +578,13 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) { // Callback to request notifications from the client table once we've // successfully subscribed. auto subscription_callback = [this](AsyncGcsClient *c) { - RAY_CHECK_OK(RequestNotifications(DriverID::nil(), client_log_key_, client_id_)); + RAY_CHECK_OK(RequestNotifications(DriverID::Nil(), client_log_key_, client_id_)); }; // Subscribe to the client table. - RAY_CHECK_OK(Subscribe(DriverID::nil(), client_id_, notification_callback, + RAY_CHECK_OK(Subscribe(DriverID::Nil(), client_id_, notification_callback, subscription_callback)); }; - return Append(DriverID::nil(), client_log_key_, data, add_callback); + return Append(DriverID::Nil(), client_log_key_, data, add_callback); } Status ClientTable::Disconnect(const DisconnectCallback &callback) { @@ -593,12 +593,12 @@ Status ClientTable::Disconnect(const DisconnectCallback &callback) { auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) { HandleConnected(client, data); - RAY_CHECK_OK(CancelNotifications(DriverID::nil(), client_log_key_, id)); + RAY_CHECK_OK(CancelNotifications(DriverID::Nil(), client_log_key_, id)); if (callback != nullptr) { callback(); } }; - RAY_RETURN_NOT_OK(Append(DriverID::nil(), client_log_key_, data, add_callback)); + RAY_RETURN_NOT_OK(Append(DriverID::Nil(), client_log_key_, data, add_callback)); // We successfully added the deletion entry. Mark ourselves as disconnected. 
disconnected_ = true; return Status::OK(); @@ -606,19 +606,19 @@ Status ClientTable::Disconnect(const DisconnectCallback &callback) { ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { auto data = std::make_shared(); - data->client_id = dead_client_id.binary(); + data->client_id = dead_client_id.Binary(); data->entry_type = EntryType::DELETION; - return Append(DriverID::nil(), client_log_key_, data, nullptr); + return Append(DriverID::Nil(), client_log_key_, data, nullptr); } void ClientTable::GetClient(const ClientID &client_id, ClientTableDataT &client_info) const { - RAY_CHECK(!client_id.is_nil()); + RAY_CHECK(!client_id.IsNil()); auto entry = client_cache_.find(client_id); if (entry != client_cache_.end()) { client_info = entry->second; } else { - client_info.client_id = ClientID::nil().binary(); + client_info.client_id = ClientID::Nil().Binary(); } } @@ -628,7 +628,7 @@ const std::unordered_map &ClientTable::GetAllClients Status ClientTable::Lookup(const Callback &lookup) { RAY_CHECK(lookup != nullptr); - return Log::Lookup(DriverID::nil(), client_log_key_, lookup); + return Log::Lookup(DriverID::Nil(), client_log_key_, lookup); } std::string ClientTable::DebugString() const { @@ -648,12 +648,12 @@ Status ActorCheckpointIdTable::AddCheckpointId(const DriverID &driver_id, std::shared_ptr copy = std::make_shared(data); copy->timestamps.push_back(current_sys_time_ms()); - copy->checkpoint_ids += checkpoint_id.binary(); + copy->checkpoint_ids += checkpoint_id.Binary(); auto num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep(); while (copy->timestamps.size() > num_to_keep) { // Delete the checkpoint from actor checkpoint table. const auto &checkpoint_id = - ActorCheckpointID::from_binary(copy->checkpoint_ids.substr(0, kUniqueIDSize)); + ActorCheckpointID::FromBinary(copy->checkpoint_ids.substr(0, kUniqueIDSize)); RAY_LOG(DEBUG) << "Deleting checkpoint " << checkpoint_id << " for actor " << actor_id; copy->timestamps.erase(copy->timestamps.begin()); @@ -666,9 +666,9 @@ Status ActorCheckpointIdTable::AddCheckpointId(const DriverID &driver_id, ray::gcs::AsyncGcsClient *client, const UniqueID &id) { std::shared_ptr data = std::make_shared(); - data->actor_id = id.binary(); + data->actor_id = id.Binary(); data->timestamps.push_back(current_sys_time_ms()); - data->checkpoint_ids = checkpoint_id.binary(); + data->checkpoint_ids = checkpoint_id.Binary(); RAY_CHECK_OK(Add(driver_id, actor_id, data, nullptr)); }; return Lookup(driver_id, actor_id, lookup_callback, failure_callback); diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 58a087d8c666..af739dc2ed32 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -559,7 +559,7 @@ class TaskLeaseTable : public Table { // TODO(swang): Use a common helper function to format the key instead of // hardcoding it to match the Redis module. std::vector args = {"PEXPIRE", - EnumNameTablePrefix(prefix_) + id.binary(), + EnumNameTablePrefix(prefix_) + id.Binary(), std::to_string(data->timeout)}; return GetRedisContext(id)->RunArgvAsync(args); @@ -695,7 +695,7 @@ class ClientTable : public Log { prefix_ = TablePrefix::CLIENT; // Set the local client's ID. - local_client_.client_id = client_id.binary(); + local_client_.client_id = client_id.Binary(); }; /// Connect as a client to the GCS. 
This registers us in the client table diff --git a/src/ray/id.cc b/src/ray/id.cc index a011430ad1cf..4c9ce4dc9244 100644 --- a/src/ray/id.cc +++ b/src/ray/id.cc @@ -26,14 +26,14 @@ std::mt19937 RandomlySeededMersenneTwister() { uint64_t MurmurHash64A(const void *key, int len, unsigned int seed); -plasma::UniqueID ObjectID::to_plasma_id() const { +plasma::UniqueID ObjectID::ToPlasmaId() const { plasma::UniqueID result; - std::memcpy(result.mutable_data(), data(), kUniqueIDSize); + std::memcpy(result.mutable_data(), Data(), kUniqueIDSize); return result; } ObjectID::ObjectID(const plasma::UniqueID &from) { - std::memcpy(this->mutable_data(), from.data(), kUniqueIDSize); + std::memcpy(this->MutableData(), from.data(), kUniqueIDSize); } // This code is from https://sites.google.com/site/murmurhash/ @@ -86,29 +86,29 @@ uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) { } TaskID TaskID::GetDriverTaskID(const DriverID &driver_id) { - std::string driver_id_str = driver_id.binary(); - driver_id_str.resize(size()); - return TaskID::from_binary(driver_id_str); + std::string driver_id_str = driver_id.Binary(); + driver_id_str.resize(Size()); + return TaskID::FromBinary(driver_id_str); } -TaskID ObjectID::task_id() const { - return TaskID::from_binary( - std::string(reinterpret_cast(id_), TaskID::size())); +TaskID ObjectID::TaskId() const { + return TaskID::FromBinary( + std::string(reinterpret_cast(id_), TaskID::Size())); } -ObjectID ObjectID::for_put(const TaskID &task_id, int64_t put_index) { +ObjectID ObjectID::ForPut(const TaskID &task_id, int64_t put_index) { RAY_CHECK(put_index >= 1 && put_index <= kMaxTaskPuts) << "index=" << put_index; ObjectID object_id; - std::memcpy(object_id.id_, task_id.binary().c_str(), task_id.size()); + std::memcpy(object_id.id_, task_id.Binary().c_str(), task_id.Size()); object_id.index_ = -put_index; return object_id; } -ObjectID ObjectID::for_task_return(const TaskID &task_id, int64_t return_index) { +ObjectID ObjectID::ForTaskReturn(const TaskID &task_id, int64_t return_index) { RAY_CHECK(return_index >= 1 && return_index <= kMaxTaskReturns) << "index=" << return_index; ObjectID object_id; - std::memcpy(object_id.id_, task_id.binary().c_str(), task_id.size()); + std::memcpy(object_id.id_, task_id.Binary().c_str(), task_id.Size()); object_id.index_ = return_index; return object_id; } @@ -118,23 +118,23 @@ const TaskID GenerateTaskId(const DriverID &driver_id, const TaskID &parent_task // Compute hashes. SHA256_CTX ctx; sha256_init(&ctx); - sha256_update(&ctx, reinterpret_cast(driver_id.data()), driver_id.size()); - sha256_update(&ctx, reinterpret_cast(parent_task_id.data()), - parent_task_id.size()); + sha256_update(&ctx, reinterpret_cast(driver_id.Data()), driver_id.Size()); + sha256_update(&ctx, reinterpret_cast(parent_task_id.Data()), + parent_task_id.Size()); sha256_update(&ctx, (const BYTE *)&parent_task_counter, sizeof(parent_task_counter)); // Compute the final task ID from the hash. 
BYTE buff[DIGEST_SIZE]; sha256_final(&ctx, buff); - return TaskID::from_binary(std::string(buff, buff + TaskID::size())); + return TaskID::FromBinary(std::string(buff, buff + TaskID::Size())); } #define ID_OSTREAM_OPERATOR(id_type) \ std::ostream &operator<<(std::ostream &os, const id_type &id) { \ - if (id.is_nil()) { \ + if (id.IsNil()) { \ os << "NIL_ID"; \ } else { \ - os << id.hex(); \ + os << id.Hex(); \ } \ return os; \ } diff --git a/src/ray/id.h b/src/ray/id.h index f90f66549358..7153a95f7750 100644 --- a/src/ray/id.h +++ b/src/ray/id.h @@ -31,26 +31,26 @@ template class BaseID { public: BaseID(); - static T from_random(); - static T from_binary(const std::string &binary); - static const T &nil(); - static size_t size() { return T::size(); } + static T FromRandom(); + static T FromBinary(const std::string &binary); + static const T &Nil(); + static size_t Size() { return T::Size(); } - size_t hash() const; - bool is_nil() const; + size_t Hash() const; + bool IsNil() const; bool operator==(const BaseID &rhs) const; bool operator!=(const BaseID &rhs) const; - const uint8_t *data() const; - std::string binary() const; - std::string hex() const; + const uint8_t *Data() const; + std::string Binary() const; + std::string Hex() const; protected: BaseID(const std::string &binary) { - std::memcpy(const_cast(this->data()), binary.data(), T::size()); + std::memcpy(const_cast(this->Data()), binary.data(), T::Size()); } - // All IDs are immutable for hash evaluations. mutable_data is only allow to use + // All IDs are immutable for hash evaluations. MutableData is only allow to use // in construction time, so this function is protected. - uint8_t *mutable_data(); + uint8_t *MutableData(); // For lazy evaluation, be careful to have one Id contained in another. // This hash code will be duplicated. mutable size_t hash_ = 0; @@ -59,7 +59,7 @@ class BaseID { class UniqueID : public BaseID { public: UniqueID() : BaseID(){}; - static size_t size() { return kUniqueIDSize; } + static size_t Size() { return kUniqueIDSize; } protected: UniqueID(const std::string &binary); @@ -71,7 +71,7 @@ class UniqueID : public BaseID { class TaskID : public BaseID { public: TaskID() : BaseID() {} - static size_t size() { return kTaskIDSize; } + static size_t Size() { return kTaskIDSize; } static TaskID GetDriverTaskID(const DriverID &driver_id); private: @@ -81,8 +81,8 @@ class TaskID : public BaseID { class ObjectID : public BaseID { public: ObjectID() : BaseID() {} - static size_t size() { return kUniqueIDSize; } - plasma::ObjectID to_plasma_id() const; + static size_t Size() { return kUniqueIDSize; } + plasma::ObjectID ToPlasmaId() const; ObjectID(const plasma::UniqueID &from); /// Get the index of this object in the task that created it. @@ -90,26 +90,26 @@ class ObjectID : public BaseID { /// \return The index of object creation according to the task that created /// this object. This is positive if the task returned the object and negative /// if created by a put. - int32_t object_index() const { return index_; } + int32_t ObjectIndex() const { return index_; } /// Compute the task ID of the task that created the object. /// /// \return The task ID of the task that created this object. - TaskID task_id() const; + TaskID TaskId() const; /// Compute the object ID of an object put by the task. /// /// \param task_id The task ID of the task that created the object. /// \param index What index of the object put in the task. /// \return The computed object ID. 
- static ObjectID for_put(const TaskID &task_id, int64_t put_index); + static ObjectID ForPut(const TaskID &task_id, int64_t put_index); /// Compute the object ID of an object returned by the task. /// /// \param task_id The task ID of the task that created the object. /// \param return_index What index of the object returned by in the task. /// \return The computed object ID. - static ObjectID for_task_return(const TaskID &task_id, int64_t return_index); + static ObjectID ForTaskReturn(const TaskID &task_id, int64_t return_index); private: uint8_t id_[kTaskIDSize]; @@ -125,22 +125,22 @@ std::ostream &operator<<(std::ostream &os, const UniqueID &id); std::ostream &operator<<(std::ostream &os, const TaskID &id); std::ostream &operator<<(std::ostream &os, const ObjectID &id); -#define DEFINE_UNIQUE_ID(type) \ - class RAY_EXPORT type : public UniqueID { \ - public: \ - explicit type(const UniqueID &from) { \ - std::memcpy(&id_, from.data(), kUniqueIDSize); \ - } \ - type() : UniqueID() {} \ - static type from_random() { return type(UniqueID::from_random()); } \ - static type from_binary(const std::string &binary) { return type(binary); } \ - static type nil() { return type(UniqueID::nil()); } \ - static size_t size() { return kUniqueIDSize; } \ - \ - private: \ - explicit type(const std::string &binary) { \ - std::memcpy(&id_, binary.data(), kUniqueIDSize); \ - } \ +#define DEFINE_UNIQUE_ID(type) \ + class RAY_EXPORT type : public UniqueID { \ + public: \ + explicit type(const UniqueID &from) { \ + std::memcpy(&id_, from.Data(), kUniqueIDSize); \ + } \ + type() : UniqueID() {} \ + static type FromRandom() { return type(UniqueID::FromRandom()); } \ + static type FromBinary(const std::string &binary) { return type(binary); } \ + static type Nil() { return type(UniqueID::Nil()); } \ + static size_t Size() { return kUniqueIDSize; } \ + \ + private: \ + explicit type(const std::string &binary) { \ + std::memcpy(&id_, binary.data(), kUniqueIDSize); \ + } \ }; #include "id_def.h" @@ -163,12 +163,12 @@ template BaseID::BaseID() { // Using const_cast to directly change data is dangerous. The cached // hash may not be changed. This is used in construction time. 
- std::fill_n(this->mutable_data(), T::size(), 0xff); + std::fill_n(this->MutableData(), T::Size(), 0xff); } template -T BaseID::from_random() { - std::string data(T::size(), 0); +T BaseID::FromRandom() { + std::string data(T::Size(), 0); // NOTE(pcm): The right way to do this is to have one std::mt19937 per // thread (using the thread_local keyword), but that's not supported on // older versions of macOS (see https://stackoverflow.com/a/29929949) @@ -176,44 +176,44 @@ T BaseID::from_random() { std::lock_guard lock(random_engine_mutex); static std::mt19937 generator = RandomlySeededMersenneTwister(); std::uniform_int_distribution dist(0, std::numeric_limits::max()); - for (int i = 0; i < T::size(); i++) { + for (int i = 0; i < T::Size(); i++) { data[i] = static_cast(dist(generator)); } - return T::from_binary(data); + return T::FromBinary(data); } template -T BaseID::from_binary(const std::string &binary) { - T t = T::nil(); - std::memcpy(t.mutable_data(), binary.data(), T::size()); +T BaseID::FromBinary(const std::string &binary) { + T t = T::Nil(); + std::memcpy(t.MutableData(), binary.data(), T::Size()); return t; } template -const T &BaseID::nil() { +const T &BaseID::Nil() { static const T nil_id; return nil_id; } template -bool BaseID::is_nil() const { - static T nil_id = T::nil(); +bool BaseID::IsNil() const { + static T nil_id = T::Nil(); return *this == nil_id; } template -size_t BaseID::hash() const { +size_t BaseID::Hash() const { // Note(ashione): hash code lazy calculation(it's invoked every time if hash code is // default value 0) if (!hash_) { - hash_ = MurmurHash64A(data(), T::size(), 0); + hash_ = MurmurHash64A(Data(), T::Size(), 0); } return hash_; } template bool BaseID::operator==(const BaseID &rhs) const { - return std::memcmp(data(), rhs.data(), T::size()) == 0; + return std::memcmp(Data(), rhs.Data(), T::Size()) == 0; } template @@ -222,26 +222,26 @@ bool BaseID::operator!=(const BaseID &rhs) const { } template -uint8_t *BaseID::mutable_data() { +uint8_t *BaseID::MutableData() { return reinterpret_cast(this) + sizeof(hash_); } template -const uint8_t *BaseID::data() const { +const uint8_t *BaseID::Data() const { return reinterpret_cast(this) + sizeof(hash_); } template -std::string BaseID::binary() const { - return std::string(reinterpret_cast(data()), T::size()); +std::string BaseID::Binary() const { + return std::string(reinterpret_cast(Data()), T::Size()); } template -std::string BaseID::hex() const { +std::string BaseID::Hex() const { constexpr char hex[] = "0123456789abcdef"; - const uint8_t *id = data(); + const uint8_t *id = Data(); std::string result; - for (int i = 0; i < T::size(); i++) { + for (int i = 0; i < T::Size(); i++) { unsigned int val = id[i]; result.push_back(hex[val >> 4]); result.push_back(hex[val & 0xf]); @@ -256,11 +256,11 @@ namespace std { #define DEFINE_UNIQUE_ID(type) \ template <> \ struct hash<::ray::type> { \ - size_t operator()(const ::ray::type &id) const { return id.hash(); } \ + size_t operator()(const ::ray::type &id) const { return id.Hash(); } \ }; \ template <> \ struct hash { \ - size_t operator()(const ::ray::type &id) const { return id.hash(); } \ + size_t operator()(const ::ray::type &id) const { return id.Hash(); } \ }; DEFINE_UNIQUE_ID(UniqueID); diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index fe0471797c0d..fba426d732c3 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -43,7 +43,7 @@ std::pair 
ObjectBufferPool::Ge std::lock_guard lock(pool_mutex_); if (get_buffer_state_.count(object_id) == 0) { plasma::ObjectBuffer object_buffer; - plasma::ObjectID plasma_id = object_id.to_plasma_id(); + plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); if (object_buffer.data == nullptr) { RAY_LOG(ERROR) << "Failed to get object"; @@ -72,14 +72,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk GetBufferState &buffer_state = get_buffer_state_[object_id]; buffer_state.references--; if (buffer_state.references == 0) { - RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); get_buffer_state_.erase(object_id); } } void ObjectBufferPool::AbortGet(const ObjectID &object_id) { std::lock_guard lock(pool_mutex_); - RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); get_buffer_state_.erase(object_id); } @@ -88,7 +88,7 @@ std::pair ObjectBufferPool::Cr uint64_t chunk_index) { std::lock_guard lock(pool_mutex_); if (create_buffer_state_.count(object_id) == 0) { - const plasma::ObjectID plasma_id = object_id.to_plasma_id(); + const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); int64_t object_size = data_size - metadata_size; // Try to create shared buffer. std::shared_ptr data; @@ -150,7 +150,7 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED; create_buffer_state_[object_id].num_seals_remaining--; if (create_buffer_state_[object_id].num_seals_remaining == 0) { - const plasma::ObjectID plasma_id = object_id.to_plasma_id(); + const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id)); RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); create_buffer_state_.erase(object_id); @@ -158,7 +158,7 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk } void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { - const plasma::ObjectID plasma_id = object_id.to_plasma_id(); + const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id)); create_buffer_state_.erase(object_id); @@ -186,7 +186,7 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { std::vector plasma_ids; plasma_ids.reserve(object_ids.size()); for (const auto &id : object_ids) { - plasma_ids.push_back(id.to_plasma_id()); + plasma_ids.push_back(id.ToPlasmaId()); } std::lock_guard lock(pool_mutex_); RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 85157abcdbe9..1f05559f4b87 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -19,7 +19,7 @@ void UpdateObjectLocations(const GcsTableNotificationMode notification_mode, // with GcsTableNotificationMode, we can determine whether the update mode is // addition or deletion. 
for (const auto &object_table_data : location_updates) { - ClientID client_id = ClientID::from_binary(object_table_data.manager); + ClientID client_id = ClientID::FromBinary(object_table_data.manager); if (notification_mode != GcsTableNotificationMode::REMOVE) { client_ids->insert(client_id); } else { @@ -71,7 +71,7 @@ void ObjectDirectory::RegisterBackend() { } }; RAY_CHECK_OK(gcs_client_->object_table().Subscribe( - DriverID::nil(), gcs_client_->client_table().GetLocalClientId(), + DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(), object_notification_callback, nullptr)); } @@ -81,10 +81,10 @@ ray::Status ObjectDirectory::ReportObjectAdded( RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id; // Append the addition entry to the object table. auto data = std::make_shared(); - data->manager = client_id.binary(); + data->manager = client_id.Binary(); data->object_size = object_info.data_size; ray::Status status = - gcs_client_->object_table().Add(DriverID::nil(), object_id, data, nullptr); + gcs_client_->object_table().Add(DriverID::Nil(), object_id, data, nullptr); return status; } @@ -94,10 +94,10 @@ ray::Status ObjectDirectory::ReportObjectRemoved( RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id; // Append the eviction entry to the object table. auto data = std::make_shared(); - data->manager = client_id.binary(); + data->manager = client_id.Binary(); data->object_size = object_info.data_size; ray::Status status = - gcs_client_->object_table().Remove(DriverID::nil(), object_id, data, nullptr); + gcs_client_->object_table().Remove(DriverID::Nil(), object_id, data, nullptr); return status; }; @@ -105,8 +105,8 @@ void ObjectDirectory::LookupRemoteConnectionInfo( RemoteConnectionInfo &connection_info) const { ClientTableDataT client_data; gcs_client_->client_table().GetClient(connection_info.client_id, client_data); - ClientID result_client_id = ClientID::from_binary(client_data.client_id); - if (!result_client_id.is_nil()) { + ClientID result_client_id = ClientID::FromBinary(client_data.client_id); + if (!result_client_id.IsNil()) { RAY_CHECK(result_client_id == connection_info.client_id); if (client_data.entry_type == EntryType::INSERTION) { connection_info.ip = client_data.node_manager_address; @@ -157,7 +157,7 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i if (it == listeners_.end()) { it = listeners_.emplace(object_id, LocationListenerState()).first; status = gcs_client_->object_table().RequestNotifications( - DriverID::nil(), object_id, gcs_client_->client_table().GetLocalClientId()); + DriverID::Nil(), object_id, gcs_client_->client_table().GetLocalClientId()); } auto &listener_state = it->second; // TODO(hme): Make this fatal after implementing Pull suppression. @@ -185,7 +185,7 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback entry->second.callbacks.erase(callback_id); if (entry->second.callbacks.empty()) { status = gcs_client_->object_table().CancelNotifications( - DriverID::nil(), object_id, gcs_client_->client_table().GetLocalClientId()); + DriverID::Nil(), object_id, gcs_client_->client_table().GetLocalClientId()); listeners_.erase(entry); } return status; @@ -208,7 +208,7 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, // SubscribeObjectLocations call, so look up the object's locations // directly from the GCS. 
status = gcs_client_->object_table().Lookup( - DriverID::nil(), object_id, + DriverID::Nil(), object_id, [this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id, const std::vector &location_updates) { // Build the set of current locations based on the entries in the log. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 3d829027b7cf..954162c21aef 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -64,7 +64,7 @@ void ObjectManager::StopIOService() { void ObjectManager::HandleObjectAdded( const object_manager::protocol::ObjectInfoT &object_info) { // Notify the object directory that the object has been added to this node. - ObjectID object_id = ObjectID::from_binary(object_info.object_id); + ObjectID object_id = ObjectID::FromBinary(object_info.object_id); RAY_LOG(DEBUG) << "Object added " << object_id; RAY_CHECK(local_objects_.count(object_id) == 0); local_objects_[object_id].object_info = object_info; @@ -272,7 +272,7 @@ void ObjectManager::PullSendRequest(const ObjectID &object_id, flatbuffers::FlatBufferBuilder fbb; auto message = object_manager_protocol::CreatePullRequestMessage( - fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary())); + fbb, fbb.CreateString(client_id_.Binary()), fbb.CreateString(object_id.Binary())); fbb.Finish(message); conn->WriteMessageAsync( static_cast(object_manager_protocol::MessageType::PullRequest), @@ -315,7 +315,7 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, profile_event.end_time = end_time; // Encode the object ID, client ID, chunk index, and status as a json list, // which will be parsed by the reader of the profile table. - profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," + + profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," + std::to_string(chunk_index) + ",\"" + status.ToString() + "\"]"; profile_events_.push_back(profile_event); @@ -335,7 +335,7 @@ void ObjectManager::HandleReceiveFinished(const ObjectID &object_id, profile_event.end_time = end_time; // Encode the object ID, client ID, chunk index, and status as a json list, // which will be parsed by the reader of the profile table. 
- profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," + + profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," + std::to_string(chunk_index) + ",\"" + status.ToString() + "\"]"; profile_events_.push_back(profile_event); @@ -408,7 +408,7 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { static_cast(object_info.data_size + object_info.metadata_size); uint64_t metadata_size = static_cast(object_info.metadata_size); uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); - UniqueID push_id = UniqueID::from_random(); + UniqueID push_id = UniqueID::FromRandom(); for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size, chunk_index, connection_info]() { @@ -527,7 +527,7 @@ void ObjectManager::CancelPull(const ObjectID &object_id) { ray::Status ObjectManager::Wait(const std::vector &object_ids, int64_t timeout_ms, uint64_t num_required_objects, bool wait_local, const WaitCallback &callback) { - UniqueID wait_id = UniqueID::from_random(); + UniqueID wait_id = UniqueID::FromRandom(); RAY_LOG(DEBUG) << "Wait request " << wait_id << " on " << client_id_; RAY_RETURN_NOT_OK(AddWaitRequest(wait_id, object_ids, timeout_ms, num_required_objects, wait_local, callback)); @@ -773,7 +773,7 @@ void ObjectManager::ConnectClient(std::shared_ptr &conn, // TODO: trash connection on failure. auto info = flatbuffers::GetRoot(message); - ClientID client_id = ClientID::from_binary(info->client_id()->str()); + ClientID client_id = ClientID::FromBinary(info->client_id()->str()); bool is_transfer = info->is_transfer(); conn->SetClientID(client_id); if (is_transfer) { @@ -798,14 +798,14 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr &con const uint8_t *message) { // Serialize and push object to requesting client. auto pr = flatbuffers::GetRoot(message); - ObjectID object_id = ObjectID::from_binary(pr->object_id()->str()); - ClientID client_id = ClientID::from_binary(pr->client_id()->str()); + ObjectID object_id = ObjectID::FromBinary(pr->object_id()->str()); + ClientID client_id = ClientID::FromBinary(pr->client_id()->str()); ProfileEventT profile_event; profile_event.event_type = "receive_pull_request"; profile_event.start_time = current_sys_time_seconds(); profile_event.end_time = profile_event.start_time; - profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"]"; + profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"]"; profile_events_.push_back(profile_event); Push(object_id, client_id); @@ -817,7 +817,7 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr &con // Serialize. 
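// --- Illustrative aside (not part of this patch): the profile strings built with
// Hex() above are small JSON lists that the profile-table reader parses. A sketch of
// the send/receive-event encoding, mirroring HandleSendFinished; the helper name and
// header paths are made up for illustration.
#include <cstdint>
#include <string>

#include "ray/id.h"
#include "ray/status.h"

std::string EncodeTransferProfileData(const ray::ObjectID &object_id,
                                      const ray::ClientID &client_id,
                                      uint64_t chunk_index, const ray::Status &status) {
  // Produces e.g. ["<object id hex>","<client id hex>",3,"OK"].
  return "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," +
         std::to_string(chunk_index) + ",\"" + status.ToString() + "\"]";
}
// --- End of aside.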
auto object_header = flatbuffers::GetRoot(message); - const ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str()); + const ObjectID object_id = ObjectID::FromBinary(object_header->object_id()->str()); uint64_t chunk_index = object_header->chunk_index(); uint64_t data_size = object_header->data_size(); uint64_t metadata_size = object_header->metadata_size(); @@ -941,7 +941,7 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ ProfileTableDataT ObjectManager::GetAndResetProfilingInfo() { ProfileTableDataT profile_info; profile_info.component_type = "object_manager"; - profile_info.component_id = client_id_.binary(); + profile_info.component_id = client_id_.Binary(); for (auto const &profile_event : profile_events_) { profile_info.profile_events.emplace_back(new ProfileEventT(profile_event)); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 0d301052358f..cb0cff83f349 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -394,7 +394,7 @@ class ObjectManager : public ObjectManagerInterface { /// This is used as the callback identifier in Pull for /// SubscribeObjectLocations. We only need one identifier because we never need to /// subscribe multiple times to the same object during Pull. - UniqueID object_directory_pull_callback_id_ = UniqueID::from_random(); + UniqueID object_directory_pull_callback_id_ = UniqueID::FromRandom(); /// A set of active wait requests. std::unordered_map active_wait_requests_; diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 91b0ffc3d576..6d7c0be0f856 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -121,8 +121,8 @@ class TestObjectManagerBase : public ::testing::Test { flushall_redis(); // start store - store_id_1 = StartStore(UniqueID::from_random().hex()); - store_id_2 = StartStore(UniqueID::from_random().hex()); + store_id_1 = StartStore(UniqueID::FromRandom().Hex()); + store_id_2 = StartStore(UniqueID::FromRandom().Hex()); uint pull_timeout_ms = 1000; int max_sends_a = 2; @@ -174,14 +174,14 @@ class TestObjectManagerBase : public ::testing::Test { } ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) { - ObjectID object_id = ObjectID::from_random(); + ObjectID object_id = ObjectID::FromRandom(); RAY_LOG(DEBUG) << "ObjectID Created: " << object_id; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK( + client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); return object_id; } @@ -242,7 +242,7 @@ class StressTestObjectManager : public TestObjectManagerBase { client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback([this]( gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) { - ClientID parsed_id = ClientID::from_binary(data.client_id); + ClientID parsed_id = ClientID::FromBinary(data.client_id); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -262,7 +262,7 @@ class 
StressTestObjectManager : public TestObjectManagerBase { ray::Status status = ray::Status::OK(); status = server1->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - object_added_handler_1(ObjectID::from_binary(object_info.object_id)); + object_added_handler_1(ObjectID::FromBinary(object_info.object_id)); if (v1.size() == num_expected_objects && v1.size() == v2.size()) { TransferTestComplete(); } @@ -270,7 +270,7 @@ class StressTestObjectManager : public TestObjectManagerBase { RAY_CHECK_OK(status); status = server2->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - object_added_handler_2(ObjectID::from_binary(object_info.object_id)); + object_added_handler_2(ObjectID::FromBinary(object_info.object_id)); if (v2.size() == num_expected_objects && v1.size() == v2.size()) { TransferTestComplete(); } @@ -290,7 +290,7 @@ class StressTestObjectManager : public TestObjectManagerBase { plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) { plasma::ObjectBuffer object_buffer; - plasma::ObjectID plasma_id = object_id.to_plasma_id(); + plasma::ObjectID plasma_id = object_id.ToPlasmaId(); RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer)); return object_buffer; } @@ -298,7 +298,7 @@ class StressTestObjectManager : public TestObjectManagerBase { static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) { const int64_t size = sizeof(uint64_t); static unsigned char digest_1[size]; - RAY_ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0])); + RAY_ARROW_CHECK_OK(client.Hash(object_id.ToPlasmaId(), &digest_1[0])); return digest_1; } @@ -439,12 +439,12 @@ class StressTestObjectManager : public TestObjectManagerBase { << "\n"; ClientTableDataT data; gcs_client_1->client_table().GetClient(client_id_1, data); - RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data.client_id) << "\n" + RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data.client_id) << "\n" << "ClientIp=" << data.node_manager_address << "\n" << "ClientPort=" << data.node_manager_port; ClientTableDataT data2; gcs_client_1->client_table().GetClient(client_id_2, data2); - RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data2.client_id) << "\n" + RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data2.client_id) << "\n" << "ClientIp=" << data2.node_manager_address << "\n" << "ClientPort=" << data2.node_manager_port; } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 98eeb9186192..983a8fa7bc05 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -114,8 +114,8 @@ class TestObjectManagerBase : public ::testing::Test { flushall_redis(); // start store - store_id_1 = StartStore(UniqueID::from_random().hex()); - store_id_2 = StartStore(UniqueID::from_random().hex()); + store_id_1 = StartStore(UniqueID::FromRandom().Hex()); + store_id_2 = StartStore(UniqueID::FromRandom().Hex()); uint pull_timeout_ms = 1; push_timeout_ms = 1000; @@ -162,7 +162,7 @@ class TestObjectManagerBase : public ::testing::Test { } ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) { - return WriteDataToClient(client, data_size, ObjectID::from_random()); + return WriteDataToClient(client, data_size, ObjectID::FromRandom()); } ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size, @@ -171,9 
+171,9 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK( + client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); return object_id; } @@ -221,7 +221,7 @@ class TestObjectManager : public TestObjectManagerBase { client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback([this]( gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) { - ClientID parsed_id = ClientID::from_binary(data.client_id); + ClientID parsed_id = ClientID::FromBinary(data.client_id); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -240,13 +240,13 @@ class TestObjectManager : public TestObjectManagerBase { ray::Status status = ray::Status::OK(); status = server1->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - object_added_handler_1(ObjectID::from_binary(object_info.object_id)); + object_added_handler_1(ObjectID::FromBinary(object_info.object_id)); NotificationTestCompleteIfSatisfied(); }); RAY_CHECK_OK(status); status = server2->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - object_added_handler_2(ObjectID::from_binary(object_info.object_id)); + object_added_handler_2(ObjectID::FromBinary(object_info.object_id)); NotificationTestCompleteIfSatisfied(); }); RAY_CHECK_OK(status); @@ -254,11 +254,11 @@ class TestObjectManager : public TestObjectManagerBase { uint data_size = 1000000; // dummy_id is not local. The push function will timeout. - ObjectID dummy_id = ObjectID::from_random(); + ObjectID dummy_id = ObjectID::FromRandom(); server1->object_manager_.Push(dummy_id, gcs_client_2->client_table().GetLocalClientId()); - created_object_id1 = ObjectID::from_random(); + created_object_id1 = ObjectID::FromRandom(); WriteDataToClient(client1, data_size, created_object_id1); // Server1 holds Object1 so this Push call will success. server1->object_manager_.Push(created_object_id1, @@ -268,7 +268,7 @@ class TestObjectManager : public TestObjectManagerBase { timer.reset(new boost::asio::deadline_timer(main_service)); auto period = boost::posix_time::milliseconds(push_timeout_ms + 10); timer->expires_from_now(period); - created_object_id2 = ObjectID::from_random(); + created_object_id2 = ObjectID::FromRandom(); timer->async_wait([this, data_size](const boost::system::error_code &error) { WriteDataToClient(client2, data_size, created_object_id2); }); @@ -288,7 +288,7 @@ class TestObjectManager : public TestObjectManagerBase { // object. 
ObjectID object_1 = WriteDataToClient(client2, data_size); ObjectID object_2 = WriteDataToClient(client2, data_size); - UniqueID sub_id = ray::UniqueID::from_random(); + UniqueID sub_id = ray::UniqueID::FromRandom(); RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations( sub_id, object_1, [this, sub_id, object_1, object_2]( @@ -307,7 +307,7 @@ class TestObjectManager : public TestObjectManagerBase { std::vector object_ids = {object_1, object_2}; boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time(); - UniqueID wait_id = UniqueID::from_random(); + UniqueID wait_id = UniqueID::FromRandom(); RAY_CHECK_OK(server1->object_manager_.AddWaitRequest( wait_id, object_ids, timeout_ms, required_objects, false, @@ -378,7 +378,7 @@ class TestObjectManager : public TestObjectManagerBase { } if (include_nonexistent) { num_objects += 1; - object_ids.push_back(ObjectID::from_random()); + object_ids.push_back(ObjectID::FromRandom()); } boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time(); RAY_CHECK_OK(server1->object_manager_.Wait( @@ -457,17 +457,17 @@ class TestObjectManager : public TestObjectManagerBase { << "\n"; ClientTableDataT data; gcs_client_1->client_table().GetClient(client_id_1, data); - RAY_LOG(DEBUG) << (ClientID::from_binary(data.client_id).is_nil()); - RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::from_binary(data.client_id); + RAY_LOG(DEBUG) << (ClientID::FromBinary(data.client_id).IsNil()); + RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::FromBinary(data.client_id); RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address; RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port; - ASSERT_EQ(client_id_1, ClientID::from_binary(data.client_id)); + ASSERT_EQ(client_id_1, ClientID::FromBinary(data.client_id)); ClientTableDataT data2; gcs_client_1->client_table().GetClient(client_id_2, data2); - RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::from_binary(data2.client_id); + RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::FromBinary(data2.client_id); RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address; RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port; - ASSERT_EQ(client_id_2, ClientID::from_binary(data2.client_id)); + ASSERT_EQ(client_id_2, ClientID::FromBinary(data2.client_id)); } }; diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index 1cc55367ed07..cc587bc4d74e 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -14,28 +14,28 @@ ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data) ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data, const ActorCheckpointDataT &checkpoint_data) : actor_table_data_(actor_table_data), - execution_dependency_(ObjectID::from_binary(checkpoint_data.execution_dependency)) { + execution_dependency_(ObjectID::FromBinary(checkpoint_data.execution_dependency)) { // Restore `frontier_`. 
for (size_t i = 0; i < checkpoint_data.handle_ids.size(); i++) { - auto handle_id = ActorHandleID::from_binary(checkpoint_data.handle_ids[i]); + auto handle_id = ActorHandleID::FromBinary(checkpoint_data.handle_ids[i]); auto &frontier_entry = frontier_[handle_id]; frontier_entry.task_counter = checkpoint_data.task_counters[i]; frontier_entry.execution_dependency = - ObjectID::from_binary(checkpoint_data.frontier_dependencies[i]); + ObjectID::FromBinary(checkpoint_data.frontier_dependencies[i]); } // Restore `dummy_objects_`. for (size_t i = 0; i < checkpoint_data.unreleased_dummy_objects.size(); i++) { - auto dummy = ObjectID::from_binary(checkpoint_data.unreleased_dummy_objects[i]); + auto dummy = ObjectID::FromBinary(checkpoint_data.unreleased_dummy_objects[i]); dummy_objects_[dummy] = checkpoint_data.num_dummy_object_dependencies[i]; } } const ClientID ActorRegistration::GetNodeManagerId() const { - return ClientID::from_binary(actor_table_data_.node_manager_id); + return ClientID::FromBinary(actor_table_data_.node_manager_id); } const ObjectID ActorRegistration::GetActorCreationDependency() const { - return ObjectID::from_binary(actor_table_data_.actor_creation_dummy_object_id); + return ObjectID::FromBinary(actor_table_data_.actor_creation_dummy_object_id); } const ObjectID ActorRegistration::GetExecutionDependency() const { @@ -43,7 +43,7 @@ const ObjectID ActorRegistration::GetExecutionDependency() const { } const DriverID ActorRegistration::GetDriverId() const { - return DriverID::from_binary(actor_table_data_.driver_id); + return DriverID::FromBinary(actor_table_data_.driver_id); } const int64_t ActorRegistration::GetMaxReconstructions() const { @@ -65,7 +65,7 @@ ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, // Release the reference to the previous cursor for this // actor handle, if there was one. ObjectID object_to_release; - if (!frontier_entry.execution_dependency.is_nil()) { + if (!frontier_entry.execution_dependency.IsNil()) { auto it = dummy_objects_.find(frontier_entry.execution_dependency); RAY_CHECK(it != dummy_objects_.end()); it->second--; @@ -110,16 +110,16 @@ std::shared_ptr ActorRegistration::GenerateCheckpointData( // Use actor's current state to generate checkpoint data. 
auto checkpoint_data = std::make_shared(); - checkpoint_data->actor_id = actor_id.binary(); - checkpoint_data->execution_dependency = copy.GetExecutionDependency().binary(); + checkpoint_data->actor_id = actor_id.Binary(); + checkpoint_data->execution_dependency = copy.GetExecutionDependency().Binary(); for (const auto &frontier : copy.GetFrontier()) { - checkpoint_data->handle_ids.push_back(frontier.first.binary()); + checkpoint_data->handle_ids.push_back(frontier.first.Binary()); checkpoint_data->task_counters.push_back(frontier.second.task_counter); checkpoint_data->frontier_dependencies.push_back( - frontier.second.execution_dependency.binary()); + frontier.second.execution_dependency.Binary()); } for (const auto &entry : copy.GetDummyObjects()) { - checkpoint_data->unreleased_dummy_objects.push_back(entry.first.binary()); + checkpoint_data->unreleased_dummy_objects.push_back(entry.first.Binary()); checkpoint_data->num_dummy_object_dependencies.push_back(entry.second); } return checkpoint_data; diff --git a/src/ray/raylet/client_connection_test.cc b/src/ray/raylet/client_connection_test.cc index 361a04de1147..952088bb2f4b 100644 --- a/src/ray/raylet/client_connection_test.cc +++ b/src/ray/raylet/client_connection_test.cc @@ -180,7 +180,7 @@ TEST_F(ClientConnectionTest, ProcessBadMessage) { "reader", {}, error_message_type_); // If client ID is set, bad message would crash the test. - // reader->SetClientID(UniqueID::from_random()); + // reader->SetClientID(UniqueID::FromRandom()); // Intentionally write a message with incorrect cookie. // Verify it won't crash as long as client ID is not set. diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc index ac32911ef2d0..ba9fef4f44d6 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc @@ -12,10 +12,10 @@ class UniqueIdFromJByteArray { const ID &GetId() const { return id; } UniqueIdFromJByteArray(JNIEnv *env, const jbyteArray &bytes) { - std::string id_str(ID::size(), 0); - env->GetByteArrayRegion(bytes, 0, ID::size(), + std::string id_str(ID::Size(), 0); + env->GetByteArrayRegion(bytes, 0, ID::Size(), reinterpret_cast(&id_str.front())); - id = ID::from_binary(id_str); + id = ID::FromBinary(id_str); } private: @@ -231,12 +231,12 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeGenerateTaskId( TaskID task_id = ray::GenerateTaskId(driver_id.GetId(), parent_task_id.GetId(), parent_task_counter); - jbyteArray result = env->NewByteArray(task_id.size()); + jbyteArray result = env->NewByteArray(task_id.Size()); if (nullptr == result) { return nullptr; } - env->SetByteArrayRegion(result, 0, task_id.size(), - reinterpret_cast(task_id.data())); + env->SetByteArrayRegion(result, 0, task_id.Size(), + reinterpret_cast(task_id.Data())); return result; } @@ -280,9 +280,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativePrepareCheckpoint(JNIEnv *env if (ThrowRayExceptionIfNotOK(env, status)) { return nullptr; } - jbyteArray result = env->NewByteArray(checkpoint_id.size()); - env->SetByteArrayRegion(result, 0, checkpoint_id.size(), - reinterpret_cast(checkpoint_id.data())); + jbyteArray result = env->NewByteArray(checkpoint_id.Size()); + env->SetByteArrayRegion(result, 0, checkpoint_id.Size(), + reinterpret_cast(checkpoint_id.Data())); return result; } diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 
4c3fac24f19e..910c3481bf58 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -48,7 +48,7 @@ void LineageEntry::ComputeParentTaskIds() { parent_task_ids_.clear(); // A task's parents are the tasks that created its arguments. for (const auto &dependency : task_.GetDependencies()) { - parent_task_ids_.insert(dependency.task_id()); + parent_task_ids_.insert(dependency.TaskId()); } } @@ -296,7 +296,7 @@ bool LineageCache::RemoveWaitingTask(const TaskID &task_id) { } void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &node_id) { - RAY_CHECK(!node_id.is_nil()); + RAY_CHECK(!node_id.IsNil()); lineage_.GetEntryMutable(task_id)->MarkExplicitlyForwarded(node_id); } @@ -374,7 +374,7 @@ bool LineageCache::SubscribeTask(const TaskID &task_id) { if (unsubscribed) { // Request notifications for the task if we haven't already requested // notifications for it. - RAY_CHECK_OK(task_pubsub_.RequestNotifications(DriverID::nil(), task_id, client_id_)); + RAY_CHECK_OK(task_pubsub_.RequestNotifications(DriverID::Nil(), task_id, client_id_)); } // Return whether we were previously unsubscribed to this task and are now // subscribed. @@ -387,7 +387,7 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { if (subscribed) { // Cancel notifications for the task if we previously requested // notifications for it. - RAY_CHECK_OK(task_pubsub_.CancelNotifications(DriverID::nil(), task_id, client_id_)); + RAY_CHECK_OK(task_pubsub_.CancelNotifications(DriverID::Nil(), task_id, client_id_)); subscribed_tasks_.erase(it); } // Return whether we were previously subscribed to this task and are now diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index af411066e914..a61ae846a925 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -43,7 +43,7 @@ class MockGcs : public gcs::TableInterface, notification_callback_(client, task_id, data); } }; - return Add(DriverID::nil(), task_id, task_data, callback); + return Add(DriverID::Nil(), task_id, task_data, callback); } Status RequestNotifications(const DriverID &driver_id, const TaskID &task_id, @@ -91,7 +91,7 @@ class LineageCacheTest : public ::testing::Test { LineageCacheTest() : max_lineage_size_(10), mock_gcs_(), - lineage_cache_(ClientID::from_random(), mock_gcs_, mock_gcs_, max_lineage_size_) { + lineage_cache_(ClientID::FromRandom(), mock_gcs_, mock_gcs_, max_lineage_size_) { mock_gcs_.Subscribe([this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, const ray::protocol::TaskT &data) { lineage_cache_.HandleEntryCommitted(task_id); @@ -113,7 +113,7 @@ static inline Task ExampleTask(const std::vector &arguments, task_arguments.emplace_back(std::make_shared(references)); } std::vector function_descriptor(3); - auto spec = TaskSpecification(DriverID::nil(), TaskID::from_random(), 0, task_arguments, + auto spec = TaskSpecification(DriverID::Nil(), TaskID::FromRandom(), 0, task_arguments, num_returns, required_resources, Language::PYTHON, function_descriptor); auto execution_spec = TaskExecutionSpecification(std::vector()); @@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) { // Get the uncommitted lineage for the last task (the leaf) of one of the chains. 
auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::Nil()); // Check that the uncommitted lineage is exactly equal to the first chain of tasks. ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size()); for (auto &task_id : task_ids1) { @@ -181,7 +181,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) { // Get the uncommitted lineage for the inserted task. uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie( - combined_task_ids.back(), ClientID::nil()); + combined_task_ids.back(), ClientID::Nil()); // Check that the uncommitted lineage is exactly equal to the entire set of // tasks inserted so far. ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size()); @@ -200,8 +200,8 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { task_ids.push_back(task.GetTaskSpecification().TaskId()); } - auto node_id = ClientID::from_random(); - auto node_id2 = ClientID::from_random(); + auto node_id = ClientID::FromRandom(); + auto node_id2 = ClientID::FromRandom(); auto forwarded_task_id = task_ids[task_ids.size() - 2]; auto remaining_task_id = task_ids[task_ids.size() - 1]; lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id); @@ -285,7 +285,7 @@ TEST_F(LineageCacheTest, TestEvictChain) { mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + ClientID::Nil()) .GetEntries() .size(), tasks.size()); @@ -298,7 +298,7 @@ TEST_F(LineageCacheTest, TestEvictChain) { mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + ClientID::Nil()) .GetEntries() .size(), tasks.size()); @@ -335,7 +335,7 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ .GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(), - ClientID::nil()) + ClientID::Nil()) .GetEntries() .size(), total_tasks); @@ -351,7 +351,7 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ .GetUncommittedLineageOrDie( - child_task.GetTaskSpecification().TaskId(), ClientID::nil()) + child_task.GetTaskSpecification().TaskId(), ClientID::Nil()) .GetEntries() .size(), total_tasks); @@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { const auto task_id = it->GetTaskSpecification().TaskId(); // Simulate removing the task and forwarding it to another node. auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::Nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); // Simulate receiving the task again. Make sure we can add the task back. 
flatbuffers::FlatBufferBuilder fbb; @@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) { tasks.erase(it); auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::Nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove)); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3); @@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); // Simulate executing the first task on a remote node and adding it to the @@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) { // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil()); ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); // The remaining task should have no uncommitted lineage. ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1); @@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size); diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index 1e20fe3f4131..171b4dc9439e 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -35,7 +35,7 @@ void Monitor::Start() { HandleHeartbeat(id, heartbeat_data); }; RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe( - DriverID::nil(), ClientID::nil(), heartbeat_callback, nullptr, nullptr)); + DriverID::Nil(), ClientID::Nil(), heartbeat_callback, nullptr, nullptr)); Tick(); } @@ -52,7 +52,7 @@ void Monitor::Tick() { const std::vector &all_data) { bool marked = false; for (const auto &data : all_data) { - if (client_id.binary() == data.client_id && + if (client_id.Binary() == data.client_id && data.entry_type == EntryType::DELETION) { // The node has been marked dead by itself. marked = true; @@ -70,7 +70,7 @@ void Monitor::Tick() { << " has missed too many heartbeats from it."; // We use the nil DriverID to broadcast the message to all drivers. 
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver( - DriverID::nil(), type, error_message.str(), current_time_ms())); + DriverID::Nil(), type, error_message.str(), current_time_ms())); } }; RAY_CHECK_OK(gcs_client_.client_table().Lookup(lookup_callback)); @@ -89,7 +89,7 @@ void Monitor::Tick() { batch->batch.push_back(std::unique_ptr( new HeartbeatTableDataT(heartbeat.second))); } - RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(DriverID::nil(), ClientID::nil(), + RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(DriverID::Nil(), ClientID::Nil(), batch, nullptr)); heartbeat_buffer_.clear(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2e25407f12fb..1b582d7617cb 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -110,7 +110,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK(object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - ObjectID object_id = ObjectID::from_binary(object_info.object_id); + ObjectID object_id = ObjectID::FromBinary(object_info.object_id); HandleObjectLocal(object_id); })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( @@ -131,13 +131,13 @@ ray::Status NodeManager::RegisterGcs() { lineage_cache_.HandleEntryCommitted(task_id); }; RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe( - DriverID::nil(), gcs_client_->client_table().GetLocalClientId(), + DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(), task_committed_callback, nullptr, nullptr)); const auto task_lease_notification_callback = [this](gcs::AsyncGcsClient *client, const TaskID &task_id, const TaskLeaseDataT &task_lease) { - const ClientID node_manager_id = ClientID::from_binary(task_lease.node_manager_id); + const ClientID node_manager_id = ClientID::FromBinary(task_lease.node_manager_id); if (gcs_client_->client_table().IsRemoved(node_manager_id)) { // The node manager that added the task lease is already removed. The // lease is considered inactive. @@ -155,7 +155,7 @@ ray::Status NodeManager::RegisterGcs() { reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0); }; RAY_RETURN_NOT_OK(gcs_client_->task_lease_table().Subscribe( - DriverID::nil(), gcs_client_->client_table().GetLocalClientId(), + DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(), task_lease_notification_callback, task_lease_empty_callback, nullptr)); // Register a callback to handle actor notifications. @@ -170,7 +170,7 @@ ray::Status NodeManager::RegisterGcs() { }; RAY_RETURN_NOT_OK(gcs_client_->actor_table().Subscribe( - DriverID::nil(), ClientID::nil(), actor_notification_callback, nullptr)); + DriverID::Nil(), ClientID::Nil(), actor_notification_callback, nullptr)); // Register a callback on the client table for new clients. 
auto node_manager_client_added = [this](gcs::AsyncGcsClient *client, const UniqueID &id, @@ -208,7 +208,7 @@ ray::Status NodeManager::RegisterGcs() { HeartbeatBatchAdded(heartbeat_batch); }; RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe( - DriverID::nil(), ClientID::nil(), heartbeat_batch_added, + DriverID::Nil(), ClientID::Nil(), heartbeat_batch_added, /*subscribe_callback=*/nullptr, /*done_callback=*/nullptr)); @@ -219,7 +219,7 @@ ray::Status NodeManager::RegisterGcs() { HandleDriverTableUpdate(client_id, driver_data); }; RAY_RETURN_NOT_OK(gcs_client_->driver_table().Subscribe( - DriverID::nil(), ClientID::nil(), driver_table_handler, nullptr)); + DriverID::Nil(), ClientID::Nil(), driver_table_handler, nullptr)); // Start sending heartbeats to the GCS. last_heartbeat_at_ms_ = current_time_ms(); @@ -253,10 +253,10 @@ void NodeManager::KillWorker(std::shared_ptr worker) { void NodeManager::HandleDriverTableUpdate( const DriverID &id, const std::vector &driver_data) { for (const auto &entry : driver_data) { - RAY_LOG(DEBUG) << "HandleDriverTableUpdate " << UniqueID::from_binary(entry.driver_id) + RAY_LOG(DEBUG) << "HandleDriverTableUpdate " << UniqueID::FromBinary(entry.driver_id) << " " << entry.is_dead; if (entry.is_dead) { - auto driver_id = DriverID::from_binary(entry.driver_id); + auto driver_id = DriverID::FromBinary(entry.driver_id); auto workers = worker_pool_.GetWorkersRunningTasksForDriver(driver_id); // Kill all the workers. The actual cleanup for these workers is done @@ -291,7 +291,7 @@ void NodeManager::Heartbeat() { auto heartbeat_data = std::make_shared(); const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); SchedulingResources &local_resources = cluster_resource_map_[my_client_id]; - heartbeat_data->client_id = my_client_id.binary(); + heartbeat_data->client_id = my_client_id.Binary(); // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet directly. // TODO(atumanov): implement a ResourceSet const_iterator. for (const auto &resource_pair : @@ -311,7 +311,7 @@ void NodeManager::Heartbeat() { } ray::Status status = heartbeat_table.Add( - DriverID::nil(), gcs_client_->client_table().GetLocalClientId(), heartbeat_data, + DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(), heartbeat_data, /*success_callback=*/nullptr); RAY_CHECK_OK_PREPEND(status, "Heartbeat failed"); @@ -359,7 +359,7 @@ void NodeManager::GetObjectManagerProfileInfo() { } void NodeManager::ClientAdded(const ClientTableDataT &client_data) { - const ClientID client_id = ClientID::from_binary(client_data.client_id); + const ClientID client_id = ClientID::FromBinary(client_data.client_id); RAY_LOG(DEBUG) << "[ClientAdded] Received callback from client id " << client_id; if (client_id == gcs_client_->client_table().GetLocalClientId()) { @@ -393,7 +393,7 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { << ". This may be since the node was recently removed."; // We use the nil DriverID to broadcast the message to all drivers. RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - DriverID::nil(), type, error_message.str(), current_time_ms())); + DriverID::Nil(), type, error_message.str(), current_time_ms())); return; } @@ -432,7 +432,7 @@ ray::Status NodeManager::ConnectRemoteNodeManager(const ClientID &client_id, void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { // TODO(swang): If we receive a notification for our own death, clean up and // exit immediately. 
- const ClientID client_id = ClientID::from_binary(client_data.client_id); + const ClientID client_id = ClientID::FromBinary(client_data.client_id); RAY_LOG(DEBUG) << "[ClientRemoved] Received callback from client id " << client_id; RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId()) @@ -478,7 +478,7 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { } void NodeManager::ResourceCreateUpdated(const ClientTableDataT &client_data) { - const ClientID client_id = ClientID::from_binary(client_data.client_id); + const ClientID client_id = ClientID::FromBinary(client_data.client_id); const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from client id " @@ -514,7 +514,7 @@ void NodeManager::ResourceCreateUpdated(const ClientTableDataT &client_data) { } void NodeManager::ResourceDeleted(const ClientTableDataT &client_data) { - const ClientID client_id = ClientID::from_binary(client_data.client_id); + const ClientID client_id = ClientID::FromBinary(client_data.client_id); const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); ResourceSet new_res_set(client_data.resources_total_label, @@ -608,7 +608,7 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_ const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); // Update load information provided by each heartbeat. for (const auto &heartbeat_data : heartbeat_batch.batch) { - const ClientID &client_id = ClientID::from_binary(heartbeat_data->client_id); + const ClientID &client_id = ClientID::FromBinary(heartbeat_data->client_id); if (client_id == local_client_id) { // Skip heartbeats from self. continue; @@ -638,12 +638,12 @@ void NodeManager::PublishActorStateTransition( const ActorTableDataT &data) { auto redis_context = client->primary_context(); if (data.state == ActorState::DEAD || data.state == ActorState::RECONSTRUCTING) { - std::vector args = {"XADD", id.hex(), "*", "signal", + std::vector args = {"XADD", id.Hex(), "*", "signal", "ACTOR_DIED_SIGNAL"}; RAY_CHECK_OK(redis_context->RunArgvAsync(args)); } }; - RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(DriverID::nil(), actor_id, + RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(DriverID::Nil(), actor_id, actor_notification, success_callback, failure_callback, log_length)); } @@ -852,9 +852,9 @@ void NodeManager::ProcessClientMessage( // Clean up their creating tasks from GCS. std::vector creating_task_ids; for (const auto &object_id : object_ids) { - creating_task_ids.push_back(object_id.task_id()); + creating_task_ids.push_back(object_id.TaskId()); } - gcs_client_->raylet_task_table().Delete(DriverID::nil(), creating_task_ids); + gcs_client_->raylet_task_table().Delete(DriverID::Nil(), creating_task_ids); } } break; case protocol::MessageType::PrepareActorCheckpointRequest: { @@ -945,7 +945,7 @@ void NodeManager::ProcessGetTaskMessage( std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); RAY_CHECK(worker); // If the worker was assigned a task, mark it as finished. - if (!worker->GetAssignedTaskId().is_nil()) { + if (!worker->GetAssignedTaskId().IsNil()) { FinishAssignedTask(*worker); } // Return the worker to the idle pool. 
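// --- Illustrative aside (not part of this patch): the nil-sentinel idiom after the
// rename, as used in ProcessGetTaskMessage above. Nil() replaces nil() and IsNil()
// replaces is_nil(); a compressed sketch with a local variable standing in for the
// worker's assigned-task field.
#include "ray/id.h"

void SketchFinishTask() {
  ray::TaskID assigned_task_id = ray::TaskID::Nil();  // "no task assigned" sentinel
  // ... a task id is set here while the worker executes ...
  if (!assigned_task_id.IsNil()) {
    // Finish the task, then clear the assignment again.
    assigned_task_id = ray::TaskID::Nil();
  }
}
// --- End of aside.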
@@ -1003,7 +1003,7 @@ void NodeManager::ProcessDisconnectClientMessage( } const ActorID &actor_id = worker->GetActorId(); - if (!actor_id.is_nil()) { + if (!actor_id.IsNil()) { // If the worker was an actor, update actor state, reconstruct the actor if needed, // and clean up actor's tasks if the actor is permanently dead. HandleDisconnectedActor(actor_id, true, intentional_disconnect); @@ -1012,10 +1012,10 @@ void NodeManager::ProcessDisconnectClientMessage( const TaskID &task_id = worker->GetAssignedTaskId(); // If the worker was running a task, clean up the task and push an error to // the driver, unless the worker is already dead. - if (!task_id.is_nil() && !worker->IsDead()) { + if (!task_id.IsNil() && !worker->IsDead()) { // If the worker was an actor, the task was already cleaned up in // `HandleDisconnectedActor`. - if (actor_id.is_nil()) { + if (actor_id.IsNil()) { const Task &task = local_queues_.RemoveTask(task_id); TreatTaskAsFailed(task, ErrorType::WORKER_DIED); } @@ -1062,7 +1062,7 @@ void NodeManager::ProcessDisconnectClientMessage( gcs_client_->driver_table().AppendDriverData(DriverID(client->GetClientId()), /*is_dead=*/true)); auto driver_id = worker->GetAssignedTaskId(); - RAY_CHECK(!driver_id.is_nil()); + RAY_CHECK(!driver_id.IsNil()); local_queues_.RemoveDriverTaskId(driver_id); worker_pool_.DisconnectDriver(worker); @@ -1197,13 +1197,13 @@ void NodeManager::ProcessPrepareActorCheckpointRequest( const auto task_id = worker->GetAssignedTaskId(); const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); // Generate checkpoint id and data. - ActorCheckpointID checkpoint_id = ActorCheckpointID::from_random(); + ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); auto checkpoint_data = actor_entry->second.GenerateCheckpointData(actor_entry->first, task); // Write checkpoint data to GCS. RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Add( - DriverID::nil(), checkpoint_id, checkpoint_data, + DriverID::Nil(), checkpoint_id, checkpoint_data, [worker, actor_id, this](ray::gcs::AsyncGcsClient *client, const ActorCheckpointID &checkpoint_id, const ActorCheckpointDataT &data) { @@ -1212,7 +1212,7 @@ void NodeManager::ProcessPrepareActorCheckpointRequest( // Save this actor-to-checkpoint mapping, and remove old checkpoints associated // with this actor. RAY_CHECK_OK(gcs_client_->actor_checkpoint_id_table().AddCheckpointId( - DriverID::nil(), actor_id, checkpoint_id)); + DriverID::Nil(), actor_id, checkpoint_id)); // Send reply to worker. 
flatbuffers::FlatBufferBuilder fbb; auto reply = ray::protocol::CreatePrepareActorCheckpointReply( @@ -1293,7 +1293,7 @@ void NodeManager::ProcessSetResourceRequest( ClientID client_id = from_flatbuf(*message->client_id()); // If the python arg was null, set client_id to the local client - if (client_id.is_nil()) { + if (client_id.IsNil()) { client_id = gcs_client_->client_table().GetLocalClientId(); } @@ -1331,7 +1331,7 @@ void NodeManager::ProcessSetResourceRequest( auto data_shared_ptr = std::make_shared(data); auto client_table = gcs_client_->client_table(); RAY_CHECK_OK(gcs_client_->client_table().Append( - DriverID::nil(), client_table.client_log_key_, data_shared_ptr, nullptr)); + DriverID::Nil(), client_table.client_log_key_, data_shared_ptr, nullptr)); } void NodeManager::ScheduleTasks( @@ -1450,7 +1450,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } const std::string meta = std::to_string(static_cast(error_type)); for (int64_t i = 0; i < num_returns; i++) { - const auto object_id = spec.ReturnId(i).to_plasma_id(); + const auto object_id = spec.ReturnId(i).ToPlasmaId(); arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta); if (!status.ok() && !status.IsPlasmaObjectExists()) { // If we failed to save the error code, log a warning and push an error message @@ -1605,7 +1605,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag HandleActorStateTransition(actor_id, ActorRegistration(data.back())); } }; - RAY_CHECK_OK(gcs_client_->actor_table().Lookup(DriverID::nil(), spec.ActorId(), + RAY_CHECK_OK(gcs_client_->actor_table().Lookup(DriverID::Nil(), spec.ActorId(), lookup_callback)); actor_creation_dummy_object = spec.ActorCreationDummyObjectId(); } else { @@ -1796,7 +1796,7 @@ bool NodeManager::AssignTask(const Task &task) { const std::string warning_message = worker_pool_.WarningAboutSize(); if (warning_message != "") { RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - DriverID::nil(), "worker_pool_large", warning_message, current_time_ms())); + DriverID::Nil(), "worker_pool_large", warning_message, current_time_ms())); } } // We couldn't assign this task, as no worker available. @@ -1875,7 +1875,7 @@ bool NodeManager::AssignTask(const Task &task) { // The execution dependency is initialized to the actor creation task's // return value, and is subsequently updated to the assigned tasks' // return values, so it should never be nil. - RAY_CHECK(!execution_dependency.is_nil()); + RAY_CHECK(!execution_dependency.IsNil()); // Update the task's execution dependencies to reflect the actual // execution order, to support deterministic reconstruction. // NOTE(swang): The update of an actor task's execution dependencies is @@ -1946,11 +1946,11 @@ void NodeManager::FinishAssignedTask(Worker &worker) { task_dependency_manager_.TaskCanceled(task_id); // Unset the worker's assigned task. - worker.AssignTaskId(TaskID::nil()); + worker.AssignTaskId(TaskID::Nil()); // Unset the worker's assigned driver Id if this is not an actor. if (!task.GetTaskSpecification().IsActorCreationTask() && !task.GetTaskSpecification().IsActorTask()) { - worker.AssignDriverId(DriverID::nil()); + worker.AssignDriverId(DriverID::Nil()); } } @@ -1966,10 +1966,10 @@ ActorTableDataT NodeManager::CreateActorTableDataFromCreationTask(const Task &ta if (actor_entry == actor_registry_.end()) { // Set all of the static fields for the actor. These fields will not // change even if the actor fails or is reconstructed. 
- new_actor_data.actor_id = actor_id.binary(); + new_actor_data.actor_id = actor_id.Binary(); new_actor_data.actor_creation_dummy_object_id = - task.GetTaskSpecification().ActorDummyObject().binary(); - new_actor_data.driver_id = task.GetTaskSpecification().DriverId().binary(); + task.GetTaskSpecification().ActorDummyObject().Binary(); + new_actor_data.driver_id = task.GetTaskSpecification().DriverId().Binary(); new_actor_data.max_reconstructions = task.GetTaskSpecification().MaxActorReconstructions(); // This is the first time that the actor has been created, so the number @@ -1990,7 +1990,7 @@ ActorTableDataT NodeManager::CreateActorTableDataFromCreationTask(const Task &ta // Set the new fields for the actor's state to indicate that the actor is // now alive on this node manager. new_actor_data.node_manager_id = - gcs_client_->client_table().GetLocalClientId().binary(); + gcs_client_->client_table().GetLocalClientId().Binary(); new_actor_data.state = ActorState::ALIVE; return new_actor_data; } @@ -2001,7 +2001,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { bool resumed_from_checkpoint = false; if (task.GetTaskSpecification().IsActorCreationTask()) { actor_id = task.GetTaskSpecification().ActorCreationId(); - actor_handle_id = ActorHandleID::nil(); + actor_handle_id = ActorHandleID::Nil(); if (checkpoint_id_to_restore_.count(actor_id) > 0) { resumed_from_checkpoint = true; } @@ -2024,7 +2024,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor " << actor_id; RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup( - DriverID::nil(), checkpoint_id, + DriverID::Nil(), checkpoint_id, [this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client, const UniqueID &checkpoint_id, const ActorCheckpointDataT &checkpoint_data) { @@ -2074,7 +2074,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { const auto dummy_object = task.GetTaskSpecification().ActorDummyObject(); const ObjectID object_to_release = actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object); - if (!object_to_release.is_nil()) { + if (!object_to_release.IsNil()) { // If there were no new actor handles created, then no other actor task // will depend on this execution dependency, so it safe to release. HandleObjectMissing(object_to_release); @@ -2094,7 +2094,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { // Retrieve the task spec in order to re-execute the task. 
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( - DriverID::nil(), task_id, + DriverID::Nil(), task_id, /*success_callback=*/ [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, const ray::protocol::TaskT &task_data) { @@ -2380,7 +2380,7 @@ std::string NodeManager::DebugString() const { result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString(); result << "\nClusterResources:"; for (auto &pair : cluster_resource_map_) { - result << "\n" << pair.first.hex() << ": " << pair.second.DebugString(); + result << "\n" << pair.first.Hex() << ": " << pair.second.DebugString(); } result << "\n" << object_manager_.DebugString(); result << "\n" << gcs_client_->DebugString(); @@ -2399,7 +2399,7 @@ std::string NodeManager::DebugString() const { result << "\nRemoteConnections:"; for (auto &pair : remote_server_connections_) { - result << "\n" << pair.first.hex() << ": " << pair.second->DebugString(); + result << "\n" << pair.first.Hex() << ": " << pair.second->DebugString(); } result << "\nDebugString() time ms: " << (current_time_ms() - now_ms); return result.str(); diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 861020595448..a774e1409195 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -99,14 +99,14 @@ class TestObjectManagerBase : public ::testing::Test { } ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) { - ObjectID object_id = ObjectID::from_random(); + ObjectID object_id = ObjectID::FromRandom(); RAY_LOG(DEBUG) << "ObjectID Created: " << object_id; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata, - metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id())); + RAY_ARROW_CHECK_OK( + client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); return object_id; } @@ -138,7 +138,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback([this]( gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) { - ClientID parsed_id = ClientID::from_binary(data.client_id); + ClientID parsed_id = ClientID::FromBinary(data.client_id); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -158,7 +158,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { ray::Status status = ray::Status::OK(); status = server1->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - v1.push_back(ObjectID::from_binary(object_info.object_id)); + v1.push_back(ObjectID::FromBinary(object_info.object_id)); if (v1.size() == num_expected_objects && v1.size() == v2.size()) { TestPushComplete(); } @@ -166,7 +166,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { RAY_CHECK_OK(status); status = server2->object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - v2.push_back(ObjectID::from_binary(object_info.object_id)); + v2.push_back(ObjectID::FromBinary(object_info.object_id)); if (v2.size() == num_expected_objects && v1.size() == v2.size()) { TestPushComplete(); } @@ -208,13 
                   << "\n";
     ClientTableDataT data;
     gcs_client_2->client_table().GetClient(client_id_1, data);
-    RAY_LOG(INFO) << (ClientID::from_binary(data.client_id).is_nil());
-    RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data.client_id);
+    RAY_LOG(INFO) << (ClientID::FromBinary(data.client_id).IsNil());
+    RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data.client_id);
     RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address;
     RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port;
     ClientTableDataT data2;
     gcs_client_1->client_table().GetClient(client_id_2, data2);
-    RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data2.client_id);
+    RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data2.client_id);
     RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address;
     RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port;
   }
diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc
index 0f488089e6d0..ac312b79d13e 100644
--- a/src/ray/raylet/raylet_client.cc
+++ b/src/ray/raylet/raylet_client.cc
@@ -312,12 +312,12 @@ ray::Status RayletClient::Wait(const std::vector<ObjectID> &object_ids, int num_
   auto reply_message = flatbuffers::GetRoot(reply.get());
   auto found = reply_message->found();
   for (uint i = 0; i < found->size(); i++) {
-    ObjectID object_id = ObjectID::from_binary(found->Get(i)->str());
+    ObjectID object_id = ObjectID::FromBinary(found->Get(i)->str());
     result->first.push_back(object_id);
   }
   auto remaining = reply_message->remaining();
   for (uint i = 0; i < remaining->size(); i++) {
-    ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str());
+    ObjectID object_id = ObjectID::FromBinary(remaining->Get(i)->str());
     result->second.push_back(object_id);
   }
   return ray::Status::OK();
@@ -373,7 +373,7 @@ ray::Status RayletClient::PrepareActorCheckpoint(const ActorID &actor_id,
   if (!status.ok()) return status;
   auto reply_message = flatbuffers::GetRoot(reply.get());
-  checkpoint_id = ActorCheckpointID::from_binary(reply_message->checkpoint_id()->str());
+  checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str());
   return ray::Status::OK();
 }
diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc
index d1a648a34ce4..97c86ea73cd8 100644
--- a/src/ray/raylet/reconstruction_policy.cc
+++ b/src/ray/raylet/reconstruction_policy.cc
@@ -52,7 +52,7 @@ void ReconstructionPolicy::SetTaskTimeout(
       // required by the task are no longer needed soon after. If the
       // task is still required after this initial period, then we now
       // subscribe to task lease notifications.
-      RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(DriverID::nil(), task_id,
+      RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(DriverID::Nil(), task_id,
                                                            client_id_));
       it->second.subscribed = true;
     }
@@ -108,9 +108,9 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
   // an entry for this reconstruction.
   auto reconstruction_entry = std::make_shared<TaskReconstructionDataT>();
   reconstruction_entry->num_reconstructions = reconstruction_attempt;
-  reconstruction_entry->node_manager_id = client_id_.binary();
+  reconstruction_entry->node_manager_id = client_id_.Binary();
   RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
-      DriverID::nil(), task_id, reconstruction_entry,
+      DriverID::Nil(), task_id, reconstruction_entry,
       /*success_callback=*/
       [this](gcs::AsyncGcsClient *client, const TaskID &task_id,
              const TaskReconstructionDataT &data) {
@@ -171,7 +171,7 @@ void ReconstructionPolicy::HandleTaskLeaseNotification(const TaskID &task_id,
 }

 void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) {
-  TaskID task_id = object_id.task_id();
+  TaskID task_id = object_id.TaskId();
   auto it = listening_tasks_.find(task_id);
   // Add this object to the list of objects created by the same task.
   if (it == listening_tasks_.end()) {
@@ -185,7 +185,7 @@ void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id)
 }

 void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
-  TaskID task_id = object_id.task_id();
+  TaskID task_id = object_id.TaskId();
   auto it = listening_tasks_.find(task_id);
   if (it == listening_tasks_.end()) {
     // We already stopped listening for this task.
@@ -199,7 +199,7 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
   // Cancel notifications for the task lease if we were subscribed to them.
   if (it->second.subscribed) {
     RAY_CHECK_OK(
-        task_lease_pubsub_.CancelNotifications(DriverID::nil(), task_id, client_id_));
+        task_lease_pubsub_.CancelNotifications(DriverID::Nil(), task_id, client_id_));
   }
   listening_tasks_.erase(it);
 }
diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc
index 7f8887b15372..4ccebd0c0c09 100644
--- a/src/ray/raylet/reconstruction_policy_test.cc
+++ b/src/ray/raylet/reconstruction_policy_test.cc
@@ -154,7 +154,7 @@ class ReconstructionPolicyTest : public ::testing::Test {
         reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
            io_service_,
            [this](const TaskID &task_id) { TriggerReconstruction(task_id); },
-            reconstruction_timeout_ms_, ClientID::from_random(), mock_gcs_,
+            reconstruction_timeout_ms_, ClientID::FromRandom(), mock_gcs_,
            mock_object_directory_, mock_gcs_)),
         timer_canceled_(false) {
     mock_gcs_.Subscribe(
@@ -223,8 +223,8 @@ class ReconstructionPolicyTest : public ::testing::Test {
 };

 TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);

   // Listen for an object.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -241,9 +241,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) {
 }

 TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
-  mock_object_directory_->SetObjectLocations(object_id, {ClientID::from_random()});
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
+  mock_object_directory_->SetObjectLocations(object_id, {ClientID::FromRandom()});

   // Listen for both objects.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -264,9 +264,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
 }

 TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
-  ClientID client_id = ClientID::from_random();
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
+  ClientID client_id = ClientID::FromRandom();
   mock_object_directory_->SetObjectLocations(object_id, {client_id});

   // Listen for both objects.
@@ -288,9 +288,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) {

 TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
   // Create two object IDs produced by the same task.
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id1 = ObjectID::for_task_return(task_id, 1);
-  ObjectID object_id2 = ObjectID::for_task_return(task_id, 2);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id1 = ObjectID::ForTaskReturn(task_id, 1);
+  ObjectID object_id2 = ObjectID::ForTaskReturn(task_id, 2);

   // Listen for both objects.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id1);
@@ -308,17 +308,17 @@ TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
 }

 TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
   // Run the test for much longer than the reconstruction timeout.
   int64_t test_period = 2 * reconstruction_timeout_ms_;

   // Acquire the task lease for a period longer than the test period.
   auto task_lease_data = std::make_shared<TaskLeaseDataT>();
-  task_lease_data->node_manager_id = ClientID::from_random().binary();
+  task_lease_data->node_manager_id = ClientID::FromRandom().Binary();
   task_lease_data->acquired_at = current_sys_time_ms();
   task_lease_data->timeout = 2 * test_period;
-  mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
+  mock_gcs_.Add(DriverID::Nil(), task_id, task_lease_data);

   // Listen for an object.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -334,18 +334,18 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) {
 }

 TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
   // Listen for an object.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
   // Send the reconstruction manager heartbeats about the object.
   SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() {
     auto task_lease_data = std::make_shared<TaskLeaseDataT>();
-    task_lease_data->node_manager_id = ClientID::from_random().binary();
+    task_lease_data->node_manager_id = ClientID::FromRandom().Binary();
     task_lease_data->acquired_at = current_sys_time_ms();
     task_lease_data->timeout = reconstruction_timeout_ms_;
-    mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
+    mock_gcs_.Add(DriverID::Nil(), task_id, task_lease_data);
   });

   // Run the test for much longer than the reconstruction timeout.
   Run(reconstruction_timeout_ms_ * 2);
@@ -361,8 +361,8 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) {
 }

 TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);

   // Listen for an object.
   reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -387,17 +387,17 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) {
 }

 TEST_F(ReconstructionPolicyTest, TestSimultaneousReconstructionSuppressed) {
-  TaskID task_id = TaskID::from_random();
-  ObjectID object_id = ObjectID::for_task_return(task_id, 1);
+  TaskID task_id = TaskID::FromRandom();
+  ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);

   // Log a reconstruction attempt to simulate a different node attempting the
   // reconstruction first. This should suppress this node's first attempt at
   // reconstruction.
   auto task_reconstruction_data = std::make_shared<TaskReconstructionDataT>();
-  task_reconstruction_data->node_manager_id = ClientID::from_random().binary();
+  task_reconstruction_data->node_manager_id = ClientID::FromRandom().Binary();
   task_reconstruction_data->num_reconstructions = 0;
   RAY_CHECK_OK(
-      mock_gcs_.AppendAt(DriverID::nil(), task_id, task_reconstruction_data, nullptr,
+      mock_gcs_.AppendAt(DriverID::Nil(), task_id, task_reconstruction_data, nullptr,
                          /*failure_callback=*/
                          [](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
                             const TaskReconstructionDataT &data) { ASSERT_TRUE(false); },
diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc
index dc24c95d46e4..c5155b96b0c1 100644
--- a/src/ray/raylet/task_dependency_manager.cc
+++ b/src/ray/raylet/task_dependency_manager.cc
@@ -24,7 +24,7 @@ bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const {
 }

 bool TaskDependencyManager::CheckObjectRequired(const ObjectID &object_id) const {
-  const TaskID task_id = object_id.task_id();
+  const TaskID task_id = object_id.TaskId();
   auto task_entry = required_tasks_.find(task_id);
   // If there are no subscribed tasks that are dependent on the object, then do
   // nothing.
@@ -82,7 +82,7 @@ std::vector<TaskID> TaskDependencyManager::HandleObjectLocal(

   // Find any tasks that are dependent on the newly available object.
   std::vector<TaskID> ready_task_ids;
-  auto creating_task_entry = required_tasks_.find(object_id.task_id());
+  auto creating_task_entry = required_tasks_.find(object_id.TaskId());
   if (creating_task_entry != required_tasks_.end()) {
     auto object_entry = creating_task_entry->second.find(object_id);
     if (object_entry != creating_task_entry->second.end()) {
@@ -113,7 +113,7 @@ std::vector<TaskID> TaskDependencyManager::HandleObjectMissing(

   // Find any tasks that are dependent on the missing object.
   std::vector<TaskID> waiting_task_ids;
-  TaskID creating_task_id = object_id.task_id();
+  TaskID creating_task_id = object_id.TaskId();
   auto creating_task_entry = required_tasks_.find(creating_task_id);
   if (creating_task_entry != required_tasks_.end()) {
     auto object_entry = creating_task_entry->second.find(object_id);
@@ -149,7 +149,7 @@ bool TaskDependencyManager::SubscribeDependencies(
     auto inserted = task_entry.object_dependencies.insert(object_id);
     if (inserted.second) {
       // Get the ID of the task that creates the dependency.
-      TaskID creating_task_id = object_id.task_id();
+      TaskID creating_task_id = object_id.TaskId();
       // Determine whether the dependency can be fulfilled by the local node.
       if (local_objects_.count(object_id) == 0) {
         // The object is not local.
@@ -186,7 +186,7 @@ bool TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) {
     // Remove the task from the list of tasks that are dependent on this
     // object.
     // Get the ID of the task that creates the dependency.
-    TaskID creating_task_id = object_id.task_id();
+    TaskID creating_task_id = object_id.TaskId();
     auto creating_task_entry = required_tasks_.find(creating_task_id);
     std::vector<TaskID> &dependent_tasks = creating_task_entry->second[object_id];
     auto it = std::find(dependent_tasks.begin(), dependent_tasks.end(), task_id);
@@ -262,10 +262,10 @@ void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) {
   }

   auto task_lease_data = std::make_shared<TaskLeaseDataT>();
-  task_lease_data->node_manager_id = client_id_.hex();
+  task_lease_data->node_manager_id = client_id_.Hex();
   task_lease_data->acquired_at = current_sys_time_ms();
   task_lease_data->timeout = it->second.lease_period;
-  RAY_CHECK_OK(task_lease_table_.Add(DriverID::nil(), task_id, task_lease_data, nullptr));
+  RAY_CHECK_OK(task_lease_table_.Add(DriverID::Nil(), task_id, task_lease_data, nullptr));

   auto period = boost::posix_time::milliseconds(it->second.lease_period / 2);
   it->second.lease_timer->expires_from_now(period);
@@ -324,7 +324,7 @@ void TaskDependencyManager::RemoveTasksAndRelatedObjects(
   // Cancel all of the objects that were required by the removed tasks.
   for (const auto &object_id : required_objects) {
-    TaskID creating_task_id = object_id.task_id();
+    TaskID creating_task_id = object_id.TaskId();
     required_tasks_.erase(creating_task_id);
     HandleRemoteDependencyCanceled(object_id);
   }
diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc
index 62bbf17069d5..e0f832a12870 100644
--- a/src/ray/raylet/task_dependency_manager_test.cc
+++ b/src/ray/raylet/task_dependency_manager_test.cc
@@ -43,7 +43,7 @@ class TaskDependencyManagerTest : public ::testing::Test {
         gcs_mock_(),
         initial_lease_period_ms_(100),
         task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_,
-                                 io_service_, ClientID::nil(), initial_lease_period_ms_,
+                                 io_service_, ClientID::Nil(), initial_lease_period_ms_,
                                  gcs_mock_) {}

   void Run(uint64_t timeout_ms) {
@@ -75,7 +75,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
     task_arguments.emplace_back(std::make_shared<TaskArgumentByReference>(references));
   }
   std::vector<std::string> function_descriptor(3);
-  auto spec = TaskSpecification(DriverID::nil(), TaskID::from_random(), 0, task_arguments,
+  auto spec = TaskSpecification(DriverID::Nil(), TaskID::FromRandom(), 0, task_arguments,
                                 num_returns, required_resources, Language::PYTHON,
                                 function_descriptor);
   auto execution_spec = TaskExecutionSpecification(std::vector<ObjectID>());
@@ -105,9 +105,9 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {
   int num_arguments = 3;
   std::vector<ObjectID> arguments;
   for (int i = 0; i < num_arguments; i++) {
-    arguments.push_back(ObjectID::from_random());
+    arguments.push_back(ObjectID::FromRandom());
   }
-  TaskID task_id = TaskID::from_random();
+  TaskID task_id = TaskID::FromRandom();

   // No objects have been registered in the task dependency manager, so all
   // arguments should be remote.
   for (const auto &argument_id : arguments) {
@@ -139,12 +139,12 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {

 TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {
   // Create a task with 3 arguments.
-  TaskID task_id = TaskID::from_random();
+  TaskID task_id = TaskID::FromRandom();
   int num_arguments = 3;
   std::vector<ObjectID> arguments;
   for (int i = 0; i < num_arguments; i++) {
     // Add the new argument to the list of dependencies to subscribe to.
-    ObjectID argument_id = ObjectID::from_random();
+    ObjectID argument_id = ObjectID::FromRandom();
     arguments.push_back(argument_id);
     // Subscribe to the task's dependencies. All arguments except the last are
     // duplicates of previous subscription calls. Each argument should only be
@@ -176,7 +176,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {

 TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
   // Create 3 tasks that are dependent on the same object.
-  ObjectID argument_id = ObjectID::from_random();
+  ObjectID argument_id = ObjectID::FromRandom();
   std::vector<TaskID> dependent_tasks;
   int num_dependent_tasks = 3;
   // The object should only be requested from the object manager once for all
@@ -184,7 +184,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
   EXPECT_CALL(object_manager_mock_, Pull(argument_id));
   EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
   for (int i = 0; i < num_dependent_tasks; i++) {
-    TaskID task_id = TaskID::from_random();
+    TaskID task_id = TaskID::FromRandom();
     dependent_tasks.push_back(task_id);
     // Subscribe to each of the task's dependencies.
     bool ready = task_dependency_manager_.SubscribeDependencies(task_id, {argument_id});
@@ -266,7 +266,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) {
 TEST_F(TaskDependencyManagerTest, TestDependentPut) {
   // Create a task with 3 arguments.
   auto task1 = ExampleTask({}, 0);
-  ObjectID put_id = ObjectID::for_put(task1.GetTaskSpecification().TaskId(), 1);
+  ObjectID put_id = ObjectID::ForPut(task1.GetTaskSpecification().TaskId(), 1);
   auto task2 = ExampleTask({put_id}, 0);

   // No objects have been registered in the task dependency manager, so the put
@@ -326,9 +326,9 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
   int num_arguments = 3;
   std::vector<ObjectID> arguments;
   for (int i = 0; i < num_arguments; i++) {
-    arguments.push_back(ObjectID::from_random());
+    arguments.push_back(ObjectID::FromRandom());
   }
-  TaskID task_id = TaskID::from_random();
+  TaskID task_id = TaskID::FromRandom();

   // No objects have been registered in the task dependency manager, so all
   // arguments should be remote.
   for (const auto &argument_id : arguments) {
diff --git a/src/ray/raylet/task_execution_spec.cc b/src/ray/raylet/task_execution_spec.cc
index c5b1486d5bcc..dc7bf30b83d2 100644
--- a/src/ray/raylet/task_execution_spec.cc
+++ b/src/ray/raylet/task_execution_spec.cc
@@ -25,7 +25,7 @@ TaskExecutionSpecification::ToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb) co
 std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const {
   std::vector<ObjectID> dependencies;
   for (const auto &dependency : execution_spec_.dependencies) {
-    dependencies.push_back(ObjectID::from_binary(dependency));
+    dependencies.push_back(ObjectID::FromBinary(dependency));
   }
   return dependencies;
 }
@@ -34,7 +34,7 @@ void TaskExecutionSpecification::SetExecutionDependencies(
     const std::vector<ObjectID> &dependencies) {
   execution_spec_.dependencies.clear();
   for (const auto &dependency : dependencies) {
-    execution_spec_.dependencies.push_back(dependency.binary());
+    execution_spec_.dependencies.push_back(dependency.Binary());
   }
 }

diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc
index 17a8b185fc78..eeab29272126 100644
--- a/src/ray/raylet/task_spec.cc
+++ b/src/ray/raylet/task_spec.cc
@@ -65,8 +65,8 @@ TaskSpecification::TaskSpecification(
     const std::vector<std::shared_ptr<TaskArgument>> &task_arguments, int64_t num_returns,
     const std::unordered_map<std::string, double> &required_resources,
     const Language &language, const std::vector<std::string> &function_descriptor)
-    : TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::nil(),
-                        ObjectID::nil(), 0, ActorID::nil(), ActorHandleID::nil(), -1, {},
+    : TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::Nil(),
+                        ObjectID::Nil(), 0, ActorID::Nil(), ActorHandleID::Nil(), -1, {},
                         task_arguments, num_returns, required_resources,
                         std::unordered_map<std::string, double>(), language,
                         function_descriptor) {}
@@ -165,8 +165,7 @@ int64_t TaskSpecification::NumReturns() const {
 }

 ObjectID TaskSpecification::ReturnId(int64_t return_index) const {
-  auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
-  return ObjectID::for_task_return(TaskId(), return_index + 1);
+  return ObjectID::ForTaskReturn(TaskId(), return_index + 1);
 }

 bool TaskSpecification::ArgByRef(int64_t arg_index) const {
@@ -215,11 +214,9 @@ Language TaskSpecification::GetLanguage() const {
   return message->language();
 }

-bool TaskSpecification::IsActorCreationTask() const {
-  return !ActorCreationId().is_nil();
-}
+bool TaskSpecification::IsActorCreationTask() const { return !ActorCreationId().IsNil(); }

-bool TaskSpecification::IsActorTask() const { return !ActorId().is_nil(); }
+bool TaskSpecification::IsActorTask() const { return !ActorId().IsNil(); }

 ActorID TaskSpecification::ActorCreationId() const {
   auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
diff --git a/src/ray/raylet/task_test.cc b/src/ray/raylet/task_test.cc
index 6d0cfa37017a..1e26cb33bf82 100644
--- a/src/ray/raylet/task_test.cc
+++ b/src/ray/raylet/task_test.cc
@@ -10,21 +10,21 @@ namespace raylet {
 void TestTaskReturnId(const TaskID &task_id, int64_t return_index) {
   // Round trip test for computing the object ID for a task's return value,
   // then computing the task ID that created the object.
-  ObjectID return_id = ObjectID::for_task_return(task_id, return_index);
-  ASSERT_EQ(return_id.task_id(), task_id);
-  ASSERT_EQ(return_id.object_index(), return_index);
+  ObjectID return_id = ObjectID::ForTaskReturn(task_id, return_index);
+  ASSERT_EQ(return_id.TaskId(), task_id);
+  ASSERT_EQ(return_id.ObjectIndex(), return_index);
 }

 void TestTaskPutId(const TaskID &task_id, int64_t put_index) {
   // Round trip test for computing the object ID for a task's put value, then
   // computing the task ID that created the object.
-  ObjectID put_id = ObjectID::for_put(task_id, put_index);
-  ASSERT_EQ(put_id.task_id(), task_id);
-  ASSERT_EQ(put_id.object_index(), -1 * put_index);
+  ObjectID put_id = ObjectID::ForPut(task_id, put_index);
+  ASSERT_EQ(put_id.TaskId(), task_id);
+  ASSERT_EQ(put_id.ObjectIndex(), -1 * put_index);
 }

 TEST(TaskSpecTest, TestTaskReturnIds) {
-  TaskID task_id = TaskID::from_random();
+  TaskID task_id = TaskID::FromRandom();

   // Check that we can compute between a task ID and the object IDs of its
   // return values and puts.
@@ -37,25 +37,25 @@ TEST(TaskSpecTest, TestTaskReturnIds) {
 }

 TEST(IdPropertyTest, TestIdProperty) {
-  TaskID task_id = TaskID::from_random();
-  ASSERT_EQ(task_id, TaskID::from_binary(task_id.binary()));
-  ObjectID object_id = ObjectID::from_random();
-  ASSERT_EQ(object_id, ObjectID::from_binary(object_id.binary()));
+  TaskID task_id = TaskID::FromRandom();
+  ASSERT_EQ(task_id, TaskID::FromBinary(task_id.Binary()));
+  ObjectID object_id = ObjectID::FromRandom();
+  ASSERT_EQ(object_id, ObjectID::FromBinary(object_id.Binary()));

-  ASSERT_TRUE(TaskID().is_nil());
-  ASSERT_TRUE(TaskID::nil().is_nil());
-  ASSERT_TRUE(ObjectID().is_nil());
-  ASSERT_TRUE(ObjectID::nil().is_nil());
+  ASSERT_TRUE(TaskID().IsNil());
+  ASSERT_TRUE(TaskID::Nil().IsNil());
+  ASSERT_TRUE(ObjectID().IsNil());
+  ASSERT_TRUE(ObjectID::Nil().IsNil());
 }

 TEST(TaskSpecTest, TaskInfoSize) {
-  std::vector<ObjectID> references = {ObjectID::from_random(), ObjectID::from_random()};
+  std::vector<ObjectID> references = {ObjectID::FromRandom(), ObjectID::FromRandom()};
   auto arguments_1 = std::make_shared<TaskArgumentByReference>(references);
   std::string one_arg("This is an value argument.");
   auto arguments_2 = std::make_shared<TaskArgumentByValue>(
       reinterpret_cast<const uint8_t *>(one_arg.c_str()), one_arg.size());
   std::vector<std::shared_ptr<TaskArgument>> task_arguments({arguments_1, arguments_2});
-  auto task_id = TaskID::from_random();
+  auto task_id = TaskID::FromRandom();
   {
     flatbuffers::FlatBufferBuilder fbb;
     std::vector<flatbuffers::Offset<Arg>> arguments;
@@ -64,10 +64,10 @@ TEST(TaskSpecTest, TaskInfoSize) {
     }
     // General task.
     auto spec = CreateTaskInfo(
-        fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
-        to_flatbuf(fbb, TaskID::from_random()), 0, to_flatbuf(fbb, ActorID::nil()),
-        to_flatbuf(fbb, ObjectID::nil()), 0, to_flatbuf(fbb, ActorID::nil()),
-        to_flatbuf(fbb, ActorHandleID::nil()), 0,
+        fbb, to_flatbuf(fbb, DriverID::FromRandom()), to_flatbuf(fbb, task_id),
+        to_flatbuf(fbb, TaskID::FromRandom()), 0, to_flatbuf(fbb, ActorID::Nil()),
+        to_flatbuf(fbb, ObjectID::Nil()), 0, to_flatbuf(fbb, ActorID::Nil()),
+        to_flatbuf(fbb, ActorHandleID::Nil()), 0,
         ids_to_flatbuf(fbb, std::vector<ObjectID>()), fbb.CreateVector(arguments), 1,
         map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), Language::PYTHON,
         string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
@@ -83,13 +83,13 @@ TEST(TaskSpecTest, TaskInfoSize) {
     }
     // General task.
     auto spec = CreateTaskInfo(
-        fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
-        to_flatbuf(fbb, TaskID::from_random()), 10,
-        to_flatbuf(fbb, ActorID::from_random()), to_flatbuf(fbb, ObjectID::from_random()),
-        10000000, to_flatbuf(fbb, ActorID::from_random()),
-        to_flatbuf(fbb, ActorHandleID::from_random()), 20,
-        ids_to_flatbuf(fbb, std::vector<ObjectID>(
-                                {ObjectID::from_random(), ObjectID::from_random()})),
+        fbb, to_flatbuf(fbb, DriverID::FromRandom()), to_flatbuf(fbb, task_id),
+        to_flatbuf(fbb, TaskID::FromRandom()), 10, to_flatbuf(fbb, ActorID::FromRandom()),
+        to_flatbuf(fbb, ObjectID::FromRandom()), 10000000,
+        to_flatbuf(fbb, ActorID::FromRandom()),
+        to_flatbuf(fbb, ActorHandleID::FromRandom()), 20,
+        ids_to_flatbuf(
+            fbb, std::vector<ObjectID>({ObjectID::FromRandom(), ObjectID::FromRandom()})),
         fbb.CreateVector(arguments), 2, map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}),
         Language::PYTHON,
         string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc
index c6686e4b6f6b..36bfc6d846b9 100644
--- a/src/ray/raylet/worker.cc
+++ b/src/ray/raylet/worker.cc
@@ -57,9 +57,9 @@ void Worker::AssignDriverId(const DriverID &driver_id) {
 const DriverID &Worker::GetAssignedDriverId() const { return assigned_driver_id_; }

 void Worker::AssignActorId(const ActorID &actor_id) {
-  RAY_CHECK(actor_id_.is_nil())
+  RAY_CHECK(actor_id_.IsNil())
       << "A worker that is already an actor cannot be assigned an actor ID again.";
-  RAY_CHECK(!actor_id.is_nil());
+  RAY_CHECK(!actor_id.IsNil());
   actor_id_ = actor_id;
 }

diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index 3138b88cf696..27e7fea05311 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -172,7 +172,7 @@ void WorkerPool::RegisterWorker(const std::shared_ptr<Worker> &worker) {
 }

 void WorkerPool::RegisterDriver(const std::shared_ptr<Worker> &driver) {
-  RAY_CHECK(!driver->GetAssignedTaskId().is_nil());
+  RAY_CHECK(!driver->GetAssignedTaskId().IsNil());
   auto &state = GetStateForLanguage(driver->GetLanguage());
   state.registered_drivers.insert(std::move(driver));
 }
@@ -201,11 +201,11 @@ std::shared_ptr<Worker> WorkerPool::GetRegisteredDriver(

 void WorkerPool::PushWorker(const std::shared_ptr<Worker> &worker) {
   // Since the worker is now idle, unset its assigned task ID.
-  RAY_CHECK(worker->GetAssignedTaskId().is_nil())
+  RAY_CHECK(worker->GetAssignedTaskId().IsNil())
       << "Idle workers cannot have an assigned task ID";
   auto &state = GetStateForLanguage(worker->GetLanguage());
   // Add the worker to the idle pool.
-  if (worker->GetActorId().is_nil()) {
+  if (worker->GetActorId().IsNil()) {
     state.idle.insert(std::move(worker));
   } else {
     state.idle_actor[worker->GetActorId()] = std::move(worker);
@@ -216,7 +216,7 @@ std::shared_ptr<Worker> WorkerPool::PopWorker(const TaskSpecification &task_spec
   auto &state = GetStateForLanguage(task_spec.GetLanguage());
   const auto &actor_id = task_spec.ActorId();
   std::shared_ptr<Worker> worker = nullptr;
-  if (actor_id.is_nil()) {
+  if (actor_id.IsNil()) {
     if (!state.idle.empty()) {
       worker = std::move(*state.idle.begin());
       state.idle.erase(state.idle.begin());
diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc
index 9799dfb80a40..143ffd57dda6 100644
--- a/src/ray/raylet/worker_pool_test.cc
+++ b/src/ray/raylet/worker_pool_test.cc
@@ -72,11 +72,11 @@ class WorkerPoolTest : public ::testing::Test {
 };

 static inline TaskSpecification ExampleTaskSpec(
-    const ActorID actor_id = ActorID::nil(),
+    const ActorID actor_id = ActorID::Nil(),
     const Language &language = Language::PYTHON) {
   std::vector<std::string> function_descriptor(3);
-  return TaskSpecification(DriverID::nil(), TaskID::nil(), 0, ActorID::nil(),
-                           ObjectID::nil(), 0, actor_id, ActorHandleID::nil(), 0, {}, {},
+  return TaskSpecification(DriverID::Nil(), TaskID::Nil(), 0, ActorID::Nil(),
+                           ObjectID::Nil(), 0, actor_id, ActorHandleID::Nil(), 0, {}, {},
                            0, {}, {}, language, function_descriptor);
 }

@@ -155,7 +155,7 @@ TEST_F(WorkerPoolTest, PopActorWorker) {
   // Assign an actor ID to the worker.
   const auto task_spec = ExampleTaskSpec();
   auto actor = worker_pool_.PopWorker(task_spec);
-  auto actor_id = ActorID::from_random();
+  auto actor_id = ActorID::FromRandom();
   actor->AssignActorId(actor_id);
   worker_pool_.PushWorker(actor);

@@ -173,10 +173,10 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
   auto py_worker = CreateWorker(1234, Language::PYTHON);
   worker_pool_.PushWorker(py_worker);
   // Check that no worker will be popped if the given task is a Java task
-  const auto java_task_spec = ExampleTaskSpec(ActorID::nil(), Language::JAVA);
+  const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA);
   ASSERT_EQ(worker_pool_.PopWorker(java_task_spec), nullptr);
   // Check that the worker can be popped if the given task is a Python task
-  const auto py_task_spec = ExampleTaskSpec(ActorID::nil(), Language::PYTHON);
+  const auto py_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON);
   ASSERT_NE(worker_pool_.PopWorker(py_task_spec), nullptr);
   // Create a Java Worker, and add it to the pool