Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class MockRayletClientInterface : public RayletClientInterface {
uint64_t metadata_size,
void *data,
void *metadata,
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback),
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(void,
GetSystemConfig,
Expand Down
105 changes: 96 additions & 9 deletions src/ray/core_worker/experimental_mutable_object_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,76 @@ void MutableObjectProvider::HandlePushMutableObject(
uint64_t offset = request.offset();
uint64_t chunk_size = request.chunk_size();

// Validate request bounds to prevent buffer overflows.
RAY_CHECK_LE(offset + chunk_size, total_data_size)
<< "Chunk extends beyond total data size. offset=" << offset
<< ", chunk_size=" << chunk_size << ", total_data_size=" << total_data_size;
RAY_CHECK_EQ(request.data().size(), chunk_size)
<< "Data size mismatch. Expected " << chunk_size << " bytes, got "
<< request.data().size() << " bytes";
RAY_CHECK_EQ(request.metadata().size(), total_metadata_size)
<< "Metadata size mismatch. Expected " << total_metadata_size << " bytes, got "
<< request.metadata().size() << " bytes";

// Check if this chunk has already been received (idempotent retry handling).
bool is_new_chunk = false;
uint64_t tmp_written_so_far = 0;
bool object_complete = false;
bool needs_write_acquire = false;
{
absl::MutexLock guard(&written_so_far_lock_);

tmp_written_so_far = written_so_far_[writer_object_id];
written_so_far_[writer_object_id] += chunk_size;
if (written_so_far_[writer_object_id] == total_data_size) {
written_so_far_.erase(written_so_far_.find(writer_object_id));
// Initialize tracking for this object if needed.
auto &received_chunks = received_chunks_[writer_object_id];

// Check if we've already received this specific chunk by offset.
if (received_chunks.find(offset) != received_chunks.end()) {
// This chunk was already received (retry) - skip processing but return status.
auto written_it = written_so_far_.find(writer_object_id);
if (written_it != written_so_far_.end()) {
tmp_written_so_far = written_it->second;
object_complete = (tmp_written_so_far == total_data_size);
} else {
// Tracking was cleaned up, meaning object is complete.
object_complete = true;
}
Comment thread
ruisearch42 marked this conversation as resolved.
Outdated
} else {
// This is a new chunk - check tracking but don't mark as received yet.
// We'll mark it as received only after successfully writing it.
is_new_chunk = true;
auto written_it = written_so_far_.find(writer_object_id);
bool is_new_write = (written_it == written_so_far_.end());
tmp_written_so_far = is_new_write ? 0 : written_it->second;

// Check if WriteAcquire needs to be called. This handles out-of-order chunks:
// only the first chunk to arrive (which may not be the chunk at offset 0) calls
// WriteAcquire.
// Reset the write_acquired_ flag if this is a new write (after a previous
// WriteRelease).
if (is_new_write) {
write_acquired_[writer_object_id] = false;
received_chunks_[writer_object_id].clear();
// Initialize written_so_far_ immediately to prevent race condition where
// multiple concurrent threads all see is_new_write=true and reset
// write_acquired_.
written_so_far_[writer_object_id] = 0;
}
if (!write_acquired_[writer_object_id]) {
needs_write_acquire = true;
write_acquired_[writer_object_id] = true;
}
Comment thread
ruisearch42 marked this conversation as resolved.
}
}

// If this is a retry of an already-received chunk, return current status without
// re-processing the data (idempotent operation).
if (!is_new_chunk) {
reply->set_done(object_complete);
return;
}

std::shared_ptr<Buffer> object_backing_store;
if (tmp_written_so_far == 0u) {
if (needs_write_acquire) {
// First chunk to arrive (may not be offset 0 due to out-of-order delivery) -
// acquire write lock and allocate backing store.
// We set `metadata` to nullptr since the metadata is at the end of the object, which
// we will not have until the last chunk is received.
RAY_CHECK_OK(object_manager_->WriteAcquire(info.local_object_id,
Expand All @@ -158,23 +215,53 @@ void MutableObjectProvider::HandlePushMutableObject(
info.num_readers,
object_backing_store));
} else {
// Subsequent chunk (or chunk arriving after WriteAcquire was called by another chunk)
// - get existing backing store.
RAY_CHECK_OK(object_manager_->GetObjectBackingStore(info.local_object_id,
total_data_size,
total_metadata_size,
object_backing_store));
}
Comment thread
ruisearch42 marked this conversation as resolved.
RAY_CHECK(object_backing_store);

// Copy chunk data to backing store.
memcpy(object_backing_store->Data() + offset, request.data().data(), chunk_size);
size_t total_written = tmp_written_so_far + chunk_size;
RAY_CHECK_LE(total_written, total_data_size);
if (total_written == total_data_size) {
// Copy the metadata to the end of the object.

// Mark this chunk as received only after its data has been copied into the backing
// store, so that a retry of a chunk that was never written is treated as a new chunk
// rather than being skipped as a duplicate.
{
absl::MutexLock guard(&written_so_far_lock_);
received_chunks_[writer_object_id].insert(offset);
// Update written_so_far_ by adding this chunk's size.
// Note: written_so_far_ was already initialized to 0 in the first lock block
// for new writes, so we can safely increment it here.
written_so_far_[writer_object_id] += chunk_size;
RAY_CHECK_LE(written_so_far_[writer_object_id], total_data_size);
if (written_so_far_[writer_object_id] == total_data_size) {
object_complete = true;
// Note: We keep received_chunks_ and written_so_far_ entries until WriteRelease
// completes to handle retries. They will be cleaned up after WriteRelease() is
// called.
}
}

if (object_complete) {
// All data chunks received - copy metadata and release write lock.
memcpy(object_backing_store->Data() + total_data_size,
request.metadata().data(),
total_metadata_size);
// The entire object has been written, so call `WriteRelease()`.
RAY_CHECK_OK(object_manager_->WriteRelease(info.local_object_id));

// Clean up tracking state after WriteRelease to prepare for the next write.
// This ensures that subsequent writes to the same channel start fresh.
{
absl::MutexLock guard(&written_so_far_lock_);
written_so_far_.erase(writer_object_id);
received_chunks_.erase(writer_object_id);
write_acquired_.erase(writer_object_id);
}

reply->set_done(true);
} else {
reply->set_done(false);
Expand Down
12 changes: 11 additions & 1 deletion src/ray/core_worker/experimental_mutable_object_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "ray/common/asio/instrumented_io_context.h"
Expand Down Expand Up @@ -239,13 +240,22 @@ class MutableObjectProvider : public MutableObjectProviderInterface {
// and then send the changes to remote nodes via the network.
std::vector<std::unique_ptr<std::thread>> io_threads_;

// Protects the `written_so_far_` map.
// Protects the `written_so_far_`, `received_chunks_`, and `write_acquired_` maps.
absl::Mutex written_so_far_lock_;
// For objects larger than the gRPC max payload size *that this node receives from a
// writer node*, this map tracks how many bytes have been received so far for a single
// object write.
std::unordered_map<ObjectID, uint64_t> written_so_far_
ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks which chunks (by offset) have been received for each object to handle retries
// idempotently. Each offset uniquely identifies a chunk.
std::unordered_map<ObjectID, std::unordered_set<uint64_t>> received_chunks_
ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks whether WriteAcquire has been called for each object to handle out-of-order
// chunks. This ensures WriteAcquire is called exactly once, even if chunks arrive
// concurrently or out of order.
std::unordered_map<ObjectID, bool> write_acquired_
ABSL_GUARDED_BY(written_so_far_lock_);

friend class MutableObjectProvider_MutableObjectBufferReadRelease_Test;
};
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ ray_cc_test(
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
Loading