Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class MockRayletClientInterface : public RayletClientInterface {
uint64_t metadata_size,
void *data,
void *metadata,
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback),
int64_t version,
const rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(void,
GetSystemConfig,
Expand Down
21 changes: 17 additions & 4 deletions src/ray/core_worker/experimental_mutable_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
std::shared_ptr<RayObject> &result,
int64_t timeout_ms)
ABSL_NO_THREAD_SAFETY_ANALYSIS {
int64_t unused_version = 0;
return ReadAcquire(object_id, result, unused_version, timeout_ms);
}

Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
std::shared_ptr<RayObject> &result,
int64_t &version_read,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you return this together with the status, using StatusOr, instead of an out param?

int64_t timeout_ms)
ABSL_NO_THREAD_SAFETY_ANALYSIS {
RAY_LOG(DEBUG).WithField(object_id) << "ReadAcquire";
absl::ReaderMutexLock guard(&destructor_lock_);

Expand Down Expand Up @@ -349,11 +358,11 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
}

channel->reading = true;
int64_t version_read = 0;
int64_t version_read_internal = 0;
Status s = object->header->ReadAcquire(object_id,
sem,
channel->next_version_to_read,
version_read,
version_read_internal,
check_signals_,
timeout_point);
if (!s.ok()) {
Expand All @@ -363,8 +372,8 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
channel->lock->unlock();
return s;
}
RAY_CHECK_GT(version_read, 0);
channel->next_version_to_read = version_read;
RAY_CHECK_GT(version_read_internal, 0);
channel->next_version_to_read = version_read_internal;

size_t total_size = object->header->data_size + object->header->metadata_size;
RAY_CHECK_LE(static_cast<int64_t>(total_size), channel->mutable_object->allocated_size);
Expand Down Expand Up @@ -402,6 +411,10 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
std::move(metadata_copy),
std::vector<rpc::ObjectReference>());
}

// Return the version to caller (obtained atomically with header_sem)
version_read = version_read_internal;

RAY_LOG(DEBUG).WithField(object_id) << "ReadAcquire returning buffer";
return Status::OK();
}
Expand Down
13 changes: 13 additions & 0 deletions src/ray/core_worker/experimental_mutable_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ class MutableObjectManager : public std::enable_shared_from_this<MutableObjectMa
std::shared_ptr<RayObject> &result,
int64_t timeout_ms = -1);

/// Overload that returns the version of the object that was read.
/// The version is obtained atomically while holding the header semaphore.
///
/// \param[in] object_id The ID of the object.
/// \param[out] result The read object.
/// \param[out] version_read The version of the object that was read.
/// \param[in] timeout_ms Timeout in milliseconds.
/// \return The return status.
Status ReadAcquire(const ObjectID &object_id,
std::shared_ptr<RayObject> &result,
int64_t &version_read,
int64_t timeout_ms);

