Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 53 additions & 56 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,41 @@

namespace ray {

ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client) {
ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client) {
gcs_client_ = gcs_client;
};
}

/// Subscribe to object table notifications from the GCS.
///
/// Installs a notification callback that, for each object present in
/// listeners_, applies the received object table entries to that listener's
/// set of known locations (insert on add, erase on eviction) and, if the
/// resulting set is non-empty, invokes the listener's callback with the
/// current locations. Fails a RAY_CHECK_OK if the subscription call does not
/// return an OK status.
void ObjectDirectory::RegisterBackend() {
  auto object_notification_callback = [this](gcs::AsyncGcsClient *client,
                                             const ObjectID &object_id,
                                             const std::vector<ObjectTableDataT> &data) {
    // Objects are added to this map in SubscribeObjectLocations.
    auto entry = listeners_.find(object_id);
    // Do nothing for objects we are not listening for.
    if (entry == listeners_.end()) {
      return;
    }
    // Update the listener's own location set. This must be a reference: a
    // by-value copy would drop every update when the lambda returns, and the
    // stored set would never reflect earlier notifications.
    auto &client_id_set = entry->second.client_ids;
    for (const auto &object_table_data : data) {
      ClientID client_id = ClientID::from_binary(object_table_data.manager);
      if (!object_table_data.is_eviction) {
        client_id_set.insert(client_id);
      } else {
        client_id_set.erase(client_id);
      }
    }
    if (!client_id_set.empty()) {
      // Only call the callback if we have object locations.
      std::vector<ClientID> client_id_vec(client_id_set.begin(), client_id_set.end());
      // Invoke a copy of the callback: the callback may call
      // UnsubscribeObjectLocations, which erases this entry (and the stored
      // callback with it) from listeners_. Neither entry nor client_id_set
      // may be touched after this call.
      auto callback = entry->second.locations_found_callback;
      callback(client_id_vec, object_id);
    }
  };
  RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
      UniqueID::nil(), gcs_client_->client_table().GetLocalClientId(),
      object_notification_callback, nullptr));
}

ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
const ClientID &client_id,
Expand All @@ -19,7 +51,7 @@ ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
return status;
};
}

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
Expand Down Expand Up @@ -52,63 +84,28 @@ ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
success_callback(info);
}
return ray::Status::OK();
};
}

ray::Status ObjectDirectory::GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_callback,
const OnLocationsFailure &fail_callback) {
ray::Status status_code = ray::Status::OK();
if (existing_requests_.count(object_id) == 0) {
existing_requests_[object_id] = ODCallbacks({success_callback, fail_callback});
status_code = ExecuteGetLocations(object_id);
} else {
// Do nothing. A request is in progress.
ray::Status ObjectDirectory::SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) {
if (listeners_.find(object_id) != listeners_.end()) {
RAY_LOG(ERROR) << "Duplicate calls to SubscribeObjectLocations for " << object_id;
return ray::Status::OK();
}
return status_code;
};

ray::Status ObjectDirectory::ExecuteGetLocations(const ObjectID &object_id) {
JobID job_id = JobID::nil();
// Note: Lookup must be synchronous for thread-safe access.
// For now, this is only accessed by the main thread.
ray::Status status = gcs_client_->object_table().Lookup(
job_id, object_id, [this](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &data) {
GetLocationsComplete(object_id, data);
});
return status;
};
listeners_.emplace(object_id, LocationListenerState(callback));
return gcs_client_->object_table().RequestNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}

