-
Notifications
You must be signed in to change notification settings - Fork 7.1k
Inline objects #3756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Inline objects #3756
Changes from 15 commits
71da1ed
e43a221
94cea3a
bd71d14
2c058f5
ded447e
3e6f9b2
4855736
71bdf21
52e2611
9931dd1
88ed5e2
4b6dd43
0869d26
0fa9835
3c52065
2633db2
dd02029
42cb64c
477f731
0052225
d8df8d8
0174f63
f5cf045
f8345d3
efaa8d6
461934e
ffd5663
71ee334
0e26655
dd2be9a
270b489
e95b510
f14d1f4
feb1e68
ffcd402
90721a6
f32a7b0
d284fa8
a374ce6
40b5d27
b663753
c29ab7c
08f705a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,14 +8,21 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, | |
|
|
||
| namespace { | ||
|
|
||
| /// Process a suffix of the object table log and store the result in | ||
| /// client_ids. This assumes that client_ids already contains the result of the | ||
| /// object table log up to but not including this suffix. This also stores a | ||
| /// bool in has_been_created indicating whether the object has ever been | ||
| /// created before. | ||
| /// Process a suffix of the object table log. | ||
| /// If object is inlined (inline_object_flag = TRUE), its data and metadata are | ||
| /// stored with the object's entry so we read them into nline_object_data, and | ||
| /// nline_object_metadata, respectively. | ||
|
||
| /// If object is not inlined, store the result in client_ids. | ||
| /// This assumes that client_ids already contains the result of the | ||
| /// object table log up to but not including this suffix. | ||
| /// This function also stores a bool in has_been_created indicating whether the | ||
| /// object has ever been created before. | ||
| void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history, | ||
| const ray::gcs::ClientTable &client_table, | ||
| std::unordered_set<ClientID> *client_ids, | ||
| bool *inline_object_flag, | ||
stephanie-wang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| std::vector<uint8_t> *inline_object_data, | ||
| std::string *inline_object_metadata, | ||
| bool *has_been_created) { | ||
| // location_history contains the history of locations of the object (it is a log), | ||
| // which might look like the following: | ||
|
|
@@ -24,25 +31,41 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history | |
| // client2.is_eviction = false | ||
| // In such a scenario, we want to indicate client2 is the only client that contains | ||
| // the object, which the following code achieves. | ||
| // | ||
| // If object is inlined each entry contains both the object's data and metadata, | ||
| // so we don't care about its location. | ||
| if (!location_history.empty()) { | ||
| // If there are entries, then the object has been created. Once this flag | ||
| // is set to true, it should never go back to false. | ||
| *has_been_created = true; | ||
| } | ||
| *inline_object_flag = false; | ||
| for (const auto &object_table_data : location_history) { | ||
| ClientID client_id = ClientID::from_binary(object_table_data.manager); | ||
| if (!object_table_data.is_eviction) { | ||
| client_ids->insert(client_id); | ||
| if (object_table_data.inline_object_flag) { | ||
stephanie-wang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // This is an inlined object. Read object's data from the object's GCS entry. | ||
| *inline_object_flag = object_table_data.inline_object_flag; | ||
| inline_object_data->assign(object_table_data.inline_object_data.begin(), | ||
| object_table_data.inline_object_data.end()); | ||
| inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(), | ||
| object_table_data.inline_object_metadata.end()); | ||
| break; // We got the data and metadata of the object so exit the loop. | ||
stephanie-wang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } else { | ||
| client_ids->erase(client_id); | ||
| if (!object_table_data.is_eviction) { | ||
| client_ids->insert(client_id); | ||
| } else { | ||
| client_ids->erase(client_id); | ||
| } | ||
| } | ||
| } | ||
| // Filter out the removed clients from the object locations. | ||
| for (auto it = client_ids->begin(); it != client_ids->end();) { | ||
| if (client_table.IsRemoved(*it)) { | ||
| it = client_ids->erase(it); | ||
| } else { | ||
| it++; | ||
| if (!*inline_object_flag) { | ||
| // Filter out the removed clients from the object locations. | ||
| for (auto it = client_ids->begin(); it != client_ids->end();) { | ||
| if (client_table.IsRemoved(*it)) { | ||
| it = client_ids->erase(it); | ||
| } else { | ||
| it++; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -51,7 +74,8 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history | |
|
|
||
| void ObjectDirectory::RegisterBackend() { | ||
| auto object_notification_callback = [this]( | ||
| gcs::AsyncGcsClient *client, const ObjectID &object_id, | ||
| gcs::AsyncGcsClient *client, | ||
| const ObjectID &object_id, | ||
| const std::vector<ObjectTableDataT> &location_history) { | ||
| // Objects are added to this map in SubscribeObjectLocations. | ||
| auto it = listeners_.find(object_id); | ||
|
|
@@ -62,6 +86,9 @@ void ObjectDirectory::RegisterBackend() { | |
| // Update entries for this object. | ||
| UpdateObjectLocations(location_history, gcs_client_->client_table(), | ||
| &it->second.current_object_locations, | ||
| &it->second.inline_object_flag, | ||
| &it->second.inline_object_data, | ||
| &it->second.inline_object_metadata, | ||
| &it->second.has_been_created); | ||
| // Copy the callbacks so that the callbacks can unsubscribe without interrupting | ||
| // looping over the callbacks. | ||
|
|
@@ -73,7 +100,11 @@ void ObjectDirectory::RegisterBackend() { | |
| for (const auto &callback_pair : callbacks) { | ||
| // It is safe to call the callback directly since this is already running | ||
| // in the subscription callback stack. | ||
| callback_pair.second(object_id, it->second.current_object_locations, | ||
| callback_pair.second(object_id, | ||
| it->second.current_object_locations, | ||
| it->second.inline_object_flag, | ||
| it->second.inline_object_data, | ||
| it->second.inline_object_metadata, | ||
| it->second.has_been_created); | ||
| } | ||
| }; | ||
|
|
@@ -84,13 +115,24 @@ void ObjectDirectory::RegisterBackend() { | |
|
|
||
| ray::Status ObjectDirectory::ReportObjectAdded( | ||
| const ObjectID &object_id, const ClientID &client_id, | ||
| const object_manager::protocol::ObjectInfoT &object_info) { | ||
| const object_manager::protocol::ObjectInfoT &object_info, | ||
| bool inline_object_flag, | ||
| const std::vector<uint8_t> &inline_object_data, | ||
| const std::string &inline_object_metadata) { | ||
| // Append the addition entry to the object table. | ||
| auto data = std::make_shared<ObjectTableDataT>(); | ||
| data->manager = client_id.binary(); | ||
| data->is_eviction = false; | ||
| data->num_evictions = object_evictions_[object_id]; | ||
| data->object_size = object_info.data_size; | ||
| data->inline_object_flag = inline_object_flag; | ||
| if (inline_object_flag) { | ||
| // Add object's data to its GCS entry. | ||
| data->inline_object_data.assign(inline_object_data.begin(), | ||
| inline_object_data.end()); | ||
| data->inline_object_metadata.assign(inline_object_metadata.begin(), | ||
| inline_object_metadata.end()); | ||
| } | ||
| ray::Status status = | ||
| gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr); | ||
| return status; | ||
|
|
@@ -149,13 +191,20 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) { | |
| // its locations with an empty log so that the location will be removed. | ||
| UpdateObjectLocations({}, gcs_client_->client_table(), | ||
| &listener.second.current_object_locations, | ||
| &listener.second.inline_object_flag, | ||
| &listener.second.inline_object_data, | ||
| &listener.second.inline_object_metadata, | ||
| &listener.second.has_been_created); | ||
| // Re-call all the subscribed callbacks for the object, since its | ||
| // locations have changed. | ||
| for (const auto &callback_pair : listener.second.callbacks) { | ||
| // It is safe to call the callback directly since this is already running | ||
| // in the subscription callback stack. | ||
| callback_pair.second(object_id, listener.second.current_object_locations, | ||
| callback_pair.second(object_id, | ||
| listener.second.current_object_locations, | ||
| listener.second.inline_object_flag, | ||
| listener.second.inline_object_data, | ||
| listener.second.inline_object_metadata, | ||
| listener.second.has_been_created); | ||
| } | ||
| } | ||
|
|
@@ -182,8 +231,16 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i | |
| // immediately notify the caller of the current known locations. | ||
| if (listener_state.has_been_created) { | ||
| auto &locations = listener_state.current_object_locations; | ||
| io_service_.post([callback, locations, object_id]() { | ||
| callback(object_id, locations, /*has_been_created=*/true); | ||
| auto inline_object_flag = listener_state.inline_object_flag; | ||
| const auto &inline_object_data = listener_state.inline_object_data; | ||
| const auto &inline_object_metadata = listener_state.inline_object_metadata; | ||
| io_service_.post([callback, locations, inline_object_flag, | ||
| inline_object_data, | ||
| inline_object_metadata, | ||
| object_id]() { | ||
| callback(object_id, locations, inline_object_flag, | ||
| inline_object_data, inline_object_metadata, | ||
| /*has_been_created=*/true); | ||
| }); | ||
| } | ||
| return status; | ||
|
|
@@ -216,20 +273,41 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, | |
| const std::vector<ObjectTableDataT> &location_history) { | ||
| // Build the set of current locations based on the entries in the log. | ||
| std::unordered_set<ClientID> client_ids; | ||
| bool inline_object_flag = false; | ||
| std::vector<uint8_t> inline_object_data; | ||
| std::string inline_object_metadata; | ||
| bool has_been_created = false; | ||
| UpdateObjectLocations(location_history, gcs_client_->client_table(), | ||
| &client_ids, &has_been_created); | ||
| &client_ids, | ||
| &inline_object_flag, | ||
| &inline_object_data, | ||
| &inline_object_metadata, | ||
| &has_been_created); | ||
| // It is safe to call the callback directly since this is already running | ||
| // in the GCS client's lookup callback stack. | ||
| callback(object_id, client_ids, has_been_created); | ||
| callback(object_id, client_ids, inline_object_flag, | ||
| inline_object_data, inline_object_metadata, | ||
| has_been_created); | ||
| }); | ||
| } else { | ||
| // If we have locations cached due to a concurrent SubscribeObjectLocations | ||
| // call, call the callback immediately with the cached locations. | ||
| // If object inlined, we already have the object's data. | ||
| auto &locations = it->second.current_object_locations; | ||
| bool has_been_created = it->second.has_been_created; | ||
| io_service_.post([callback, object_id, locations, has_been_created]() { | ||
| callback(object_id, locations, has_been_created); | ||
| bool inline_object_flag = it->second.inline_object_flag; | ||
| const auto& inline_object_data = it->second.inline_object_data; | ||
| const auto& inline_object_metadata = it->second.inline_object_metadata; | ||
| io_service_.post([callback, object_id, locations, | ||
| inline_object_flag, | ||
| inline_object_data, | ||
| inline_object_metadata, | ||
| has_been_created]() { | ||
| callback(object_id, locations, | ||
| inline_object_flag, | ||
| inline_object_data, | ||
| inline_object_metadata, | ||
| has_been_created); | ||
| }); | ||
| } | ||
| return status; | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,9 @@ class ObjectDirectoryInterface { | |||||
| /// Callback for object location notifications. | ||||||
| using OnLocationsFound = std::function<void(const ray::ObjectID &object_id, | ||||||
| const std::unordered_set<ray::ClientID> &, | ||||||
| bool, | ||||||
| const std::vector<uint8_t> &, | ||||||
| const std::string &, | ||||||
| bool has_been_created)>; | ||||||
|
|
||||||
| /// Lookup object locations. Callback may be invoked with empty list of client ids. | ||||||
|
|
@@ -99,10 +102,16 @@ class ObjectDirectoryInterface { | |||||
| /// \param object_id The object id that was put into the store. | ||||||
| /// \param client_id The client id corresponding to this node. | ||||||
| /// \param object_info Additional information about the object. | ||||||
| /// \param inline_object_flag Flag specifying whether object is inlined. | ||||||
| /// \param inline_object_data Object data. Only for inlined objects. | ||||||
| /// \param inline_object_metadata Object metadata. Only for inlined objects.. | ||||||
|
||||||
| /// \param inline_object_metadata Object metadata. Only for inlined objects.. | |
| /// \param inline_object_metadata Object metadata. Only for inlined objects. |
stephanie-wang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a description of what inlined means. Also specify what the contents of current_object_locations will be if this is true/false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.