/// Releases the object, allowing it to be written again. If the caller did
/// not previously ReadAcquire the object, then this first blocks until the
/// latest value is available to read, then releases the value.
Expand Down
162 changes: 151 additions & 11 deletions src/ray/core_worker/experimental_mutable_object_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,87 @@ void MutableObjectProvider::HandlePushMutableObject(
uint64_t offset = request.offset();
uint64_t chunk_size = request.chunk_size();

uint64_t tmp_written_so_far = 0;
// Validate request bounds to prevent buffer overflows.
RAY_CHECK_LE(offset + chunk_size, total_data_size)
<< "Chunk extends beyond total data size. offset=" << offset
<< ", chunk_size=" << chunk_size << ", total_data_size=" << total_data_size;
RAY_CHECK_EQ(request.data().size(), chunk_size)
<< "Data size mismatch. Expected " << chunk_size << " bytes, got "
<< request.data().size() << " bytes";
RAY_CHECK_EQ(request.metadata().size(), total_metadata_size)
<< "Metadata size mismatch. Expected " << total_metadata_size << " bytes, got "
<< request.metadata().size() << " bytes";

// Version-based idempotent retry handling strategy:
// - Stale versions (<= highest_completed): discard immediately
// - Active version (== highest_completed + 1): write to backing store
// - Future versions (> highest_completed + 1): buffer for later processing
int64_t request_version = request.version();

// Step 1: Reject stale retries from completed writes (O(1) check)
int64_t highest_completed = 0;
{
absl::MutexLock guard(&written_so_far_lock_);
highest_completed = highest_completed_version_[writer_object_id]; // default 0

tmp_written_so_far = written_so_far_[writer_object_id];
written_so_far_[writer_object_id] += chunk_size;
if (written_so_far_[writer_object_id] == total_data_size) {
written_so_far_.erase(written_so_far_.find(writer_object_id));
if (request_version <= highest_completed) {
// Stale retry from already-completed write
reply->set_done(true);
return;
}
}

// Step 2: Determine if this is active version or future version
int64_t active_version = highest_completed + 1;
bool is_active_version = (request_version == active_version);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TOCTOU race allows stale retries to corrupt state

A time-of-check-time-of-use race exists between the two lock acquisitions. The code reads highest_completed and performs the stale version check in the first critical section (lines 159-167), then releases the lock and computes is_active_version outside the lock (lines 169-171). When the second lock is acquired (line 177), the stale is_active_version is used without re-validation. If another thread completes the version between the two lock acquisitions, a stale retry can bypass the staleness check, find an empty received_chunks_ (cleaned up by the completing thread), and trigger a new WriteAcquire for an already-completed version. This can leave the object in an inconsistent state since only one chunk would be written.

Additional Locations (1)

Fix in Cursor Fix in Web


// Step 3: Check for duplicate chunks using (offset, version) key
auto chunk_key = std::make_pair(offset, request_version);
bool needs_write_acquire = false;

{
absl::MutexLock guard(&written_so_far_lock_);
auto &received_chunks = received_chunks_[writer_object_id];

if (received_chunks.find(chunk_key) != received_chunks.end()) {
// Duplicate chunk - return status
if (is_active_version) {
auto written_it = written_so_far_.find(writer_object_id);
uint64_t written = (written_it != written_so_far_.end()) ? written_it->second : 0;
reply->set_done(written == total_data_size);
} else {
reply->set_done(false);
}
Comment thread
ruisearch42 marked this conversation as resolved.
return;
}

// Step 4: For future versions, buffer only (don't write to backing store)
if (!is_active_version) {
received_chunks.insert(chunk_key);
reply->set_done(false);
return;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future version chunks tracked but data never written

When a future version chunk arrives (version > active version), the code inserts the chunk_key into received_chunks_ but returns immediately without writing the actual data to the backing store. Later, when that version becomes active and retries arrive for those chunks, they're found in received_chunks_ and treated as duplicates, returning early without writing data. The comment says "buffer for later processing" but only the chunk key is tracked—the actual data is discarded. This causes data loss for any chunks that arrive before their version becomes active.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Contributor

@jeffreywang-anyscale jeffreywang-anyscale Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simply remove received_chunks.insert(chunk_key) and let the client retry.


// Step 5: Active version - check if need WriteAcquire
if (!write_acquired_[writer_object_id]) {
needs_write_acquire = true;
// Note: We do NOT set write_acquired_ = true here yet. This prevents other threads
// from calling GetObjectBackingStore() before WriteAcquire() completes.
}
}

// Continue with WriteAcquire and write logic for active version
bool object_complete = false;

std::shared_ptr<Buffer> object_backing_store;
if (tmp_written_so_far == 0u) {
if (needs_write_acquire) {
// Initialize written_so_far_ for new write
{
absl::MutexLock guard(&written_so_far_lock_);
written_so_far_[writer_object_id] = 0;
}
// First chunk to arrive (may not be offset 0 due to out-of-order delivery) -
// acquire write lock and allocate backing store.
// We set `metadata` to nullptr since the metadata is at the end of the object, which
// we will not have until the last chunk is received.
RAY_CHECK_OK(object_manager_->WriteAcquire(info.local_object_id,
Expand All @@ -157,24 +225,90 @@ void MutableObjectProvider::HandlePushMutableObject(
total_metadata_size,
info.num_readers,
object_backing_store));
// Now that WriteAcquire has completed, set write_acquired_ to true so other threads
// can proceed with GetObjectBackingStore().
{
absl::MutexLock guard(&written_so_far_lock_);
write_acquired_[writer_object_id] = true;
}
} else {
// Wait until WriteAcquire has completed before calling GetObjectBackingStore.
// This prevents the race condition where we check write_acquired_ before
// WriteAcquire() has actually completed.
{
absl::MutexLock guard(&written_so_far_lock_);
auto condition = [this, &writer_object_id]()
ABSL_SHARED_LOCKS_REQUIRED(written_so_far_lock_) {
return write_acquired_[writer_object_id];
};
written_so_far_lock_.Await(absl::Condition(&condition));
}
// Subsequent chunk (or chunk arriving after WriteAcquire was called by another chunk)
// - get existing backing store.
RAY_CHECK_OK(object_manager_->GetObjectBackingStore(info.local_object_id,
total_data_size,
total_metadata_size,
object_backing_store));
}
Comment thread
ruisearch42 marked this conversation as resolved.
RAY_CHECK(object_backing_store);

// Copy chunk data to backing store.
memcpy(object_backing_store->Data() + offset, request.data().data(), chunk_size);
size_t total_written = tmp_written_so_far + chunk_size;
RAY_CHECK_LE(total_written, total_data_size);
if (total_written == total_data_size) {
// Copy the metadata to the end of the object.

// Mark this chunk as received only after successfully writing it.
// This ensures retries are handled correctly even if WriteAcquire fails.
{
absl::MutexLock guard(&written_so_far_lock_);
// Mark chunk as received using (offset, version) pair (reusing chunk_key from above)
received_chunks_[writer_object_id].insert(chunk_key);
// Update written_so_far_ by adding this chunk's size.
// Note: written_so_far_ was already initialized to 0 in the first lock block
// for new writes, so we can safely increment it here.
written_so_far_[writer_object_id] += chunk_size;
RAY_CHECK_LE(written_so_far_[writer_object_id], total_data_size);
if (written_so_far_[writer_object_id] == total_data_size) {
object_complete = true;
// Note: We keep received_chunks_ and written_so_far_ entries until WriteRelease
// completes to handle retries. They will be cleaned up after WriteRelease() is
// called.
}
}

if (object_complete) {
// All data chunks received - copy metadata and release write lock.
memcpy(object_backing_store->Data() + total_data_size,
request.metadata().data(),
total_metadata_size);
// The entire object has been written, so call `WriteRelease()`.
RAY_CHECK_OK(object_manager_->WriteRelease(info.local_object_id));

// Update tracking state after WriteRelease
{
absl::MutexLock guard(&written_so_far_lock_);

// Update highest completed version
highest_completed_version_[writer_object_id] = request_version;

// Remove ONLY chunks belonging to this completed version
auto &chunks = received_chunks_[writer_object_id];
for (auto it = chunks.begin(); it != chunks.end();) {
if (it->second == request_version) {
it = chunks.erase(it);
} else {
++it;
}
}

// Clear per-object tracking for next write
written_so_far_.erase(writer_object_id);
write_acquired_.erase(writer_object_id);

// Clean up received_chunks_ entry if empty
if (chunks.empty()) {
received_chunks_.erase(writer_object_id);
}
}

reply->set_done(true);
} else {
reply->set_done(false);
Expand Down Expand Up @@ -222,9 +356,11 @@ void MutableObjectProvider::PollWriterClosure(
&remote_readers) {
// NOTE: There's only 1 PollWriterClosure at any time in a single thread.
std::shared_ptr<RayObject> object;
int64_t version = 0;
// The corresponding ReadRelease() will be automatically called when
// `object` goes out of scope.
Status status = object_manager_->ReadAcquire(writer_object_id, object);
Status status =
object_manager_->ReadAcquire(writer_object_id, object, version, /*timeout_ms=*/-1);
// Check if the thread returned from ReadAcquire() because the process is exiting, not
// because there is something to read.
if (status.code() == StatusCode::ChannelError) {
Expand All @@ -236,6 +372,9 @@ void MutableObjectProvider::PollWriterClosure(
RAY_CHECK(object->GetData());
RAY_CHECK(object->GetMetadata());

// Version was obtained safely from ReadAcquire (with header_sem protection)
RAY_CHECK_GT(version, 0) << "Invalid version for " << writer_object_id;

std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0);
for (const auto &reader : *remote_readers) {
reader->PushMutableObject(
Expand All @@ -244,6 +383,7 @@ void MutableObjectProvider::PollWriterClosure(
object->GetMetadata()->Size(),
object->GetData()->Data(),
object->GetMetadata()->Data(),
version,
[this, &io_context, writer_object_id, remote_readers, num_replied](
const Status &push_object_status, const rpc::PushMutableObjectReply &reply) {
*num_replied += 1;
Expand Down
34 changes: 33 additions & 1 deletion src/ray/core_worker/experimental_mutable_object_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,27 @@

#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/core_worker/experimental_mutable_object_manager.h"
#include "ray/raylet_rpc_client/raylet_client_interface.h"
#include "ray/rpc/client_call.h"

// Hash function for std::pair<uint64_t, int64_t> to use in unordered_set for chunk
// tracking (keyed by (offset, version)).
//
// NOTE(review): specializing std::hash for std::pair of non-program-defined types
// is technically disallowed by [namespace.std]; consider a named hasher struct
// passed explicitly to the unordered_set instead — TODO confirm.
namespace std {
template <>
struct hash<std::pair<uint64_t, int64_t>> {
  size_t operator()(const std::pair<uint64_t, int64_t> &p) const noexcept {
    // Boost-style hash_combine. The previous `h1 ^ (h2 << 1)` combine collides
    // trivially (e.g. with identity integral hashes, (0, 0) and (2, 1) both map
    // to 0); mixing with the 64-bit golden-ratio constant and the accumulated
    // seed avoids such systematic collisions.
    size_t seed = std::hash<uint64_t>{}(p.first);
    seed ^= std::hash<int64_t>{}(p.second) + 0x9e3779b97f4a7c15ULL + (seed << 6) +
            (seed >> 2);
    return seed;
  }
};
}  // namespace std

namespace ray {
namespace core {
namespace experimental {
Expand Down Expand Up @@ -239,13 +253,31 @@ class MutableObjectProvider : public MutableObjectProviderInterface {
// and then send the changes to remote nodes via the network.
std::vector<std::unique_ptr<std::thread>> io_threads_;

// Protects the `written_so_far_` map.
// Protects the `written_so_far_` map and `received_chunks_` set.
absl::Mutex written_so_far_lock_;
// For objects larger than the gRPC max payload size *that this node receives from a
// writer node*, this map tracks how many bytes have been received so far for a single
// object write.
std::unordered_map<ObjectID, uint64_t> written_so_far_
ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks which chunks (by offset and version) have been received for each object to
// handle retries idempotently. The version comes from PlasmaObjectHeader.version on the
// sender side. The pair (offset, version) uniquely identifies a chunk across different
// write epochs, preventing stale retries from interfering with new writes.
std::unordered_map<ObjectID, std::unordered_set<std::pair<uint64_t, int64_t>>>
received_chunks_ ABSL_GUARDED_BY(written_so_far_lock_);
// Tracks whether WriteAcquire has been called AND completed for each object to handle
// out-of-order chunks. This ensures WriteAcquire is called exactly once, even if chunks
// arrive concurrently or out of order. Other threads must wait until this is true
// before calling GetObjectBackingStore().
std::unordered_map<ObjectID, bool> write_acquired_
ABSL_GUARDED_BY(written_so_far_lock_);
// Maps writer_object_id to the highest version that has completed WriteRelease.
// Only version == highest_completed + 1 can actively write to backing store.
// Versions <= highest_completed are stale retries and are discarded.
// Versions > highest_completed + 1 are buffered for future processing.
std::unordered_map<ObjectID, int64_t> highest_completed_version_
ABSL_GUARDED_BY(written_so_far_lock_);

friend class MutableObjectProvider_MutableObjectBufferReadRelease_Test;
};
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ ray_cc_test(
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
Loading