[core] Guard concurrent access to generator IDs with a mutex (ray-project#50740)

The serve microbenchmark has been sporadically failing due to memory
corruption issues (see the linked issue). One of the captured tracebacks
showed that the `deleted_generator_ids_` set was being accessed concurrently
by multiple threads without any synchronization. Fixed by guarding it with a
mutex.
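
In a nutshell, the change makes every access to the pending-deletion set take a dedicated mutex first. Below is a condensed sketch of that pattern; the field and method names mirror the diff, while `ObjectID` and the `TryDelObjectRefStream` stub are simplified stand-ins rather than Ray's real types:

```cpp
// Condensed sketch of the locking pattern applied in this commit.
#include <cstdint>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"

using ObjectID = uint64_t;  // stand-in for ray::ObjectID

class CoreWorkerSketch {
 public:
  // Called from the ObjectRefGenerator destruction path (any thread).
  void AsyncDelObjectRefStream(const ObjectID &generator_id) {
    // The first deletion attempt is itself thread safe, so no lock is held here.
    if (TryDelObjectRefStream(generator_id)) {
      return;
    }
    absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
    generator_ids_pending_deletion_.insert(generator_id);
  }

  // Called periodically on a background thread; retries pending deletions.
  void TryDelPendingObjectRefStreams() {
    absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
    std::vector<ObjectID> deleted;
    for (const auto &generator_id : generator_ids_pending_deletion_) {
      if (TryDelObjectRefStream(generator_id)) {
        deleted.push_back(generator_id);
      }
    }
    // Erase after the loop to avoid invalidating the set iterator mid-iteration.
    for (const auto &generator_id : deleted) {
      generator_ids_pending_deletion_.erase(generator_id);
    }
  }

 private:
  // Stub: pretend the stream can always be deleted.
  bool TryDelObjectRefStream(const ObjectID &) { return true; }

  absl::Mutex generator_ids_pending_deletion_mutex_;
  absl::flat_hash_set<ObjectID> generator_ids_pending_deletion_
      ABSL_GUARDED_BY(generator_ids_pending_deletion_mutex_);
};
```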

Verified that the fix at least dramatically reduces the frequency of the
crashes.

I've also renamed a few fields for clarity.

## Related issue number

ray-project#50802

---------

Signed-off-by: Edward Oakes <[email protected]>
edoakes committed Feb 23, 2025
1 parent df8546c commit 6d5289a
Showing 2 changed files with 38 additions and 20 deletions.
src/ray/core_worker/core_worker.cc (18 additions, 11 deletions)

```diff
@@ -964,9 +964,9 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
       "CoreWorker.RecordMetrics");
 
   periodical_runner_->RunFnPeriodically(
-      [this] { TryDeleteObjectRefStreams(); },
+      [this] { TryDelPendingObjectRefStreams(); },
       RayConfig::instance().local_gc_min_interval_s() * 1000,
-      "CoreWorker.GCStreamingGeneratorMetadata");
+      "CoreWorker.TryDelPendingObjectRefStreams");
 
 #ifndef _WIN32
   // Doing this last during CoreWorker initialization, so initialization logic like
```
```diff
@@ -3471,21 +3471,28 @@ void CoreWorker::AsyncDelObjectRefStream(const ObjectID &generator_id) {
   if (task_manager_->TryDelObjectRefStream(generator_id)) {
     return;
   }
-  deleted_generator_ids_.insert(generator_id);
+
+  {
+    // TryDelObjectRefStream is thread safe so no need to hold the lock above.
+    absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
+    generator_ids_pending_deletion_.insert(generator_id);
+  }
 }
 
-void CoreWorker::TryDeleteObjectRefStreams() {
-  std::vector<ObjectID> out_of_scope_generator_ids;
-  for (auto it = deleted_generator_ids_.begin(); it != deleted_generator_ids_.end();
-       it++) {
-    const auto &generator_id = *it;
+void CoreWorker::TryDelPendingObjectRefStreams() {
+  absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
+
+  std::vector<ObjectID> deleted;
+  for (const auto &generator_id : generator_ids_pending_deletion_) {
+    RAY_LOG(DEBUG).WithField(generator_id)
+        << "TryDelObjectRefStream from generator_ids_pending_deletion_";
     if (task_manager_->TryDelObjectRefStream(generator_id)) {
-      out_of_scope_generator_ids.push_back(generator_id);
+      deleted.push_back(generator_id);
     }
   }
 
-  for (const auto &generator_id : out_of_scope_generator_ids) {
-    deleted_generator_ids_.erase(generator_id);
+  for (const auto &generator_id : deleted) {
+    generator_ids_pending_deletion_.erase(generator_id);
   }
 }
```
src/ray/core_worker/core_worker.h (20 additions, 9 deletions)

```diff
@@ -121,18 +121,17 @@ class TaskCounter {
  private:
   mutable absl::Mutex mu_;
   // Tracks all tasks submitted to this worker by state, is_retry.
-  CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_
-      ABSL_GUARDED_BY(&mu_);
+  CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_ ABSL_GUARDED_BY(mu_);
 
   // Additionally tracks the sub-states of RUNNING_IN_RAY_GET/WAIT. The counters here
   // overlap with those of counter_.
-  CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(&mu_);
-  CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(&mu_);
+  CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(mu_);
+  CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(mu_);
 
-  std::string job_id_ ABSL_GUARDED_BY(&mu_);
+  std::string job_id_ ABSL_GUARDED_BY(mu_);
   // Used for actor state tracking.
-  std::string actor_name_ ABSL_GUARDED_BY(&mu_);
-  int64_t num_tasks_running_ ABSL_GUARDED_BY(&mu_) = 0;
+  std::string actor_name_ ABSL_GUARDED_BY(mu_);
+  int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
 };
 
 struct TaskToRetry {
```
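
An aside on the `ABSL_GUARDED_BY(&mu_)` to `ABSL_GUARDED_BY(mu_)` cleanup in the hunk above: clang's thread-safety analysis accepts both spellings, so the change simply standardizes on passing the mutex itself. What the annotation buys is compile-time checking of lock discipline; a minimal, hypothetical example (not taken from this diff), assuming clang with `-Wthread-safety` enabled:

```cpp
// Hypothetical example of what ABSL_GUARDED_BY buys: compiled with
// -Wthread-safety, clang reports unguarded access at compile time.
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Counter {
 public:
  void Incr() {
    absl::MutexLock lock(&mu_);
    ++count_;  // OK: mu_ is held for the access.
  }

  int UnsafeRead() {
    // warning: reading variable 'count_' requires holding mutex 'mu_'
    return count_;
  }

 private:
  mutable absl::Mutex mu_;
  int count_ ABSL_GUARDED_BY(mu_) = 0;
};
```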

```diff
@@ -294,7 +293,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// generator task.
   void AsyncDelObjectRefStream(const ObjectID &generator_id);
 
-  void TryDeleteObjectRefStreams();
+  // Attempt to delete ObjectRefStreams that were unable to be deleted when
+  // AsyncDelObjectRefStream was called (stored in generator_ids_pending_deletion_).
+  // This function is called periodically on the io_service_.
+  void TryDelPendingObjectRefStreams();
 
   const PlacementGroupID &GetCurrentPlacementGroupId() const {
     return worker_context_.GetCurrentPlacementGroupId();
```
```diff
@@ -1903,7 +1905,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// Worker's PID
   uint32_t pid_;
 
-  absl::flat_hash_set<ObjectID> deleted_generator_ids_;
+  // Guards generator_ids_pending_deletion_.
+  absl::Mutex generator_ids_pending_deletion_mutex_;
+
+  // A set of generator IDs that have gone out of scope but couldn't be deleted from
+  // the task manager yet (e.g., due to lineage references). We will periodically
+  // attempt to delete them in the background until it succeeds.
+  // This field is accessed on the destruction path of an ObjectRefGenerator as well as
+  // by a background thread attempting later deletion, so it must be guarded by a lock.
+  absl::flat_hash_set<ObjectID> generator_ids_pending_deletion_
+      ABSL_GUARDED_BY(generator_ids_pending_deletion_mutex_);
 
   /// TODO(hjiang):
   /// 1. Cached job runtime env info, it's not implemented at first place since
```
