Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class MockRayletClientInterface : public RayletClientInterface {
uint64_t metadata_size,
void *data,
void *metadata,
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback),
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(void,
GetSystemConfig,
Expand Down
99 changes: 93 additions & 6 deletions src/ray/core_worker/experimental_mutable_object_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,72 @@ void MutableObjectProvider::HandlePushMutableObject(
uint64_t offset = request.offset();
uint64_t chunk_size = request.chunk_size();

// Validate request bounds to prevent buffer overflows.
RAY_CHECK_LE(offset + chunk_size, total_data_size)
<< "Chunk extends beyond total data size. offset=" << offset
<< ", chunk_size=" << chunk_size << ", total_data_size=" << total_data_size;
RAY_CHECK_EQ(request.data().size(), chunk_size)
<< "Data size mismatch. Expected " << chunk_size << " bytes, got "
<< request.data().size() << " bytes";
RAY_CHECK_EQ(request.metadata().size(), total_metadata_size)
<< "Metadata size mismatch. Expected " << total_metadata_size << " bytes, got "
<< request.metadata().size() << " bytes";

// Check if this chunk has already been received (idempotent retry handling).
bool is_new_chunk = false;
uint64_t tmp_written_so_far = 0;
bool object_complete = false;
bool needs_write_acquire = false;
{
absl::MutexLock guard(&written_so_far_lock_);

tmp_written_so_far = written_so_far_[writer_object_id];
written_so_far_[writer_object_id] += chunk_size;
if (written_so_far_[writer_object_id] == total_data_size) {
written_so_far_.erase(written_so_far_.find(writer_object_id));
// Initialize tracking for this object if needed.
auto &received_chunks = received_chunks_[writer_object_id];

// Check if we've already received this specific chunk by offset.
if (received_chunks.find(offset) != received_chunks.end()) {
// This chunk was already received (retry) - skip processing but return status.
auto written_it = written_so_far_.find(writer_object_id);
if (written_it != written_so_far_.end()) {
tmp_written_so_far = written_it->second;
object_complete = (tmp_written_so_far == total_data_size);
} else {
// Tracking was cleaned up, meaning object is complete.
object_complete = true;
}
Comment thread
ruisearch42 marked this conversation as resolved.
Outdated
} else {
// This is a new chunk - check tracking but don't mark as received yet.
// We'll mark it as received only after successfully writing it.
is_new_chunk = true;
auto written_it = written_so_far_.find(writer_object_id);
bool is_new_write = (written_it == written_so_far_.end());
tmp_written_so_far = is_new_write ? 0 : written_it->second;

// Check if WriteAcquire needs to be called. This handles out-of-order chunks:
// only the first chunk (or first chunk to arrive) will call WriteAcquire.
// Reset write_acquired_ flag if this is a new write (after previous WriteRelease).
if (is_new_write) {
write_acquired_[writer_object_id] = false;
received_chunks_[writer_object_id].clear();
}
if (!write_acquired_[writer_object_id]) {
needs_write_acquire = true;
write_acquired_[writer_object_id] = true;
}
Comment thread
ruisearch42 marked this conversation as resolved.
}
}

// If this is a retry of an already-received chunk, return current status without
// re-processing the data (idempotent operation).
if (!is_new_chunk) {
reply->set_done(object_complete);
return;
}

std::shared_ptr<Buffer> object_backing_store;
if (tmp_written_so_far == 0u) {
if (needs_write_acquire) {
// First chunk to arrive (may not be offset 0 due to out-of-order delivery) -
// acquire write lock and allocate backing store.
// We set `metadata` to nullptr since the metadata is at the end of the object, which
// we will not have until the last chunk is received.
RAY_CHECK_OK(object_manager_->WriteAcquire(info.local_object_id,
Expand All @@ -158,23 +211,57 @@ void MutableObjectProvider::HandlePushMutableObject(
info.num_readers,
object_backing_store));
} else {
// Subsequent chunk (or chunk arriving after WriteAcquire was called by another chunk)
// - get existing backing store.
RAY_CHECK_OK(object_manager_->GetObjectBackingStore(info.local_object_id,
total_data_size,
total_metadata_size,
object_backing_store));
}
Comment thread
ruisearch42 marked this conversation as resolved.
RAY_CHECK(object_backing_store);

// Copy chunk data to backing store.
memcpy(object_backing_store->Data() + offset, request.data().data(), chunk_size);
size_t total_written = tmp_written_so_far + chunk_size;
RAY_CHECK_LE(total_written, total_data_size);

// Mark this chunk as received only after successfully writing it.
// This ensures retries are handled correctly even if WriteAcquire fails.
{
absl::MutexLock guard(&written_so_far_lock_);
received_chunks_[writer_object_id].insert(offset);
// Update written_so_far_ by adding this chunk's size.
// Note: We increment rather than set to handle potential out-of-order chunks.
// Initialize to 0 if this is the first chunk of a new write.
if (written_so_far_.find(writer_object_id) == written_so_far_.end()) {
written_so_far_[writer_object_id] = 0;
}
written_so_far_[writer_object_id] += chunk_size;
if (written_so_far_[writer_object_id] == total_data_size) {
object_complete = true;
// Note: We keep received_chunks_ and written_so_far_ entries until WriteRelease
// completes to handle retries. They will be cleaned up after WriteRelease() is
// called.
}
}

if (total_written == total_data_size) {
Comment thread
ruisearch42 marked this conversation as resolved.
Outdated
// Copy the metadata to the end of the object.
// All data chunks received - copy metadata and release write lock.
memcpy(object_backing_store->Data() + total_data_size,
request.metadata().data(),
total_metadata_size);
// The entire object has been written, so call `WriteRelease()`.
RAY_CHECK_OK(object_manager_->WriteRelease(info.local_object_id));

// Clean up tracking state after WriteRelease to prepare for the next write.
// This ensures that subsequent writes to the same channel start fresh.
{
absl::MutexLock guard(&written_so_far_lock_);
written_so_far_.erase(writer_object_id);
received_chunks_.erase(writer_object_id);
write_acquired_.erase(writer_object_id);
}

reply->set_done(true);
} else {
reply->set_done(false);
Expand Down
12 changes: 11 additions & 1 deletion src/ray/core_worker/experimental_mutable_object_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "ray/common/asio/instrumented_io_context.h"
Expand Down Expand Up @@ -239,13 +240,22 @@ class MutableObjectProvider : public MutableObjectProviderInterface {
// and then send the changes to remote nodes via the network.
std::vector<std::unique_ptr<std::thread>> io_threads_;

// Protects the `written_so_far_` map.
// Protects the `written_so_far_` map and `received_chunks_` set.
absl::Mutex written_so_far_lock_;
// For objects larger than the gRPC max payload size *that this node receives from a
// writer node*, this map tracks how many bytes have been received so far for a single
// object write.
std::unordered_map<ObjectID, uint64_t> written_so_far_
ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks which chunks (by offset) have been received for each object to handle retries
// idempotently. Each offset uniquely identifies a chunk.
std::unordered_map<ObjectID, std::unordered_set<uint64_t>> received_chunks_
ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks whether WriteAcquire has been called for each object to handle out-of-order
// chunks. This ensures WriteAcquire is called exactly once, even if chunks arrive
// concurrently or out of order.
std::unordered_map<ObjectID, bool> write_acquired_
ABSL_GUARDED_BY(written_so_far_lock_);

friend class MutableObjectProvider_MutableObjectBufferReadRelease_Test;
};
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ ray_cc_test(
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
Loading