8 changes: 7 additions & 1 deletion src/ray/gcs/gcs_server.cc
@@ -296,7 +296,13 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
  // Init metrics and event exporter.
  metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
    stats::InitOpenTelemetryExporter(config_.metrics_agent_port, server_status);
Comment (Contributor Author):
i'll refactor this call to use the same pattern in another follow-up (to make this PR minimal)
-    ray_event_recorder_->StartExportingEvents();
+    if (server_status.ok()) {
+      ray_event_recorder_->StartExportingEvents();
+    } else {
+      RAY_LOG(ERROR) << "Failed to establish connection to the event exporter. Events "
+                        "will not be exported. "
+                     << "Event exporter status: " << server_status.ToString();
+    }
  });

  // Start RPC server when all tables have finished loading initial
47 changes: 24 additions & 23 deletions src/ray/rpc/metrics_agent_client.cc
@@ -41,30 +41,31 @@ void MetricsAgentClientImpl::WaitForServerReadyWithRetry(
    // Only log the first time we start the retry loop.
    RAY_LOG(INFO) << "Initializing exporter ...";
  }
-  HealthCheck(rpc::HealthCheckRequest(),
-              [this, init_exporter_fn](auto &status, auto &&reply) {
-                if (status.ok() && !exporter_initialized_) {
-                  init_exporter_fn(status);
-                  exporter_initialized_ = true;
-                  RAY_LOG(INFO) << "Exporter initialized.";
-                }
-              });
-  if (retry_count >= max_retry) {
-    init_exporter_fn(Status::RpcError("The metrics agent server is not ready.", 14));
-    return;
-  }
-  retry_count++;
-  retry_timer_->expires_after(std::chrono::milliseconds(retry_interval_ms));
-  retry_timer_->async_wait(
-      [this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](
-          const boost::system::error_code &error) {
-        if (!error) {
-          WaitForServerReadyWithRetry(
-              init_exporter_fn, retry_count, max_retry, retry_interval_ms);
-        } else {
-          RAY_LOG(ERROR) << "Failed to initialize exporter. Data will not be exported to "
-                            "the metrics agent.";
+  HealthCheck(
+      rpc::HealthCheckRequest(),
+      [this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](auto &status,
+                                                                          auto &&reply) {
+        if (status.ok()) {
+          if (exporter_initialized_) {
+            return;
+          }
+          init_exporter_fn(status);
+          exporter_initialized_ = true;
+          RAY_LOG(INFO) << "Exporter initialized.";
+          return;
Comment on lines +48 to +55 (Contributor):
Severity: high

This block has a race condition on exporter_initialized_. Since the io_service can use multiple threads, two concurrent HealthCheck callbacks could both see exporter_initialized_ as false, leading to init_exporter_fn being called twice. This would re-introduce the RAY_CHECK failure this PR aims to fix.

To solve this, protect the critical section. One way is to use a mutex: add absl::Mutex exporter_mutex_; to MetricsAgentClientImpl (in the .h file, and include absl/synchronization/mutex.h), then use it here:

if (status.ok()) {
  absl::MutexLock lock(&exporter_mutex_);
  if (exporter_initialized_) {
    return;
  }
  init_exporter_fn(status);
  exporter_initialized_ = true;
  RAY_LOG(INFO) << "Exporter initialized.";
  return;
}

An alternative is to make exporter_initialized_ an std::atomic<bool> (which would require including <atomic>).

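A minimal standalone sketch of the std::atomic alternative mentioned above (names are mine, not Ray's): compare_exchange_strong flips the flag for exactly one caller, so the init path cannot run twice even if two health-check callbacks race.

#include <atomic>
#include <iostream>

std::atomic<bool> exporter_initialized{false};

void MaybeInitExporter() {
  // Exactly one racing caller wins the exchange and runs the init path.
  bool expected = false;
  if (exporter_initialized.compare_exchange_strong(expected, true)) {
    std::cout << "Exporter initialized.\n";
  }
}

int main() {
  MaybeInitExporter();  // prints "Exporter initialized."
  MaybeInitExporter();  // no-op: the flag is already set
}

Applied to the real member, this amounts to declaring std::atomic<bool> exporter_initialized_{false}; with no mutex needed.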
Comment (Contributor Author):
thanks, the io_context is single-threaded

        }
+        if (retry_count >= max_retry) {
+          init_exporter_fn(Status::RpcError(
+              "Running out of retries to initialize the metrics agent.", 14));
+          return;
+        }
+        io_service_.post(
+            [this, init_exporter_fn, retry_count, max_retry, retry_interval_ms]() {
+              WaitForServerReadyWithRetry(
+                  init_exporter_fn, retry_count + 1, max_retry, retry_interval_ms);
+            },
+            "MetricsAgentClient.WaitForServerReadyWithRetry",
+            retry_interval_ms * 1000);
Comment:
Bug: Refactored Retry Logic Causes Dangling Pointer

The refactored retry logic uses io_service_.post with a delay, capturing this. If the MetricsAgentClientImpl object is destroyed before the delayed callback executes, the captured this becomes a dangling pointer, leading to a use-after-free. This differs from the previous timer-based approach, which handled object lifecycle more robustly.

Additional Locations (1)

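A common guard against this class of lifetime bug is to hand the queued callback a weak_ptr instead of a raw this, and drop the work if the object is gone by the time the callback runs. A minimal standalone sketch with hypothetical names (not the PR's code), assuming the client is owned by a shared_ptr:

#include <functional>
#include <iostream>
#include <memory>

class Client : public std::enable_shared_from_this<Client> {
 public:
  // Returns a callback that stays safe to run after the Client is destroyed.
  std::function<void()> MakeRetryCallback() {
    std::weak_ptr<Client> weak_self = weak_from_this();
    return [weak_self]() {
      if (auto self = weak_self.lock()) {
        self->Retry();  // the object is still alive
      } else {
        std::cout << "client gone; dropping retry\n";
      }
    };
  }

 private:
  void Retry() { std::cout << "retrying\n"; }
};

int main() {
  auto client = std::make_shared<Client>();
  auto cb = client->MakeRetryCallback();
  cb();            // prints "retrying"
  client.reset();  // simulate shutdown destroying the client first
  cb();            // prints "client gone; dropping retry" instead of a use-after-free
}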
Comment (Contributor Author):
annoyingly this might be true

Comment (Contributor Author):
or maybe not, the io_context is "stopped" before the metrics agent is destructed (https://github.com/ray-project/ray/blob/master/src/ray/gcs/gcs_server.cc#L331)

Comment:
Bug: Incorrect Retry Timing in Metrics Exporter

The io_service_.post() call uses retry_interval_ms * 1000 for its delay parameter. Since retry_interval_ms is in milliseconds, this multiplication likely causes incorrect retry timing, making retries either too fast or too slow depending on the expected unit. This impacts the metrics exporter's initialization.

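If the post delay parameter is in microseconds (an assumption on my part; the * 1000 reads like a deliberate ms-to-us conversion rather than a bug), the timing is actually correct. Either way, typing the interval with std::chrono makes the unit explicit and checkable; a minimal sketch with hypothetical names:

#include <chrono>
#include <cstdint>
#include <iostream>

// Converts a millisecond interval to the microsecond delay a post-style API
// might expect; the type system documents the units instead of a bare * 1000.
int64_t ToDelayUs(std::chrono::milliseconds retry_interval) {
  return std::chrono::duration_cast<std::chrono::microseconds>(retry_interval)
      .count();
}

int main() {
  std::cout << ToDelayUs(std::chrono::milliseconds(100)) << "\n";  // 100000
}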
      });
}

10 changes: 5 additions & 5 deletions src/ray/rpc/metrics_agent_client.h
@@ -72,12 +72,12 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
  MetricsAgentClientImpl(const std::string &address,
                         const int port,
                         instrumented_io_context &io_service,
-                        rpc::ClientCallManager &client_call_manager) {
+                        rpc::ClientCallManager &client_call_manager)
+      : io_service_(io_service) {
Comment:
Bug: Async Callbacks Access Destroyed Objects

The io_service_ member is now a non-owning reference, creating a lifetime mismatch. This can lead to use-after-free when asynchronous callbacks in WaitForServerReadyWithRetry access a destroyed io_service (e.g., during GCS shutdown) or execute on a destroyed MetricsAgentClientImpl object.

    RAY_LOG(DEBUG) << "Initiate the metrics client of address:"
                   << BuildAddress(address, port);
    grpc_client_ =
        std::make_unique<GrpcClient<ReporterService>>(address, port, client_call_manager);
-    retry_timer_ = std::make_unique<boost::asio::steady_timer>(io_service);
Comment (Contributor Author):
note: we don't perform the retry asynchronously anymore; instead, we retry in the callback of the connection health check
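A minimal standalone sketch of that callback-driven retry shape, with hypothetical names (the real code re-posts through io_service_ with a delay rather than recursing synchronously):

#include <functional>
#include <iostream>

// Each failed health check schedules the next attempt from inside the
// callback itself; no up-front timer is armed.
void WaitUntilReady(int attempt, int max_attempts,
                    const std::function<bool()> &health_check,
                    const std::function<void(bool)> &done) {
  if (health_check()) {
    done(true);  // server ready: initialize the exporter
    return;
  }
  if (attempt >= max_attempts) {
    done(false);  // out of retries: report failure
    return;
  }
  WaitUntilReady(attempt + 1, max_attempts, health_check, done);
}

int main() {
  int calls = 0;
  WaitUntilReady(
      0, 5, [&calls] { return ++calls >= 3; },
      [](bool ok) { std::cout << (ok ? "ready\n" : "gave up\n"); });
}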
  };

  VOID_RPC_CLIENT_METHOD(ReporterService,
@@ -89,7 +89,7 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
  VOID_RPC_CLIENT_METHOD(ReporterService,
                         HealthCheck,
                         grpc_client_,
-                        /*method_timeout_ms*/ -1,
+                        /*method_timeout_ms*/ kMetricAgentInitRetryDelayMs,
                         override)

  /// Wait for the server to be ready. Invokes the callback with the final readiness
@@ -99,8 +99,8 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
 private:
  /// The RPC client.
  std::unique_ptr<GrpcClient<ReporterService>> grpc_client_;
-  /// Timer for retrying to initialize the OpenTelemetry exporter.
-  std::unique_ptr<boost::asio::steady_timer> retry_timer_;
+  /// The io context to run the retry loop.
+  instrumented_io_context &io_service_;
  /// Whether the exporter is initialized.
  bool exporter_initialized_ = false;
  /// Wait for the server to be ready with a retry count. Invokes the callback