diff --git a/src/ray/gcs/gcs_ray_event_converter.cc b/src/ray/gcs/gcs_ray_event_converter.cc index 1eeaa90039b1..2f8e90b4ae98 100644 --- a/src/ray/gcs/gcs_ray_event_converter.cc +++ b/src/ray/gcs/gcs_ray_event_converter.cc @@ -141,10 +141,19 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskLifecycleEvent &&event) { task_event.set_job_id(event.job_id()); rpc::TaskStateUpdate *task_state_update = task_event.mutable_state_updates(); - task_state_update->set_node_id(event.node_id()); - task_state_update->set_worker_id(event.worker_id()); - task_state_update->set_worker_pid(event.worker_pid()); - *task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info()); + if (!event.node_id().empty()) { + task_state_update->set_node_id(event.node_id()); + } + if (!event.worker_id().empty()) { + task_state_update->set_worker_id(event.worker_id()); + } + // worker pid can never be 0 + if (event.worker_pid() != 0) { + task_state_update->set_worker_pid(event.worker_pid()); + } + if (event.has_ray_error_info()) { + *task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info()); + } for (const auto &state_transition : event.state_transitions()) { int64_t ns = ProtoTimestampToAbslTimeNanos(state_transition.timestamp()); @@ -193,7 +202,9 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskProfileEvents &&event) { task_event.set_attempt_number(event.attempt_number()); task_event.set_job_id(event.job_id()); - *task_event.mutable_profile_events() = std::move(*event.mutable_profile_events()); + if (event.has_profile_events()) { + *task_event.mutable_profile_events() = std::move(*event.mutable_profile_events()); + } return task_event; } diff --git a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc index 2656013071ef..454faa6dc91b 100644 --- a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc +++ b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc @@ -411,5 +411,139 @@ TEST(GcsRayEventConverterTest, TestConvertActorTaskDefinitionEvent) { EXPECT_EQ(task_info.required_resources().at("GPU"), 1.0); } +// Parameterized test for optional fields in TaskLifecycleEvent. +// Tests that optional fields are only set when they have non-empty values, +// preventing issues where explicitly set empty fields overwrite existing values +// during protobuf mergeFrom() operations. +struct OptionalFieldTestCase { + std::string test_name; + std::string node_id; + std::string worker_id; + int32_t worker_pid; + std::string error_message; // Empty string means no error_info should be set + bool expect_node_id_set; + bool expect_worker_id_set; + bool expect_worker_pid_set; + bool expect_error_info_set; +}; + +class TaskLifecycleEventOptionalFieldsTest + : public ::testing::TestWithParam {}; + +TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) { + const auto &test_case = GetParam(); + + rpc::events::AddEventsRequest request; + rpc::events::RayEvent &event = *request.mutable_events_data()->mutable_events()->Add(); + event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); + rpc::events::TaskLifecycleEvent &lifecycle_event = + *event.mutable_task_lifecycle_event(); + + // Set basic required fields + lifecycle_event.set_task_id("test_task_id"); + lifecycle_event.set_task_attempt(1); + lifecycle_event.set_job_id("test_job_id"); + + // Set optional fields according to test case + lifecycle_event.set_node_id(test_case.node_id); + lifecycle_event.set_worker_id(test_case.worker_id); + lifecycle_event.set_worker_pid(test_case.worker_pid); + + // Set error_info if specified + if (!test_case.error_message.empty()) { + lifecycle_event.mutable_ray_error_info()->set_error_message(test_case.error_message); + } + + // Call the converter + auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); + ASSERT_EQ(task_event_data_requests.size(), 1); + const rpc::TaskEvents &task_event = + task_event_data_requests[0].data().events_by_task()[0]; + + // Verify that state_updates exists + ASSERT_TRUE(task_event.has_state_updates()); + const auto &state_updates = task_event.state_updates(); + + // Verify field presence matches expectations + EXPECT_EQ(state_updates.has_node_id(), test_case.expect_node_id_set) + << "node_id presence mismatch for test: " << test_case.test_name; + if (test_case.expect_node_id_set) { + EXPECT_EQ(state_updates.node_id(), test_case.node_id); + } + + EXPECT_EQ(state_updates.has_worker_id(), test_case.expect_worker_id_set) + << "worker_id presence mismatch for test: " << test_case.test_name; + if (test_case.expect_worker_id_set) { + EXPECT_EQ(state_updates.worker_id(), test_case.worker_id); + } + + EXPECT_EQ(state_updates.has_worker_pid(), test_case.expect_worker_pid_set) + << "worker_pid presence mismatch for test: " << test_case.test_name; + if (test_case.expect_worker_pid_set) { + EXPECT_EQ(state_updates.worker_pid(), test_case.worker_pid); + } + + EXPECT_EQ(state_updates.has_error_info(), test_case.expect_error_info_set) + << "error_info presence mismatch for test: " << test_case.test_name; + if (test_case.expect_error_info_set) { + EXPECT_EQ(state_updates.error_info().error_message(), test_case.error_message); + } +} + +INSTANTIATE_TEST_SUITE_P( + OptionalFields, + TaskLifecycleEventOptionalFieldsTest, + ::testing::Values( + // All fields empty - none should be set + OptionalFieldTestCase{"AllEmpty", "", "", 0, "", false, false, false, false}, + // All fields non-empty - all should be set + OptionalFieldTestCase{"AllNonEmpty", + "test_node_id", + "test_worker_id", + 1234, + "Test error", + true, + true, + true, + true}, + // Mixed: node_id set, others empty + OptionalFieldTestCase{ + "OnlyNodeId", "test_node_id", "", 0, "", true, false, false, false}, + // Mixed: worker_id set, others empty + OptionalFieldTestCase{ + "OnlyWorkerId", "", "test_worker_id", 0, "", false, true, false, false}, + // Mixed: worker_pid set, others empty + OptionalFieldTestCase{ + "OnlyWorkerPid", "", "", 5678, "", false, false, true, false}, + // Only error_info set, others empty + OptionalFieldTestCase{ + "OnlyErrorInfo", "", "", 0, "Test error", false, false, false, true}, + // Mixed: node_id and worker_pid set, worker_id and error_info empty + OptionalFieldTestCase{ + "NodeIdAndWorkerPid", "test_node_id", "", 9999, "", true, false, true, false}, + // Mixed: worker_id and worker_pid set, node_id and error_info empty + OptionalFieldTestCase{"WorkerIdAndWorkerPid", + "", + "test_worker_id", + 4321, + "", + false, + true, + true, + false}, + // Mixed: worker_id and error_info set, others empty + OptionalFieldTestCase{"WorkerIdAndErrorInfo", + "", + "test_worker_id", + 0, + "Worker error", + false, + true, + false, + true}), + [](const ::testing::TestParamInfo &info) { + return info.param.test_name; + }); + } // namespace gcs } // namespace ray