Skip to content

Commit 244ea49

Browse files
can-anyscale authored and elliot-barn committed
[core] Fix "RayEventRecorder::StartExportingEvents() should be called only once." (#57917)
This PR fixes the Ray check failure "RayEventRecorder::StartExportingEvents() should be called only once."

The failure can occur in the following scenario:
- The metric_agent_client successfully establishes a connection with the dashboard agent. In this case, RayEventRecorder::StartExportingEvents is correctly invoked to start sending events.
- At the same time, the metric_agent_client exceeds its maximum number of connection retries. In this case, RayEventRecorder::StartExportingEvents is incorrectly invoked a second time, causing a duplicate attempt to start exporting events.

This PR introduces two fixes:
- In metric_agent_client, the connection-success and retry logic are now synchronized (previously they ran asynchronously, allowing both paths to trigger).
- StartExportingEvents is no longer called if the connection cannot be established.

Test:
- CI

---------

Signed-off-by: Cuong Nguyen <[email protected]>
Signed-off-by: elliot-barn <[email protected]>
1 parent d4e4336 commit 244ea49

File tree

3 files changed

+36
-29
lines changed

3 files changed

+36
-29
lines changed

src/ray/gcs/gcs_server.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,13 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
296296
// Init metrics and event exporter.
297297
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
298298
stats::InitOpenTelemetryExporter(config_.metrics_agent_port, server_status);
299-
ray_event_recorder_->StartExportingEvents();
299+
if (server_status.ok()) {
300+
ray_event_recorder_->StartExportingEvents();
301+
} else {
302+
RAY_LOG(ERROR) << "Failed to establish connection to the event exporter. Events "
303+
"will not be exported. "
304+
<< "Event exporter status: " << server_status.ToString();
305+
}
300306
});
301307

302308
// Start RPC server when all tables have finished loading initial

src/ray/rpc/metrics_agent_client.cc

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,30 +41,31 @@ void MetricsAgentClientImpl::WaitForServerReadyWithRetry(
4141
// Only log the first time we start the retry loop.
4242
RAY_LOG(INFO) << "Initializing exporter ...";
4343
}
44-
HealthCheck(rpc::HealthCheckRequest(),
45-
[this, init_exporter_fn](auto &status, auto &&reply) {
46-
if (status.ok() && !exporter_initialized_) {
47-
init_exporter_fn(status);
48-
exporter_initialized_ = true;
49-
RAY_LOG(INFO) << "Exporter initialized.";
50-
}
51-
});
52-
if (retry_count >= max_retry) {
53-
init_exporter_fn(Status::RpcError("The metrics agent server is not ready.", 14));
54-
return;
55-
}
56-
retry_count++;
57-
retry_timer_->expires_after(std::chrono::milliseconds(retry_interval_ms));
58-
retry_timer_->async_wait(
59-
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](
60-
const boost::system::error_code &error) {
61-
if (!error) {
62-
WaitForServerReadyWithRetry(
63-
init_exporter_fn, retry_count, max_retry, retry_interval_ms);
64-
} else {
65-
RAY_LOG(ERROR) << "Failed to initialize exporter. Data will not be exported to "
66-
"the metrics agent.";
44+
HealthCheck(
45+
rpc::HealthCheckRequest(),
46+
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](auto &status,
47+
auto &&reply) {
48+
if (status.ok()) {
49+
if (exporter_initialized_) {
50+
return;
51+
}
52+
init_exporter_fn(status);
53+
exporter_initialized_ = true;
54+
RAY_LOG(INFO) << "Exporter initialized.";
55+
return;
56+
}
57+
if (retry_count >= max_retry) {
58+
init_exporter_fn(Status::RpcError(
59+
"Running out of retries to initialize the metrics agent.", 14));
60+
return;
6761
}
62+
io_service_.post(
63+
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms]() {
64+
WaitForServerReadyWithRetry(
65+
init_exporter_fn, retry_count + 1, max_retry, retry_interval_ms);
66+
},
67+
"MetricsAgentClient.WaitForServerReadyWithRetry",
68+
retry_interval_ms * 1000);
6869
});
6970
}
7071

src/ray/rpc/metrics_agent_client.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
7272
MetricsAgentClientImpl(const std::string &address,
7373
const int port,
7474
instrumented_io_context &io_service,
75-
rpc::ClientCallManager &client_call_manager) {
75+
rpc::ClientCallManager &client_call_manager)
76+
: io_service_(io_service) {
7677
RAY_LOG(DEBUG) << "Initiate the metrics client of address:"
7778
<< BuildAddress(address, port);
7879
grpc_client_ =
7980
std::make_unique<GrpcClient<ReporterService>>(address, port, client_call_manager);
80-
retry_timer_ = std::make_unique<boost::asio::steady_timer>(io_service);
8181
};
8282

8383
VOID_RPC_CLIENT_METHOD(ReporterService,
@@ -89,7 +89,7 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
8989
VOID_RPC_CLIENT_METHOD(ReporterService,
9090
HealthCheck,
9191
grpc_client_,
92-
/*method_timeout_ms*/ -1,
92+
/*method_timeout_ms*/ kMetricAgentInitRetryDelayMs,
9393
override)
9494

9595
/// Wait for the server to be ready. Invokes the callback with the final readiness
@@ -99,8 +99,8 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
9999
private:
100100
/// The RPC client.
101101
std::unique_ptr<GrpcClient<ReporterService>> grpc_client_;
102-
/// Timer for retrying to initialize the OpenTelemetry exporter.
103-
std::unique_ptr<boost::asio::steady_timer> retry_timer_;
102+
/// The io context to run the retry loop.
103+
instrumented_io_context &io_service_;
104104
/// Whether the exporter is initialized.
105105
bool exporter_initialized_ = false;
106106
/// Wait for the server to be ready with a retry count. Invokes the callback

0 commit comments

Comments
 (0)