void ObjectDirectory::GetLocationsComplete(
const ObjectID &object_id, const std::vector<ObjectTableDataT> &location_entries) {
auto request = existing_requests_.find(object_id);
// Do not invoke a callback if the request was cancelled.
if (request == existing_requests_.end()) {
return;
}
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> locations;
for (auto entry : location_entries) {
ClientID client_id = ClientID::from_binary(entry.manager);
if (!entry.is_eviction) {
locations.insert(client_id);
} else {
locations.erase(client_id);
}
ray::Status ObjectDirectory::UnsubscribeObjectLocations(const ObjectID &object_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be careful about having too many signatures return ray::Status. This one always seems to return OK. Can we either change the signature to void, or have it return the status from the CancelNotifications call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now returns the status of the CancelNotifications call.

auto entry = listeners_.find(object_id);
if (entry == listeners_.end()) {
return ray::Status::OK();
}
// Invoke the callback.
std::vector<ClientID> locations_vector(locations.begin(), locations.end());
if (locations_vector.empty()) {
request->second.fail_cb(object_id);
} else {
request->second.success_cb(locations_vector, object_id);
}
existing_requests_.erase(request);
ray::Status status = gcs_client_->object_table().CancelNotifications(
JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
listeners_.erase(entry);
return status;
}

/// Remove the entry for object_id from existing_requests_, if one exists.
///
/// \param object_id The object whose entry is dropped.
/// \return Always ray::Status::OK().
ray::Status ObjectDirectory::Cancel(const ObjectID &object_id) {
  existing_requests_.erase(object_id);
  return ray::Status::OK();
}

} // namespace ray
67 changes: 33 additions & 34 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ObjectDirectoryInterface {
using InfoSuccessCallback = std::function<void(const ray::RemoteConnectionInfo &info)>;
using InfoFailureCallback = std::function<void(ray::Status status)>;

virtual void RegisterBackend() = 0;

/// This is used to establish object manager client connections.
///
/// \param client_id The client for which information is required.
Expand All @@ -43,27 +45,25 @@ class ObjectDirectoryInterface {
const InfoSuccessCallback &success_cb,
const InfoFailureCallback &fail_cb) = 0;

// Callbacks for GetLocations.
using OnLocationsSuccess = std::function<void(const std::vector<ray::ClientID> &v,
const ray::ObjectID &object_id)>;
using OnLocationsFailure = std::function<void(const ray::ObjectID &object_id)>;
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &v,
const ray::ObjectID &object_id)>;

/// Asynchronously obtain the locations of an object by ObjectID.
/// This is used to handle object pulls.
/// Subscribe to be notified of locations (ClientID) of the given object.
/// The callback will be invoked whenever locations are obtained for the
/// specified object.
///
/// \param object_id The required object's ObjectID.
/// \param success_cb Invoked upon success with list of remote connection info.
/// \param fail_cb Invoked upon failure with ray status and object id.
/// \return Status of whether this asynchronous request succeeded.
virtual ray::Status GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_cb,
const OnLocationsFailure &fail_cb) = 0;
/// \param callback Invoked with a non-empty list of client ids and the object_id.
/// \return Status of whether subscription succeeded.
virtual ray::Status SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) = 0;

/// Cancels the invocation of the callback associated with callback_id.
/// Unsubscribe from object location notifications.
///
/// \param object_id The object id invoked with GetLocations.
/// \return Status of whether this method succeeded.
virtual ray::Status Cancel(const ObjectID &object_id) = 0;
/// \param object_id The object id passed to SubscribeObjectLocations.
/// \return Status of whether the unsubscription succeeded.
virtual ray::Status UnsubscribeObjectLocations(const ObjectID &object_id) = 0;

/// Report objects added to this node's store to the object directory.
///
Expand All @@ -90,40 +90,39 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ObjectDirectory() = default;
~ObjectDirectory() override = default;

void RegisterBackend() override;

ray::Status GetInformation(const ClientID &client_id,
const InfoSuccessCallback &success_callback,
const InfoFailureCallback &fail_callback) override;
ray::Status GetLocations(const ObjectID &object_id,
const OnLocationsSuccess &success_callback,
const OnLocationsFailure &fail_callback) override;
ray::Status Cancel(const ObjectID &object_id) override;

