Skip to content
3 changes: 2 additions & 1 deletion src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client
ASSERT_EQ(ClientID::from_binary(data.client_id), added_id);
ASSERT_EQ(data.is_insertion, is_insertion);

auto cached_client = client->client_table().GetClient(added_id);
ClientTableDataT cached_client;
client->client_table().GetClient(added_id, cached_client);
ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id);
ASSERT_EQ(cached_client.is_insertion, is_insertion);
}
Expand Down
9 changes: 4 additions & 5 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,14 @@ ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
return Append(JobID::nil(), client_log_key_, data, nullptr);
}

const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const {
void ClientTable::GetClient(const ClientID &client_id,
ClientTableDataT &client_info) const {
RAY_CHECK(!client_id.is_nil());
auto entry = client_cache_.find(client_id);
if (entry != client_cache_.end()) {
return entry->second;
client_info = entry->second;
} else {
// If the requested client was not found, return a reference to the nil
// client entry.
return client_cache_.at(ClientID::nil());
client_info.client_id = ClientID::nil().binary();
}
}

Expand Down
14 changes: 5 additions & 9 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,6 @@ class ClientTable : private Log<UniqueID, ClientTableData> {

// Set the local client's ID.
local_client_.client_id = client_id.binary();

// Add a nil client to the cache so that we can serve requests for clients
// that we have not heard about.
ClientTableDataT nil_client;
nil_client.client_id = ClientID::nil().binary();
client_cache_[ClientID::nil()] = nil_client;
};

/// Connect as a client to the GCS. This registers us in the client table
Expand Down Expand Up @@ -560,9 +554,11 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// information for clients that we've heard a notification for.
///
/// \param client The client to get information about.
/// \return A reference to the requested client. If the client is not in the
/// cache, then an entry with a nil ClientID will be returned.
const ClientTableDataT &GetClient(const ClientID &client) const;
/// \param A reference to the client information. If we have information
/// about the client in the cache, then the reference will be modified to
/// contain that information. Else, the reference will be updated to contain
/// a nil client ID.
void GetClient(const ClientID &client, ClientTableDataT &client_info) const;

/// Get the local client's ID.
///
Expand Down
56 changes: 28 additions & 28 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

namespace ray {

ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client)
: gcs_client_(gcs_client) {}
ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
std::shared_ptr<gcs::AsyncGcsClient> &gcs_client)
: io_service_(io_service), gcs_client_(gcs_client) {}

