Skip to content

Commit

Permalink
[core] Guard concurrent access to generator IDs with a mutex (#50740)
Browse files Browse the repository at this point in the history
The serve microbenchmark has been sporadically failing due to memory
corruption issues (see the linked issue). One of the captured tracebacks
pointed to the `deleted_generator_ids_` set being accessed concurrently
by multiple threads. Fixed by guarding it with a mutex.

Verified that this change at least dramatically reduces the frequency of
the crashes.

I've also renamed a few fields for clarity.

## Related issue number

#50802

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes authored Feb 23, 2025
1 parent c5a895d commit d5b51d9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
29 changes: 18 additions & 11 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -964,9 +964,9 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
"CoreWorker.RecordMetrics");

periodical_runner_->RunFnPeriodically(
[this] { TryDeleteObjectRefStreams(); },
[this] { TryDelPendingObjectRefStreams(); },
RayConfig::instance().local_gc_min_interval_s() * 1000,
"CoreWorker.GCStreamingGeneratorMetadata");
"CoreWorker.TryDelPendingObjectRefStreams");

#ifndef _WIN32
// Doing this last during CoreWorker initialization, so initialization logic like
Expand Down Expand Up @@ -3471,21 +3471,28 @@ void CoreWorker::AsyncDelObjectRefStream(const ObjectID &generator_id) {
if (task_manager_->TryDelObjectRefStream(generator_id)) {
return;
}
deleted_generator_ids_.insert(generator_id);

{
// TryDelObjectRefStream is thread safe so no need to hold the lock above.
absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);
generator_ids_pending_deletion_.insert(generator_id);
}
}

void CoreWorker::TryDeleteObjectRefStreams() {
std::vector<ObjectID> out_of_scope_generator_ids;
for (auto it = deleted_generator_ids_.begin(); it != deleted_generator_ids_.end();
it++) {
const auto &generator_id = *it;
void CoreWorker::TryDelPendingObjectRefStreams() {
absl::MutexLock lock(&generator_ids_pending_deletion_mutex_);

std::vector<ObjectID> deleted;
for (const auto &generator_id : generator_ids_pending_deletion_) {
RAY_LOG(DEBUG).WithField(generator_id)
<< "TryDelObjectRefStream from generator_ids_pending_deletion_";
if (task_manager_->TryDelObjectRefStream(generator_id)) {
out_of_scope_generator_ids.push_back(generator_id);
deleted.push_back(generator_id);
}
}

for (const auto &generator_id : out_of_scope_generator_ids) {
deleted_generator_ids_.erase(generator_id);
for (const auto &generator_id : deleted) {
generator_ids_pending_deletion_.erase(generator_id);
}
}

Expand Down
29 changes: 20 additions & 9 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ class TaskCounter {
private:
mutable absl::Mutex mu_;
// Tracks all tasks submitted to this worker by state, is_retry.
CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_
ABSL_GUARDED_BY(&mu_);
CounterMap<std::tuple<std::string, TaskStatusType, bool>> counter_ ABSL_GUARDED_BY(mu_);

// Additionally tracks the sub-states of RUNNING_IN_RAY_GET/WAIT. The counters here
// overlap with those of counter_.
CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(&mu_);
CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(&mu_);
CounterMap<std::pair<std::string, bool>> running_in_get_counter_ ABSL_GUARDED_BY(mu_);
CounterMap<std::pair<std::string, bool>> running_in_wait_counter_ ABSL_GUARDED_BY(mu_);

std::string job_id_ ABSL_GUARDED_BY(&mu_);
std::string job_id_ ABSL_GUARDED_BY(mu_);
// Used for actor state tracking.
std::string actor_name_ ABSL_GUARDED_BY(&mu_);
int64_t num_tasks_running_ ABSL_GUARDED_BY(&mu_) = 0;
std::string actor_name_ ABSL_GUARDED_BY(mu_);
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
};

struct TaskToRetry {
Expand Down Expand Up @@ -294,7 +293,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// generator task.
void AsyncDelObjectRefStream(const ObjectID &generator_id);

void TryDeleteObjectRefStreams();
// Attempt to delete ObjectRefStreams that were unable to be deleted when
// AsyncDelObjectRefStream was called (stored in generator_ids_pending_deletion_).
// This function is called periodically on the io_service_.
void TryDelPendingObjectRefStreams();

const PlacementGroupID &GetCurrentPlacementGroupId() const {
return worker_context_.GetCurrentPlacementGroupId();
Expand Down Expand Up @@ -1903,7 +1905,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Worker's PID
uint32_t pid_;

absl::flat_hash_set<ObjectID> deleted_generator_ids_;
// Guards generator_ids_pending_deletion_.
absl::Mutex generator_ids_pending_deletion_mutex_;

// A set of generator IDs that have gone out of scope but couldn't be deleted from
// the task manager yet (e.g., due to lineage references). We will periodically
// attempt to delete them in the background until it succeeds.
// This field is accessed on the destruction path of an ObjectRefGenerator as well as
// by a background thread attempting later deletion, so it must be guarded by a lock.
absl::flat_hash_set<ObjectID> generator_ids_pending_deletion_
ABSL_GUARDED_BY(generator_ids_pending_deletion_mutex_);

/// TODO(hjiang):
/// 1. Cached job runtime env info, it's not implemented at first place since
Expand Down

0 comments on commit d5b51d9

Please sign in to comment.