From 4a5d788c002a0719cd7131b860d364f40dbb96c0 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Sun, 4 Nov 2018 13:09:26 -0800 Subject: [PATCH 01/10] Add event loop as member of ObjectDirectory --- src/ray/object_manager/object_directory.cc | 5 +++-- src/ray/object_manager/object_directory.h | 16 +++++++++++----- src/ray/object_manager/object_manager.cc | 2 +- src/ray/raylet/node_manager.cc | 2 +- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 53bbf7a8cba8..fce31627151c 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -2,8 +2,9 @@ namespace ray { -ObjectDirectory::ObjectDirectory(std::shared_ptr &gcs_client) - : gcs_client_(gcs_client) {} +ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, + std::shared_ptr &gcs_client) + : io_service_(io_service), gcs_client_(gcs_client) {} namespace { diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index bbe9334623ff..c6d7a7c1c766 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -27,7 +27,6 @@ struct RemoteConnectionInfo { class ObjectDirectoryInterface { public: - ObjectDirectoryInterface() = default; virtual ~ObjectDirectoryInterface() = default; /// Callbacks for GetInformation. @@ -114,8 +113,15 @@ class ObjectDirectoryInterface { /// Ray ObjectDirectory declaration. class ObjectDirectory : public ObjectDirectoryInterface { public: - ObjectDirectory() = default; - ~ObjectDirectory() override = default; + /// Create an object directory. + /// + /// \param io_service The event loop to dispatch callbacks to. + /// \param gcs_client A Ray GCS client to request object and client + /// information from. 
+ ObjectDirectory(boost::asio::io_service &io_service, + std::shared_ptr &gcs_client); + + virtual ~ObjectDirectory(){}; void RegisterBackend() override; @@ -139,8 +145,6 @@ class ObjectDirectory : public ObjectDirectoryInterface { const object_manager::protocol::ObjectInfoT &object_info) override; ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; - /// Ray only (not part of the OD interface). - ObjectDirectory(std::shared_ptr &gcs_client); /// ObjectDirectory should not be copied. RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory); @@ -154,6 +158,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_set current_object_locations; }; + /// Reference to the event loop. + boost::asio::io_service &io_service_; /// Reference to the gcs client. std::shared_ptr gcs_client_; /// Info about subscribers to object locations. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 61549807692d..8d4d51d4e627 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -23,7 +23,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // TODO(hme): Eliminate knowledge of GCS. : client_id_(gcs_client->client_table().GetLocalClientId()), config_(config), - object_directory_(new ObjectDirectory(gcs_client)), + object_directory_(new ObjectDirectory(main_service, gcs_client)), store_notification_(main_service, config_.store_socket_name), // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 94ac03a68ea6..1cd785a6b5db 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -61,7 +61,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); }, RayConfig::instance().initial_reconstruction_timeout_milliseconds(), gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(), - std::make_shared(gcs_client), + std::make_shared(io_service, gcs_client), gcs_client_->task_reconstruction_log()), task_dependency_manager_( object_manager, reconstruction_policy_, io_service, From 3e69e443b9eba8fcd6739dc21630e4228f867273 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Sun, 4 Nov 2018 13:17:20 -0800 Subject: [PATCH 02/10] Post object subscription callback to event loop --- src/ray/object_manager/object_directory.cc | 8 +++++++- src/ray/object_manager/object_directory.h | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index fce31627151c..c117f08c4c9a 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -62,6 +62,8 @@ void ObjectDirectory::RegisterBackend() { // empty, since this may indicate that the objects have been evicted from // all nodes. for (const auto &callback_pair : callbacks) { + // It is safe to call the callback directly since this is already running + // in the subscription callback stack. callback_pair.second(client_id_vec, object_id); } }; @@ -157,7 +159,9 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // have been evicted from all nodes. 
std::vector client_id_vec(listener_state.current_object_locations.begin(), listener_state.current_object_locations.end()); - callback(client_id_vec, object_id); + io_service_.post([this, callback, client_id_vec, object_id]() { + callback(client_id_vec, object_id); + }); return status; } @@ -188,6 +192,8 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, std::unordered_set client_ids; std::vector locations_vector = UpdateObjectLocations( client_ids, location_history, gcs_client_->client_table()); + // It is safe to call the callback directly since this is already running + // in the GCS client's lookup callback stack. callback(locations_vector, object_id); }); return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index c6d7a7c1c766..dc6da4cc5f1a 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -115,7 +115,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { public: /// Create an object directory. /// - /// \param io_service The event loop to dispatch callbacks to. + /// \param io_service The event loop to dispatch callbacks to. This should + /// usually be the same event loop that the given gcs_client runs on. /// \param gcs_client A Ray GCS client to request object and client /// information from. 
ObjectDirectory(boost::asio::io_service &io_service, From db191f5997f98d904f65eefb78c849fea6737b0f Mon Sep 17 00:00:00 2001 From: Stephanie Date: Sun, 4 Nov 2018 13:49:08 -0800 Subject: [PATCH 03/10] Convert GetInformation for a client to not use callbacks --- src/ray/object_manager/object_directory.cc | 23 ++---- src/ray/object_manager/object_directory.h | 35 ++++---- src/ray/object_manager/object_manager.cc | 85 ++++++++++---------- src/ray/raylet/reconstruction_policy_test.cc | 3 +- 4 files changed, 66 insertions(+), 80 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index c117f08c4c9a..cd63ac2aa363 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -105,20 +105,14 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id, return status; }; -ray::Status ObjectDirectory::GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_callback, - const InfoFailureCallback &fail_callback) { - const ClientTableDataT &data = gcs_client_->client_table().GetClient(client_id); +void ObjectDirectory::LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) { + const ClientTableDataT &data = + gcs_client_->client_table().GetClient(connection_info.client_id); ClientID result_client_id = ClientID::from_binary(data.client_id); - if (result_client_id == ClientID::nil() || !data.is_insertion) { - fail_callback(); - } else { - const auto &info = - RemoteConnectionInfo(client_id, data.node_manager_address, - static_cast(data.object_manager_port)); - success_callback(info); + if (result_client_id != ClientID::nil() && data.is_insertion) { + connection_info.ip = data.node_manager_address; + connection_info.port = static_cast(data.object_manager_port); } - return ray::Status::OK(); } void ObjectDirectory::RunFunctionForEachClient( @@ -131,9 +125,8 @@ void ObjectDirectory::RunFunctionForEachClient( !data.is_insertion) 
{ continue; } else { - const auto &info = - RemoteConnectionInfo(client_pair.first, data.node_manager_address, - static_cast(data.object_manager_port)); + RemoteConnectionInfo info(client_pair.first); + LookupRemoteConnectionInfo(info); client_function(info); } } diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index dc6da4cc5f1a..7a2dbe3c2351 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -16,10 +16,12 @@ namespace ray { /// Connection information for remote object managers. struct RemoteConnectionInfo { - RemoteConnectionInfo() = default; - RemoteConnectionInfo(const ClientID &id, const std::string &ip_address, - uint16_t port_num) - : client_id(id), ip(ip_address), port(port_num) {} + RemoteConnectionInfo(const ClientID &id) : client_id(id) {} + + // Returns whether there is enough information to connect to the remote + // object manager. + bool Connected() const { return !ip.empty(); } + ClientID client_id; std::string ip; uint16_t port; @@ -27,23 +29,20 @@ struct RemoteConnectionInfo { class ObjectDirectoryInterface { public: - virtual ~ObjectDirectoryInterface() = default; + virtual ~ObjectDirectoryInterface() {} - /// Callbacks for GetInformation. + /// Callbacks for LookupRemoteConnectionInfo. using InfoSuccessCallback = std::function; - using InfoFailureCallback = std::function; virtual void RegisterBackend() = 0; - /// This is used to establish object manager client connections. + /// Lookup how to connect to a remote object manager. /// - /// \param client_id The client for which information is required. - /// \param success_cb A callback which handles the success of this method. - /// \param fail_cb A callback which handles the failure of this method. - /// \return Status of whether this asynchronous request succeeded. 
- virtual ray::Status GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_cb, - const InfoFailureCallback &fail_cb) = 0; + /// \param connection_info The connection information to fill out. This + /// should be pre-populated with the requested client ID. If the directory + /// has information about the requested client, then the rest of the fields + /// in this struct will be populated accordingly. + virtual void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) = 0; /// Callback for object location notifications. using OnLocationsFound = std::function &, @@ -122,13 +121,11 @@ class ObjectDirectory : public ObjectDirectoryInterface { ObjectDirectory(boost::asio::io_service &io_service, std::shared_ptr &gcs_client); - virtual ~ObjectDirectory(){}; + virtual ~ObjectDirectory() {} void RegisterBackend() override; - ray::Status GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_callback, - const InfoFailureCallback &fail_callback) override; + void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) override; void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 8d4d51d4e627..b77912e615d5 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -263,23 +263,24 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id, // TODO(hme): There is no cap on the number of pull request connections. connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, client_id, &conn); + // Try to create a new connection to the remote object manager if one doesn't + // already exist. 
if (conn == nullptr) { - status = object_directory_->GetInformation( - client_id, - [this, object_id, client_id](const RemoteConnectionInfo &connection_info) { - std::shared_ptr async_conn = CreateSenderConnection( - ConnectionPool::ConnectionType::MESSAGE, connection_info); - if (async_conn == nullptr) { - return; - } - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, - client_id, async_conn); - PullSendRequest(object_id, async_conn); - }, - []() { - RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; - }); - } else { + RemoteConnectionInfo connection_info(client_id); + object_directory_->LookupRemoteConnectionInfo(connection_info); + if (connection_info.Connected()) { + conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, + connection_info); + if (conn != nullptr) { + connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, + client_id, conn); + } + } else { + RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; + } + } + + if (conn != nullptr) { PullSendRequest(object_id, conn); } } @@ -347,34 +348,30 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { return; } - // TODO(hme): Cache this data in ObjectDirectory. - // Okay for now since the GCS client caches this data. 
- RAY_CHECK_OK(object_directory_->GetInformation( - client_id, - [this, object_id, client_id](const RemoteConnectionInfo &info) { - const object_manager::protocol::ObjectInfoT &object_info = - local_objects_[object_id]; - uint64_t data_size = - static_cast(object_info.data_size + object_info.metadata_size); - uint64_t metadata_size = static_cast(object_info.metadata_size); - uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); - for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - send_service_.post([this, client_id, object_id, data_size, metadata_size, - chunk_index, info]() { - // NOTE: When this callback executes, it's possible that the object - // will have already been evicted. It's also possible that the - // object could be in the process of being transferred to this - // object manager from another object manager. - ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, - info); - }); - } - }, - []() { - // Push is best effort, so do nothing here. - RAY_LOG(ERROR) - << "Failed to establish connection for Push with remote object manager."; - })); + RemoteConnectionInfo connection_info(client_id); + object_directory_->LookupRemoteConnectionInfo(connection_info); + if (connection_info.Connected()) { + const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id]; + uint64_t data_size = + static_cast(object_info.data_size + object_info.metadata_size); + uint64_t metadata_size = static_cast(object_info.metadata_size); + uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { + send_service_.post([this, client_id, object_id, data_size, metadata_size, + chunk_index, connection_info]() { + // NOTE: When this callback executes, it's possible that the object + // will have already been evicted. 
It's also possible that the + // object could be in the process of being transferred to this + // object manager from another object manager. + ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, + connection_info); + }); + } + } else { + // Push is best effort, so do nothing here. + RAY_LOG(ERROR) + << "Failed to establish connection for Push with remote object manager."; + } } void ObjectManager::ExecuteSendObject(const ClientID &client_id, diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index e103001cda3b..d3bc5e085061 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -37,8 +37,7 @@ class MockObjectDirectory : public ObjectDirectoryInterface { } MOCK_METHOD0(RegisterBackend, void(void)); - MOCK_METHOD3(GetInformation, ray::Status(const ClientID &, const InfoSuccessCallback &, - const InfoFailureCallback &)); + MOCK_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); MOCK_METHOD3(SubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &, const OnLocationsFound &)); From a98f086c91d4f80572726b2d0228fc9d137dc9f2 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Sun, 4 Nov 2018 14:11:45 -0800 Subject: [PATCH 04/10] Convert RunFunctionOnEachClient to not use callbacks and other cleanups --- src/ray/object_manager/object_directory.cc | 22 +++---- src/ray/object_manager/object_directory.h | 22 +++---- src/ray/object_manager/object_manager.cc | 68 ++++++++++---------- src/ray/raylet/reconstruction_policy_test.cc | 4 +- 4 files changed, 54 insertions(+), 62 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index cd63ac2aa363..790fc155f9cb 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -105,7 +105,8 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID 
&object_id, return status; }; -void ObjectDirectory::LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) { +void ObjectDirectory::LookupRemoteConnectionInfo( + RemoteConnectionInfo &connection_info) const { const ClientTableDataT &data = gcs_client_->client_table().GetClient(connection_info.client_id); ClientID result_client_id = ClientID::from_binary(data.client_id); @@ -115,21 +116,18 @@ void ObjectDirectory::LookupRemoteConnectionInfo(RemoteConnectionInfo &connectio } } -void ObjectDirectory::RunFunctionForEachClient( - const InfoSuccessCallback &client_function) { +std::vector ObjectDirectory::LookupAllRemoteConnections() const { + std::vector remote_connections; const auto &clients = gcs_client_->client_table().GetAllClients(); for (const auto &client_pair : clients) { - const ClientTableDataT &data = client_pair.second; - if (client_pair.first == ClientID::nil() || - client_pair.first == gcs_client_->client_table().GetLocalClientId() || - !data.is_insertion) { - continue; - } else { - RemoteConnectionInfo info(client_pair.first); - LookupRemoteConnectionInfo(info); - client_function(info); + RemoteConnectionInfo info(client_pair.first); + LookupRemoteConnectionInfo(info); + if (info.Connected() && + info.client_id != gcs_client_->client_table().GetLocalClientId()) { + remote_connections.push_back(info); } } + return remote_connections; } ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id, diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 7a2dbe3c2351..e705e1b56f30 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -31,9 +31,6 @@ class ObjectDirectoryInterface { public: virtual ~ObjectDirectoryInterface() {} - /// Callbacks for LookupRemoteConnectionInfo. - using InfoSuccessCallback = std::function; - virtual void RegisterBackend() = 0; /// Lookup how to connect to a remote object manager. 
@@ -42,7 +39,13 @@ class ObjectDirectoryInterface { /// should be pre-populated with the requested client ID. If the directory /// has information about the requested client, then the rest of the fields /// in this struct will be populated accordingly. - virtual void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) = 0; + virtual void LookupRemoteConnectionInfo( + RemoteConnectionInfo &connection_info) const = 0; + + /// Get information for all connected remote object managers. + /// + /// \return A vector of information for all connected remote object managers. + virtual std::vector LookupAllRemoteConnections() const = 0; /// Callback for object location notifications. using OnLocationsFound = std::function &, @@ -100,13 +103,6 @@ class ObjectDirectoryInterface { /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) = 0; - - /// Go through all the client information. - /// - /// \param success_cb A callback which handles the success of this method. - /// This function will be called multiple times. - /// \return Void. - virtual void RunFunctionForEachClient(const InfoSuccessCallback &client_function) = 0; }; /// Ray ObjectDirectory declaration. 
@@ -125,9 +121,9 @@ class ObjectDirectory : public ObjectDirectoryInterface { void RegisterBackend() override; - void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) override; + void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override; - void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override; + std::vector LookupAllRemoteConnections() const override; ray::Status LookupLocations(const ObjectID &object_id, const OnLocationsFound &callback) override; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index b77912e615d5..cbf1d7e5d7a8 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -271,10 +271,6 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id, if (connection_info.Connected()) { conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, connection_info); - if (conn != nullptr) { - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, - client_id, conn); - } } else { RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; } @@ -386,15 +382,13 @@ void ObjectManager::ExecuteSendObject(const ClientID &client_id, if (conn == nullptr) { conn = CreateSenderConnection(ConnectionPool::ConnectionType::TRANSFER, connection_info); - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::TRANSFER, client_id, - conn); - if (conn == nullptr) { - return; - } } - status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); - if (!status.ok()) { - CheckIOError(status, "Push"); + + if (conn != nullptr) { + status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); + if (!status.ok()) { + CheckIOError(status, "Push"); + } } } @@ -640,19 +634,22 @@ std::shared_ptr ObjectManager::CreateSenderConnection( SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port); if 
(conn == nullptr) { RAY_LOG(ERROR) << "Failed to connect to remote object manager."; - return conn; + } else { + // Register the new connection. + // TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it if the client + // disconnects. + connection_pool_.RegisterSender(type, info.client_id, conn); + // Prepare client connection info buffer + flatbuffers::FlatBufferBuilder fbb; + bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); + auto message = object_manager_protocol::CreateConnectClientMessage( + fbb, fbb.CreateString(client_id_.binary()), is_transfer); + fbb.Finish(message); + // Send synchronously. + RAY_CHECK_OK(conn->WriteMessage( + static_cast(object_manager_protocol::MessageType::ConnectClient), + fbb.GetSize(), fbb.GetBufferPointer())); } - // Prepare client connection info buffer - flatbuffers::FlatBufferBuilder fbb; - bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); - auto message = object_manager_protocol::CreateConnectClientMessage( - fbb, fbb.CreateString(client_id_.binary()), is_transfer); - fbb.Finish(message); - // Send synchronously. - RAY_CHECK_OK(conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::ConnectClient), - fbb.GetSize(), fbb.GetBufferPointer())); - // The connection is ready; return to caller. 
return conn; } @@ -805,25 +802,26 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ flatbuffers::Offset request = object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids)); fbb.Finish(request); - auto function_on_client = [this, &fbb](const RemoteConnectionInfo &connection_info) { + + const auto remote_connections = object_directory_->LookupAllRemoteConnections(); + for (const auto &connection_info : remote_connections) { std::shared_ptr conn; connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, connection_info.client_id, &conn); if (conn == nullptr) { conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, connection_info); - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, - connection_info.client_id, conn); } - ray::Status status = conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::FreeRequest), - fbb.GetSize(), fbb.GetBufferPointer()); - if (status.ok()) { - connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); + + if (conn != nullptr) { + ray::Status status = conn->WriteMessage( + static_cast(object_manager_protocol::MessageType::FreeRequest), + fbb.GetSize(), fbb.GetBufferPointer()); + if (status.ok()) { + connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); + } } - // TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it in "else". 
- }; - object_directory_->RunFunctionForEachClient(function_on_client); + } } } // namespace ray diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index d3bc5e085061..ec117cd1c147 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -37,7 +37,8 @@ class MockObjectDirectory : public ObjectDirectoryInterface { } MOCK_METHOD0(RegisterBackend, void(void)); - MOCK_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); + MOCK_CONST_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); + MOCK_CONST_METHOD0(LookupAllRemoteConnections, std::vector()); MOCK_METHOD3(SubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &, const OnLocationsFound &)); @@ -47,7 +48,6 @@ class MockObjectDirectory : public ObjectDirectoryInterface { ray::Status(const ObjectID &, const ClientID &, const object_manager::protocol::ObjectInfoT &)); MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); - MOCK_METHOD1(RunFunctionForEachClient, void(const InfoSuccessCallback &success_cb)); private: std::vector> callbacks_; From c6ba1d2aeae7c2ae2a078137b484ae8bf5c60e0d Mon Sep 17 00:00:00 2001 From: Stephanie Date: Sun, 4 Nov 2018 14:12:54 -0800 Subject: [PATCH 05/10] todo --- src/ray/object_manager/object_manager.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index cbf1d7e5d7a8..ec4be43d5e50 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -646,6 +646,7 @@ std::shared_ptr ObjectManager::CreateSenderConnection( fbb, fbb.CreateString(client_id_.binary()), is_transfer); fbb.Finish(message); // Send synchronously. + // TODO(swang): Make this a WriteMessageAsync. 
RAY_CHECK_OK(conn->WriteMessage( static_cast(object_manager_protocol::MessageType::ConnectClient), fbb.GetSize(), fbb.GetBufferPointer())); @@ -814,6 +815,7 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ } if (conn != nullptr) { + // TODO(swang): Make this a WriteMessageAsync. ray::Status status = conn->WriteMessage( static_cast(object_manager_protocol::MessageType::FreeRequest), fbb.GetSize(), fbb.GetBufferPointer()); From edcade8bd769a5ae89bff01b5325fbd9627603f4 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 6 Nov 2018 11:37:54 -0800 Subject: [PATCH 06/10] Fix object free test, clean up GetClient interface --- src/ray/gcs/client_test.cc | 3 ++- src/ray/gcs/tables.cc | 9 ++++----- src/ray/gcs/tables.h | 14 +++++--------- src/ray/object_manager/object_directory.cc | 15 +++++++++------ .../test/object_manager_stress_test.cc | 6 ++++-- .../object_manager/test/object_manager_test.cc | 6 ++++-- src/ray/raylet/node_manager.cc | 9 +++------ src/ray/raylet/object_manager_integration_test.cc | 6 ++++-- 8 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 116d4dd04b59..a99e4ceb0611 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -648,7 +648,8 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client ASSERT_EQ(ClientID::from_binary(data.client_id), added_id); ASSERT_EQ(data.is_insertion, is_insertion); - auto cached_client = client->client_table().GetClient(added_id); + ClientTableDataT cached_client; + client->client_table().GetClient(added_id, cached_client); ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id); ASSERT_EQ(cached_client.is_insertion, is_insertion); } diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 6ef2b66afeed..f1a9a1975231 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -421,15 +421,14 @@ ray::Status ClientTable::MarkDisconnected(const ClientID 
&dead_client_id) { return Append(JobID::nil(), client_log_key_, data, nullptr); } -const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const { +void ClientTable::GetClient(const ClientID &client_id, + ClientTableDataT &client_info) const { RAY_CHECK(!client_id.is_nil()); auto entry = client_cache_.find(client_id); if (entry != client_cache_.end()) { - return entry->second; + client_info = entry->second; } else { - // If the requested client was not found, return a reference to the nil - // client entry. - return client_cache_.at(ClientID::nil()); + client_info.client_id = ClientID::nil().binary(); } } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 1973ebf42ef3..6bc3759e99d2 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -517,12 +517,6 @@ class ClientTable : private Log { // Set the local client's ID. local_client_.client_id = client_id.binary(); - - // Add a nil client to the cache so that we can serve requests for clients - // that we have not heard about. - ClientTableDataT nil_client; - nil_client.client_id = ClientID::nil().binary(); - client_cache_[ClientID::nil()] = nil_client; }; /// Connect as a client to the GCS. This registers us in the client table @@ -560,9 +554,11 @@ class ClientTable : private Log { /// information for clients that we've heard a notification for. /// /// \param client The client to get information about. - /// \return A reference to the requested client. If the client is not in the - /// cache, then an entry with a nil ClientID will be returned. - const ClientTableDataT &GetClient(const ClientID &client) const; + /// \param client_info A reference to the client information. If we have information + /// about the client in the cache, then the reference will be modified to + /// contain that information. Else, the reference will be updated to contain + /// a nil client ID. + void GetClient(const ClientID &client, ClientTableDataT &client_info) const; /// Get the local client's ID.
/// diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 790fc155f9cb..833294f0ab05 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -107,12 +107,15 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id, void ObjectDirectory::LookupRemoteConnectionInfo( RemoteConnectionInfo &connection_info) const { - const ClientTableDataT &data = - gcs_client_->client_table().GetClient(connection_info.client_id); - ClientID result_client_id = ClientID::from_binary(data.client_id); - if (result_client_id != ClientID::nil() && data.is_insertion) { - connection_info.ip = data.node_manager_address; - connection_info.port = static_cast(data.object_manager_port); + ClientTableDataT client_data; + gcs_client_->client_table().GetClient(connection_info.client_id, client_data); + ClientID result_client_id = ClientID::from_binary(client_data.client_id); + if (!result_client_id.is_nil()) { + RAY_CHECK(result_client_id == connection_info.client_id); + if (client_data.is_insertion) { + connection_info.ip = client_data.node_manager_address; + connection_info.port = static_cast(client_data.object_manager_port); + } } } diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index a26519c54b0b..31ea5aed7300 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -433,11 +433,13 @@ class StressTestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "All connected clients:" << "\n"; - const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_1->client_table().GetClient(client_id_1, data); RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data.client_id) << "\n" << "ClientIp=" << data.node_manager_address << "\n" << 
"ClientPort=" << data.node_manager_port; - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data2.client_id) << "\n" << "ClientIp=" << data2.node_manager_address << "\n" << "ClientPort=" << data2.node_manager_port; diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 3ce2f2c2ab48..cb98706753d9 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -447,13 +447,15 @@ class TestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "Server client ids:" << "\n"; - const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_1->client_table().GetClient(client_id_1, data); RAY_LOG(DEBUG) << (ClientID::from_binary(data.client_id) == ClientID::nil()); RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::from_binary(data.client_id); RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address; RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port; ASSERT_EQ(client_id_1, ClientID::from_binary(data.client_id)); - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::from_binary(data2.client_id); RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address; RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1cd785a6b5db..54145152b728 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -304,14 +304,13 @@ void NodeManager::ClientAdded(const ClientTableDataT 
&client_data) { } // Establish a new NodeManager connection to this GCS client. - auto client_info = gcs_client_->client_table().GetClient(client_id); RAY_LOG(DEBUG) << "[ClientAdded] Trying to connect to client " << client_id << " at " - << client_info.node_manager_address << ":" - << client_info.node_manager_port; + << client_data.node_manager_address << ":" + << client_data.node_manager_port; boost::asio::ip::tcp::socket socket(io_service_); auto status = - TcpConnect(socket, client_info.node_manager_address, client_info.node_manager_port); + TcpConnect(socket, client_data.node_manager_address, client_data.node_manager_port); // A disconnected client has 2 entries in the client table (one for being // inserted and one for being removed). When a new raylet starts, ClientAdded // will be called with the disconnected client's first entry, which will cause @@ -1556,8 +1555,6 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, RAY_LOG(DEBUG) << "Forwarding task " << task_id << " to " << node_id << " spillback=" << lineage_cache_entry_task.GetTaskExecutionSpec().NumForwards(); - auto client_info = gcs_client_->client_table().GetClient(node_id); - // Lookup remote server connection for this node_id and use it to send the request. 
auto it = remote_server_connections_.find(node_id); if (it == remote_server_connections_.end()) { diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 96ecc41de988..83c7a9f8f8be 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -204,12 +204,14 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { RAY_LOG(INFO) << "\n" << "All connected clients:" << "\n"; - const ClientTableDataT &data = gcs_client_2->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_2->client_table().GetClient(client_id_1, data); RAY_LOG(INFO) << (ClientID::from_binary(data.client_id) == ClientID::nil()); RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data.client_id); RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address; RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port; - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data2.client_id); RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address; RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port; From fae7faeebc1631e7a5657cde64d6e6f78cf3cf3d Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 6 Nov 2018 11:42:49 -0800 Subject: [PATCH 07/10] Use to_flatbuf --- src/ray/object_manager/object_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index ec4be43d5e50..1bbdf3573700 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -643,7 +643,7 @@ std::shared_ptr ObjectManager::CreateSenderConnection( flatbuffers::FlatBufferBuilder fbb; bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); auto 
message = object_manager_protocol::CreateConnectClientMessage( - fbb, fbb.CreateString(client_id_.binary()), is_transfer); + fbb, to_flatbuf(fbb, client_id_), is_transfer); fbb.Finish(message); // Send synchronously. // TODO(swang): Make this a WriteMessageAsync. From b1c459c8376645b08d962f87abab3aab0f8ea2f1 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 6 Nov 2018 13:20:06 -0800 Subject: [PATCH 08/10] fix osx build --- src/ray/object_manager/object_directory.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 833294f0ab05..aadf923ef699 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -153,7 +153,7 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // have been evicted from all nodes. std::vector client_id_vec(listener_state.current_object_locations.begin(), listener_state.current_object_locations.end()); - io_service_.post([this, callback, client_id_vec, object_id]() { + io_service_.post([callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); }); return status; From d7733da5e57788d756c1b5331caf2d164acd07c6 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 6 Nov 2018 15:07:44 -0800 Subject: [PATCH 09/10] lint --- src/ray/object_manager/object_directory.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index aadf923ef699..64f6033c58fd 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -153,9 +153,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // have been evicted from all nodes. 
std::vector client_id_vec(listener_state.current_object_locations.begin(), listener_state.current_object_locations.end()); - io_service_.post([callback, client_id_vec, object_id]() { - callback(client_id_vec, object_id); - }); + io_service_.post( + [callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); }); return status; } From 463ac57c0e37b1183638d7c75e0d8593646dd4c6 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 6 Nov 2018 16:41:55 -0800 Subject: [PATCH 10/10] Clean up object manager Wait code --- src/ray/object_manager/object_manager.cc | 28 ++++++++++-------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 1bbdf3573700..74527058746d 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -536,23 +536,13 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { wait_state.timeout_ms == 0) { // Requirements already satisfied. WaitComplete(wait_id); - } else { - // Wait may complete during the execution of any one of the following calls to - // SubscribeObjectLocations, so copy the object ids that need to be iterated over. - // Order matters for test purposes. - std::vector ordered_remaining_object_ids; - for (const auto &object_id : wait_state.object_id_order) { - if (wait_state.remaining.count(object_id) > 0) { - ordered_remaining_object_ids.push_back(object_id); - } - } - for (const auto &object_id : ordered_remaining_object_ids) { - if (active_wait_requests_.find(wait_id) == active_wait_requests_.end()) { - // This is possible if an object's location is obtained immediately, - // within the current callstack. In this case, WaitComplete has been - // invoked already, so we're done. - return; - } + return; + } + + // There are objects remaining whose locations we don't know. Request their + // locations from the object directory. 
+ for (const auto &object_id : wait_state.object_id_order) { + if (wait_state.remaining.count(object_id) > 0) { wait_state.requested_objects.insert(object_id); // Subscribe to object notifications. RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( @@ -575,6 +565,10 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { } })); } + + // If a timeout was provided, then set a timer. If we don't find locations + // for enough objects by the time the timer expires, then we will return + // from the Wait. if (wait_state.timeout_ms != -1) { auto timeout = boost::posix_time::milliseconds(wait_state.timeout_ms); wait_state.timeout_timer->expires_from_now(timeout);