Skip to content

Commit 618d6e5

Browse files
authored
[core][event] feature flag for enabling ray export event (#57999)
We have a feature flag to control the rolling out of ray export event, but the feature flag is missing the controlling of `StartExportingEvents`. This PR fixes the issue. Test: - CI Signed-off-by: Cuong Nguyen <[email protected]>
1 parent c650eaf commit 618d6e5

File tree

4 files changed

+49
-1
lines changed

4 files changed

+49
-1
lines changed

src/ray/observability/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ ray_cc_library(
166166
":ray_event",
167167
":ray_event_recorder_interface",
168168
"//src/ray/common:asio",
169+
"//src/ray/common:ray_config",
169170
"//src/ray/protobuf:events_event_aggregator_service_cc_proto",
170171
"//src/ray/rpc:event_aggregator_client",
171172
"//src/ray/util:logging",

src/ray/observability/ray_event_recorder.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ RayEventRecorder::RayEventRecorder(
3636

3737
void RayEventRecorder::StartExportingEvents() {
3838
absl::MutexLock lock(&mutex_);
39+
if (!RayConfig::instance().enable_ray_event()) {
40+
RAY_LOG(INFO) << "Ray event recording is disabled. Skipping start exporting events.";
41+
return;
42+
}
3943
RAY_CHECK(!exporting_started_)
4044
<< "RayEventRecorder::StartExportingEvents() should be called only once.";
4145
exporting_started_ = true;
@@ -74,6 +78,9 @@ void RayEventRecorder::ExportEvents() {
7478
void RayEventRecorder::AddEvents(
7579
std::vector<std::unique_ptr<RayEventInterface>> &&data_list) {
7680
absl::MutexLock lock(&mutex_);
81+
if (!RayConfig::instance().enable_ray_event()) {
82+
return;
83+
}
7784
if (data_list.size() + buffer_.size() > max_buffer_size_) {
7885
size_t events_to_remove = data_list.size() + buffer_.size() - max_buffer_size_;
7986
// Record dropped events from the buffer

src/ray/observability/ray_event_recorder.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "absl/synchronization/mutex.h"
2020
#include "ray/common/asio/periodical_runner.h"
21+
#include "ray/common/ray_config.h"
2122
#include "ray/observability/metric_interface.h"
2223
#include "ray/observability/ray_event_interface.h"
2324
#include "ray/observability/ray_event_recorder_interface.h"

src/ray/observability/tests/ray_event_recorder_test.cc

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ class RayEventRecorderTest : public ::testing::Test {
6969
max_buffer_size_,
7070
"gcs",
7171
*fake_dropped_events_counter_);
72-
recorder_->StartExportingEvents();
7372
}
7473

7574
instrumented_io_context io_service_;
@@ -80,6 +79,13 @@ class RayEventRecorderTest : public ::testing::Test {
8079
};
8180

8281
TEST_F(RayEventRecorderTest, TestRecordEvents) {
82+
RayConfig::instance().initialize(
83+
R"(
84+
{
85+
"enable_ray_event": true
86+
}
87+
)");
88+
recorder_->StartExportingEvents();
8389
rpc::JobTableData data1;
8490
data1.set_job_id("test_job_id_1");
8591
data1.set_is_dead(false);
@@ -173,6 +179,13 @@ TEST_F(RayEventRecorderTest, TestRecordEvents) {
173179
}
174180

175181
TEST_F(RayEventRecorderTest, TestDropEvents) {
182+
RayConfig::instance().initialize(
183+
R"(
184+
{
185+
"enable_ray_event": true
186+
}
187+
)");
188+
recorder_->StartExportingEvents();
176189
size_t expected_num_dropped_events = 3;
177190

178191
// Add more events than the buffer size
@@ -204,5 +217,31 @@ TEST_F(RayEventRecorderTest, TestDropEvents) {
204217
ASSERT_EQ(num_dropped_events, expected_num_dropped_events);
205218
}
206219

220+
TEST_F(RayEventRecorderTest, TestDisabled) {
221+
RayConfig::instance().initialize(
222+
R"(
223+
{
224+
"enable_ray_event": false
225+
}
226+
)");
227+
recorder_->StartExportingEvents();
228+
rpc::JobTableData data;
229+
data.set_job_id("test_job_id_1");
230+
data.set_is_dead(false);
231+
data.set_driver_pid(12345);
232+
data.set_start_time(absl::ToUnixSeconds(absl::Now()));
233+
data.set_end_time(0);
234+
data.set_entrypoint("python test_script.py");
235+
data.mutable_driver_address()->set_ip_address("127.0.0.1");
236+
237+
std::vector<std::unique_ptr<RayEventInterface>> events;
238+
events.push_back(
239+
std::make_unique<RayDriverJobDefinitionEvent>(data, "test_session_name"));
240+
recorder_->AddEvents(std::move(events));
241+
io_service_.run_one();
242+
std::vector<rpc::events::RayEvent> recorded_events = fake_client_->GetRecordedEvents();
243+
ASSERT_EQ(recorded_events.size(), 0);
244+
}
245+
207246
} // namespace observability
208247
} // namespace ray

0 commit comments

Comments
 (0)