namespace {

Expand Down Expand Up @@ -61,6 +62,8 @@ void ObjectDirectory::RegisterBackend() {
// empty, since this may indicate that the objects have been evicted from
// all nodes.
for (const auto &callback_pair : callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(client_id_vec, object_id);
}
};
Expand Down Expand Up @@ -102,38 +105,32 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
return status;
};

ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
const InfoSuccessCallback &success_callback,
const InfoFailureCallback &fail_callback) {
const ClientTableDataT &data = gcs_client_->client_table().GetClient(client_id);
ClientID result_client_id = ClientID::from_binary(data.client_id);
if (result_client_id == ClientID::nil() || !data.is_insertion) {
fail_callback();
} else {
const auto &info =
RemoteConnectionInfo(client_id, data.node_manager_address,
static_cast<uint16_t>(data.object_manager_port));
success_callback(info);
void ObjectDirectory::LookupRemoteConnectionInfo(
RemoteConnectionInfo &connection_info) const {
ClientTableDataT client_data;
gcs_client_->client_table().GetClient(connection_info.client_id, client_data);
ClientID result_client_id = ClientID::from_binary(client_data.client_id);
if (!result_client_id.is_nil()) {
RAY_CHECK(result_client_id == connection_info.client_id);
if (client_data.is_insertion) {
connection_info.ip = client_data.node_manager_address;
connection_info.port = static_cast<uint16_t>(client_data.object_manager_port);
}
}
return ray::Status::OK();
}

void ObjectDirectory::RunFunctionForEachClient(
const InfoSuccessCallback &client_function) {
std::vector<RemoteConnectionInfo> ObjectDirectory::LookupAllRemoteConnections() const {
std::vector<RemoteConnectionInfo> remote_connections;
const auto &clients = gcs_client_->client_table().GetAllClients();
for (const auto &client_pair : clients) {
const ClientTableDataT &data = client_pair.second;
if (client_pair.first == ClientID::nil() ||
client_pair.first == gcs_client_->client_table().GetLocalClientId() ||
!data.is_insertion) {
continue;
} else {
const auto &info =
RemoteConnectionInfo(client_pair.first, data.node_manager_address,
static_cast<uint16_t>(data.object_manager_port));
client_function(info);
RemoteConnectionInfo info(client_pair.first);
LookupRemoteConnectionInfo(info);
if (info.Connected() &&
info.client_id != gcs_client_->client_table().GetLocalClientId()) {
remote_connections.push_back(info);
}
}
return remote_connections;
}

ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id,
Expand All @@ -156,7 +153,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// have been evicted from all nodes.
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
listener_state.current_object_locations.end());
callback(client_id_vec, object_id);
io_service_.post(
[callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); });
return status;
}

Expand Down Expand Up @@ -187,6 +185,8 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
std::unordered_set<ClientID> client_ids;
std::vector<ClientID> locations_vector = UpdateObjectLocations(
client_ids, location_history, gcs_client_->client_table());
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(locations_vector, object_id);
});
return status;
Expand Down
66 changes: 33 additions & 33 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,36 @@ namespace ray {

/// Connection information for remote object managers.
struct RemoteConnectionInfo {
RemoteConnectionInfo() = default;
RemoteConnectionInfo(const ClientID &id, const std::string &ip_address,
uint16_t port_num)
: client_id(id), ip(ip_address), port(port_num) {}
RemoteConnectionInfo(const ClientID &id) : client_id(id) {}

// Returns whether there is enough information to connect to the remote
// object manager.
bool Connected() const { return !ip.empty(); }

ClientID client_id;
std::string ip;
uint16_t port;
};

class ObjectDirectoryInterface {
public:
ObjectDirectoryInterface() = default;
virtual ~ObjectDirectoryInterface() = default;

/// Callbacks for GetInformation.
using InfoSuccessCallback = std::function<void(const ray::RemoteConnectionInfo &info)>;
using InfoFailureCallback = std::function<void()>;
virtual ~ObjectDirectoryInterface() {}

virtual void RegisterBackend() = 0;

/// This is used to establish object manager client connections.
/// Lookup how to connect to a remote object manager.
///
/// \param connection_info The connection information to fill out. This
/// should be pre-populated with the requested client ID. If the directory
/// has information about the requested client, then the rest of the fields
/// in this struct will be populated accordingly.
virtual void LookupRemoteConnectionInfo(
RemoteConnectionInfo &connection_info) const = 0;

/// Get information for all connected remote object managers.
///
/// \param client_id The client for which information is required.
/// \param success_cb A callback which handles the success of this method.
/// \param fail_cb A callback which handles the failure of this method.
/// \return Status of whether this asynchronous request succeeded.
virtual ray::Status GetInformation(const ClientID &client_id,
const InfoSuccessCallback &success_cb,
const InfoFailureCallback &fail_cb) = 0;
/// \return A vector of information for all connected remote object managers.
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &,
Expand Down Expand Up @@ -102,28 +103,27 @@ class ObjectDirectoryInterface {
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) = 0;

/// Go through all the client information.
///
/// \param success_cb A callback which handles the success of this method.
/// This function will be called multiple times.
/// \return Void.
virtual void RunFunctionForEachClient(const InfoSuccessCallback &client_function) = 0;
};

/// Ray ObjectDirectory declaration.
class ObjectDirectory : public ObjectDirectoryInterface {
public:
ObjectDirectory() = default;
~ObjectDirectory() override = default;
/// Create an object directory.
///
/// \param io_service The event loop to dispatch callbacks to. This should
/// usually be the same event loop that the given gcs_client runs on.
/// \param gcs_client A Ray GCS client to request object and client
/// information from.
ObjectDirectory(boost::asio::io_service &io_service,
std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);

virtual ~ObjectDirectory() {}

void RegisterBackend() override;

ray::Status GetInformation(const ClientID &client_id,
const InfoSuccessCallback &success_callback,
const InfoFailureCallback &fail_callback) override;
void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override;

void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override;
std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const override;

ray::Status LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) override;
Expand All @@ -139,8 +139,6 @@ class ObjectDirectory : public ObjectDirectoryInterface {
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
/// Ray only (not part of the OD interface).
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);

/// ObjectDirectory should not be copied.
RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory);
Expand All @@ -154,6 +152,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_set<ClientID> current_object_locations;
};

/// Reference to the event loop.
boost::asio::io_service &io_service_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Info about subscribers to object locations.
Expand Down
Loading