ray::Status SubscribeObjectLocations(const ObjectID &object_id,
const OnLocationsFound &callback) override;
ray::Status UnsubscribeObjectLocations(const ObjectID &object_id) override;

ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const ObjectInfoT &object_info) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
/// Ray only (not part of the OD interface).
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client);
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);

/// ObjectDirectory should not be copied.
RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory);

private:
/// Callbacks associated with a call to GetLocations.
// TODO(hme): I think these can be removed.
struct ODCallbacks {
OnLocationsSuccess success_cb;
OnLocationsFailure fail_cb;
/// Per-object state kept for a location listener: the callback to notify and
/// the set of locations known so far.
struct LocationListenerState {
  /// Construct listener state with an initially empty location set.
  /// Marked explicit so a bare callback is never implicitly converted into
  /// listener state.
  ///
  /// \param locations_found_callback Stored and invoked when object locations
  /// are found.
  explicit LocationListenerState(const OnLocationsFound &locations_found_callback)
      : locations_found_callback(locations_found_callback) {}
  /// The callback to invoke when object locations are found.
  OnLocationsFound locations_found_callback;
  /// The current set of known locations of this object.
  std::unordered_set<ClientID> client_ids;
};

/// GetLocations registers a request for locations.
/// This function actually carries out that request.
ray::Status ExecuteGetLocations(const ObjectID &object_id);
/// Invoked when call to ExecuteGetLocations completes.
void GetLocationsComplete(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_entries);

/// Maintain map of in-flight GetLocation requests.
std::unordered_map<ObjectID, ODCallbacks> existing_requests_;
/// Info about subscribers to object locations.
std::unordered_map<ObjectID, LocationListenerState> listeners_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Map from object ID to the number of times it's been evicted on this
Expand Down
36 changes: 10 additions & 26 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,

/// Calls StopIOService() when the object manager is destroyed.
ObjectManager::~ObjectManager() {
  StopIOService();
}

/// Register GCS-related functionality by forwarding to the object
/// directory's RegisterBackend().
void ObjectManager::RegisterGcs() {
  object_directory_->RegisterBackend();
}

void ObjectManager::StartIOService() {
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this));
Expand Down Expand Up @@ -111,26 +113,12 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local.";
return ray::Status::OK();
}
return PullGetLocations(object_id);
}

/// Arm a timer on main_service_ that, once it completes, retries
/// PullGetLocations for object_id.
///
/// The timer is stored in pull_requests_ under object_id (replacing any
/// existing entry for that id) and is removed again by its own completion
/// handler.
///
/// \param object_id The object to pull once the timer completes.
/// \param wait_ms Timer duration in milliseconds.
void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) {
  auto retry_timer = std::make_shared<boost::asio::deadline_timer>(
      *main_service_, boost::posix_time::milliseconds(wait_ms));
  pull_requests_[object_id] = retry_timer;
  // NOTE: the handler does not inspect error_code, so the retry runs on any
  // completion of the wait.
  retry_timer->async_wait(
      [this, object_id](const boost::system::error_code &error_code) {
        pull_requests_.erase(object_id);
        RAY_CHECK_OK(PullGetLocations(object_id));
      });
}

ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) {
ray::Status status_code = object_directory_->GetLocations(
ray::Status status_code = object_directory_->SubscribeObjectLocations(
object_id,
[this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
return GetLocationsSuccess(client_ids, object_id);
},
[this](const ObjectID &object_id) { return GetLocationsFailed(object_id); });
RAY_CHECK_OK(object_directory_->UnsubscribeObjectLocations(object_id));
GetLocationsSuccess(client_ids, object_id);
});
return status_code;
}

Expand All @@ -145,10 +133,6 @@ void ObjectManager::GetLocationsSuccess(const std::vector<ray::ClientID> &client
}
}

/// Failure path for a location lookup: schedule another pull of object_id
/// after config_.pull_timeout_ms.
///
/// \param object_id The object whose location lookup failed.
void ObjectManager::GetLocationsFailed(const ObjectID &object_id) {
  const auto retry_delay_ms = config_.pull_timeout_ms;
  SchedulePull(object_id, retry_delay_ms);
}

