-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[Core] Refactor reference_counter out of memory store and plasma store #57590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3b1f487
0cfe8e3
4ac6478
f01685f
7b55e41
a14329e
60e327b
5cd8f08
915e859
f9cc691
780541d
c0eb76f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1005,7 +1005,9 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object, | |
| RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); | ||
| } | ||
| } | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), | ||
| object_id, | ||
| reference_counter_->HasReference(object_id)); | ||
| return Status::OK(); | ||
| } | ||
|
|
||
|
|
@@ -1016,7 +1018,7 @@ Status CoreWorker::Put(const RayObject &object, | |
| RAY_RETURN_NOT_OK(WaitForActorRegistered(contained_object_ids)); | ||
| if (options_.is_local_mode) { | ||
| RAY_LOG(DEBUG).WithField(object_id) << "Put object in memory store"; | ||
| memory_store_->Put(object, object_id); | ||
| memory_store_->Put(object, object_id, reference_counter_->HasReference(object_id)); | ||
| return Status::OK(); | ||
| } | ||
| return PutInLocalPlasmaStore(object, object_id, pin_object); | ||
|
|
@@ -1115,7 +1117,9 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef( | |
| } else if (*data == nullptr) { | ||
| // Object already exists in plasma. Store the in-memory value so that the | ||
| // client will check the plasma store. | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), *object_id); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), | ||
| *object_id, | ||
| reference_counter_->HasReference(*object_id)); | ||
| } | ||
| } | ||
| return Status::OK(); | ||
|
|
@@ -1212,7 +1216,9 @@ Status CoreWorker::SealExisting(const ObjectID &object_id, | |
| RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); | ||
| reference_counter_->FreePlasmaObjects({object_id}); | ||
| } | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), | ||
| object_id, | ||
| reference_counter_->HasReference(object_id)); | ||
| return Status::OK(); | ||
| } | ||
|
|
||
|
|
@@ -1384,14 +1390,20 @@ Status CoreWorker::GetObjects(const std::vector<ObjectID> &ids, | |
| // If any of the objects have been promoted to plasma, then we retry their | ||
| // gets at the provider plasma. Once we get the objects from plasma, we flip | ||
| // the transport type again and return them for the original direct call ids. | ||
|
|
||
| // Prepare object ids vector and owner addresses vector | ||
| std::vector<ObjectID> object_ids = | ||
| std::vector<ObjectID>(plasma_object_ids.begin(), plasma_object_ids.end()); | ||
| auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids); | ||
|
Comment on lines
+1395
to
+1397
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not overload GetOwnerAddresses instead of creating a vector?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, GetOwnerAddresses associate the output owner address vector with the input object_ids by index. If we take in a set instead, the ordering is now ill-defined. The current change should not cause any performance changes here as we simply moved the object_id vectorization out of plasma_store_provider and into core_worker.cc https://github.com/ray-project/ray/blob/master/src/ray/core_worker/store_provider/plasma_store_provider.cc#L262. From offline discussion, we've concluded that the current way of storing owner address as a vector is poor cardinality as most object ids likely share the same owner address. However, the change to reduce cardinality will require significant logic changes across multiple files. GetOwnerAddress will be overloaded in a future PR that addresses the cardinality issue.
israbbani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| int64_t local_timeout_ms = timeout_ms; | ||
| if (timeout_ms >= 0) { | ||
| local_timeout_ms = std::max(static_cast<int64_t>(0), | ||
| timeout_ms - (current_time_ms() - start_time)); | ||
| } | ||
| RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms; | ||
| RAY_RETURN_NOT_OK( | ||
| plasma_store_provider_->Get(plasma_object_ids, local_timeout_ms, &result_map)); | ||
| RAY_RETURN_NOT_OK(plasma_store_provider_->Get( | ||
| object_ids, owner_addresses, local_timeout_ms, &result_map)); | ||
| } | ||
|
|
||
| // Loop through `ids` and fill each entry for the `results` vector, | ||
|
|
@@ -1535,8 +1547,14 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids, | |
| // num_objects ready since we want to at least make the request to start pulling | ||
| // these objects. | ||
| if (!plasma_object_ids.empty()) { | ||
| // Prepare object ids map | ||
| std::vector<ObjectID> object_ids = | ||
| std::vector<ObjectID>(plasma_object_ids.begin(), plasma_object_ids.end()); | ||
| auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids); | ||
|
|
||
| RAY_RETURN_NOT_OK(plasma_store_provider_->Wait( | ||
| plasma_object_ids, | ||
| object_ids, | ||
| owner_addresses, | ||
| std::min(static_cast<int>(plasma_object_ids.size()), | ||
| num_objects - static_cast<int>(ready.size())), | ||
| timeout_ms, | ||
|
|
@@ -3079,7 +3097,12 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, | |
| reference_counter_->AddLocalReference(return_id, "<temporary (pin return object)>"); | ||
| reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address); | ||
|
|
||
| Status status = plasma_store_provider_->Get({return_id}, 0, &result_map); | ||
| // Resolve owner address of return id | ||
| std::vector<ObjectID> object_ids = {return_id}; | ||
| auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids); | ||
|
Comment on lines
+3101
to
+3102
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above about overloading.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| Status status = | ||
| plasma_store_provider_->Get(object_ids, owner_addresses, 0, &result_map); | ||
| // Remove the temporary ref. | ||
| RemoveLocalReference(return_id); | ||
|
|
||
|
|
@@ -3297,7 +3320,8 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, | |
| // otherwise, the put is a no-op. | ||
| if (!options_.is_local_mode) { | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), | ||
| task.ArgObjectId(i)); | ||
| task.ArgObjectId(i), | ||
| reference_counter_->HasReference(task.ArgObjectId(i))); | ||
| } | ||
| } else { | ||
| // A pass-by-value argument. | ||
|
|
@@ -3346,7 +3370,12 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task, | |
| RAY_RETURN_NOT_OK(memory_store_->Get( | ||
| by_ref_ids, -1, *worker_context_, &result_map, &got_exception)); | ||
| } else { | ||
| RAY_RETURN_NOT_OK(plasma_store_provider_->Get(by_ref_ids, -1, &result_map)); | ||
| // Resolve owner addresses of by-ref ids | ||
| std::vector<ObjectID> object_ids = | ||
| std::vector<ObjectID>(by_ref_ids.begin(), by_ref_ids.end()); | ||
| auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids); | ||
| RAY_RETURN_NOT_OK( | ||
| plasma_store_provider_->Get(object_ids, owner_addresses, -1, &result_map)); | ||
| } | ||
| for (const auto &it : result_map) { | ||
| for (size_t idx : by_ref_indices[it.first]) { | ||
|
|
@@ -4182,7 +4211,9 @@ Status CoreWorker::DeleteImpl(const std::vector<ObjectID> &object_ids, bool loca | |
| memory_store_->Delete(object_ids); | ||
| for (const auto &object_id : object_ids) { | ||
| RAY_LOG(DEBUG).WithField(object_id) << "Freeing object"; | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_FREED), object_id); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_FREED), | ||
| object_id, | ||
| reference_counter_->HasReference(object_id)); | ||
| } | ||
|
|
||
| // We only delete from plasma, which avoids hangs (issue #7105). In-memory | ||
|
|
@@ -4332,7 +4363,9 @@ void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request, | |
| /*add_local_ref=*/false, | ||
| /*pinned_at_node_id=*/NodeID::FromBinary(borrower_address.node_id())); | ||
| reference_counter_->AddBorrowerAddress(object_id, borrower_address); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); | ||
| memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), | ||
| object_id, | ||
| reference_counter_->HasReference(object_id)); | ||
| send_reply_callback(Status::OK(), nullptr, nullptr); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yay