ray::Status ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) {
// Check if object is already local.
if (local_objects_.count(object_id) != 0) {
Expand Down Expand Up @@ -188,7 +172,8 @@ ray::Status ObjectManager::PullEstablishConnection(const ObjectID &object_id,
RAY_CHECK_OK(pull_send_status);
},
[this, object_id](const Status &status) {
SchedulePull(object_id, config_.pull_timeout_ms);
RAY_LOG(ERROR) << "Failed to establish connection with remote object manager.";
RAY_CHECK_OK(status);
});
} else {
status = PullSendRequest(object_id, conn);
Expand Down Expand Up @@ -311,9 +296,8 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
}

ray::Status ObjectManager::Cancel(const ObjectID &object_id) {
// TODO(hme): Account for pull timers.
ray::Status status = object_directory_->Cancel(object_id);
return ray::Status::OK();
ray::Status status = object_directory_->UnsubscribeObjectLocations(object_id);
return status;
}

ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
Expand Down
22 changes: 3 additions & 19 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class ObjectManager : public ObjectManagerInterface {

~ObjectManager();

/// Register GCS-related functionality.
void RegisterGcs();

/// Subscribe to notifications of objects added to local store.
/// Upon subscribing, the callback will be invoked for all objects that
///
Expand Down Expand Up @@ -185,10 +188,6 @@ class ObjectManager : public ObjectManagerInterface {
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;

/// Timeout for failed pull requests.
std::unordered_map<ObjectID, std::shared_ptr<boost::asio::deadline_timer>>
pull_requests_;

/// Cache of locally available objects.
std::unordered_map<ObjectID, ObjectInfoT> local_objects_;

Expand All @@ -204,17 +203,6 @@ class ObjectManager : public ObjectManagerInterface {
/// Register object remove with directory.
void NotifyDirectoryObjectDeleted(const ObjectID &object_id);

/// Wait wait_ms milliseconds before triggering a pull request for object_id.
/// This is invoked when a pull fails. Only point of failure currently considered
/// is GetLocationsFailed.
void SchedulePull(const ObjectID &object_id, int wait_ms);

/// Part of an asynchronous sequence of Pull methods.
/// Gets the location of an object before invoking PullEstablishConnection.
/// Guaranteed to execute on main_service_ thread.
/// Executes on main_service_ thread.
ray::Status PullGetLocations(const ObjectID &object_id);

/// Part of an asynchronous sequence of Pull methods.
/// Uses an existing connection or creates a connection to ClientID.
/// Executes on main_service_ thread.
Expand All @@ -226,10 +214,6 @@ class ObjectManager : public ObjectManagerInterface {
void GetLocationsSuccess(const std::vector<ray::ClientID> &client_ids,
const ray::ObjectID &object_id);

/// Private callback implementation for failure on get location. Called from
/// ObjectDirectory.
void GetLocationsFailed(const ObjectID &object_id);

/// Synchronously send a pull request via remote object manager connection.
/// Executes on main_service_ thread.
ray::Status PullSendRequest(const ObjectID &object_id,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class MockServer {
client_info.node_manager_address = ip;
client_info.node_manager_port = object_manager_port;
client_info.object_manager_port = object_manager_port;
return gcs_client_->client_table().Connect(client_info);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}

void DoAcceptObjectManager() {
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class MockServer {
client_info.node_manager_address = ip;
client_info.node_manager_port = object_manager_port;
client_info.object_manager_port = object_manager_port;
return gcs_client_->client_table().Connect(client_info);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}

void DoAcceptObjectManager() {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}

ray::Status NodeManager::RegisterGcs() {
object_manager_.RegisterGcs();

// Subscribe to task entry commits in the GCS. These notifications are
// forwarded to the lineage cache, which requests notifications about tasks
// that were executed remotely.
Expand Down