From 098ba29d2dc4ad43e1e8cfb5197566d6284e977b Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 10 Oct 2025 16:50:55 -0700 Subject: [PATCH 01/13] add integration tests for task events Signed-off-by: Mengjin Yan --- python/ray/tests/BUILD.bazel | 1 + .../test_ray_event_export_task_events.py | 945 ++++++++++++++++++ 2 files changed, 946 insertions(+) create mode 100644 python/ray/tests/test_ray_event_export_task_events.py diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 98018df64c37..88463833c587 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -567,6 +567,7 @@ py_test_module_list( "test_placement_group_metrics.py", "test_protobuf_compatibility.py", "test_queue.py", + "test_ray_event_export_task_events.py", "test_raylet_output.py", "test_reconstruction_stress.py", "test_reconstruction_stress_spill.py", diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py new file mode 100644 index 000000000000..ec64f00655f7 --- /dev/null +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -0,0 +1,945 @@ +import json +import logging + +import grpc +import pytest + +import ray +from ray._common.test_utils import wait_for_condition +from ray._private.test_utils import find_free_port, run_string_as_driver_nonblocking + +logger = logging.getLogger(__name__) + + +_EVENT_AGGREGATOR_AGENT_TARGET_PORT = find_free_port() +_EVENT_AGGREGATOR_AGENT_TARGET_IP = "127.0.0.1" +_EVENT_AGGREGATOR_AGENT_TARGET_ADDR = ( + f"http://{_EVENT_AGGREGATOR_AGENT_TARGET_IP}:{_EVENT_AGGREGATOR_AGENT_TARGET_PORT}" +) + + +@pytest.fixture(scope="module") +def httpserver_listen_address(): + return (_EVENT_AGGREGATOR_AGENT_TARGET_IP, _EVENT_AGGREGATOR_AGENT_TARGET_PORT) + + +_cluster_with_aggregator_target = pytest.mark.parametrize( + "ray_start_cluster_head_with_env_vars", + [ + { + "env_vars": { + "RAY_task_events_report_interval_ms": 100, + "RAY_enable_core_worker_ray_event_to_aggregator": "1", + "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR, + }, + }, + ], + indirect=True, +) + + +def wait_until_grpc_channel_ready(target: str, timeout: int = 10): + channel = grpc.insecure_channel(target) + try: + grpc.channel_ready_future(channel).result(timeout=timeout) + return True + except grpc.FutureTimeoutError: + return False + + +def get_job_ids_and_driver_script_task_ids_from_events(events: json) -> tuple[str, str]: + test_job_id = ray.get_runtime_context().get_job_id() + driver_script_job_id = None + driver_task_id = None + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + if ( + event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK" + and event["taskDefinitionEvent"]["jobId"] != test_job_id + ): + driver_task_id = event["taskDefinitionEvent"]["taskId"] + driver_script_job_id = event["taskDefinitionEvent"]["jobId"] + assert driver_task_id is not None + assert driver_script_job_id is not None + + return driver_script_job_id, driver_task_id + + +def check_task_event_base_fields(event: json): + assert event["eventId"] is not None + assert event["sourceType"] == "CORE_WORKER" + assert event["timestamp"] is not None + assert event["severity"] == "INFO" + assert event["sessionName"] is not None + + +def check_task_execution_event_states_and_error_info( + events: json, + expected_task_id_states_dict: dict, + expected_task_id_error_info_dict: dict, +): + + task_id_states_dict = {} + task_id_error_info_dict = {} + for event in events: + if event["eventType"] == "TASK_EXECUTION_EVENT": + task_id = event["taskExecutionEvent"]["taskId"] + task_attempt = event["taskExecutionEvent"]["taskAttempt"] + if (task_id, task_attempt) not in task_id_states_dict: + task_id_states_dict[(task_id, task_attempt)] = set() + + for state in event["taskExecutionEvent"]["taskState"]: + task_id_states_dict[(task_id, task_attempt)].add(state) + if "rayErrorInfo" in event["taskExecutionEvent"]: + task_id_error_info_dict[(task_id, task_attempt)] = event[ + "taskExecutionEvent" + ]["rayErrorInfo"] + + for ( + expected_task_id_attempt, + expected_states, + ) in expected_task_id_states_dict.items(): + assert expected_task_id_attempt in task_id_states_dict + assert task_id_states_dict[expected_task_id_attempt] == expected_states + + for ( + expected_task_id_attempt, + expected_error_info, + ) in expected_task_id_error_info_dict.items(): + assert expected_task_id_attempt in task_id_error_info_dict + assert ( + task_id_error_info_dict[expected_task_id_attempt]["errorType"] + == expected_error_info["errorType"] + ) + assert ( + expected_error_info["errorMessage"] + in task_id_error_info_dict[expected_task_id_attempt]["errorMessage"] + ) + + +def get_and_validate_events(httpserver, validation_func): + event_data = [] + for http_log in httpserver.log: + req, _ = http_log + data = json.loads(req.data) + event_data.extend(data) + + try: + validation_func(event_data) + return True + except Exception: + return False + + +def run_driver_script_and_wait_for_events(script, httpserver, cluster, validation_func): + httpserver.expect_request("/", method="POST").respond_with_data("", status=200) + wait_until_grpc_channel_ready( + "127.0.0.1:" + str(cluster.head_node.dashboard_agent_listen_port) + ) + run_string_as_driver_nonblocking(script) + wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func)) + + +class TestNormalTaskEvents: + @_cluster_with_aggregator_target + def test_normal_task_succeed( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + script = """ +import ray +ray.init() + +@ray.remote +def normal_task(): + pass +ray.get(normal_task.remote()) + """ + + def validate_events(events): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + expected_driver_task_states = {"8", "11"} + expected_normal_task_states = {"1", "2", "5", "8", "11"} + + # Check definition events + driver_task_definition_received = False + normal_task_definition_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_definition_received = True + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "normal_task" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert event["taskDefinitionEvent"]["taskName"] == "normal_task" + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 1.0 + } + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + + assert driver_task_definition_received + assert normal_task_definition_received + + # Check execution events + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (normal_task_id, 0): expected_normal_task_states, + } + expected_task_id_error_info_dict = {} + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + run_driver_script_and_wait_for_events( + script, httpserver, ray_start_cluster_head_with_env_vars, validate_events + ) + + @_cluster_with_aggregator_target + def test_normal_task_execution_failure_with_retry( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + script = """ +import ray + +ray.init() + +@ray.remote(max_retries=1, retry_exceptions=[Exception]) +def normal_task(): + raise Exception("test error") +try: + ray.get(normal_task.remote()) +except Exception as e: + pass + """ + + def validate_events(events: json): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + # Check definition events + driver_task_definition_received = False + normal_task_definition_received = False + normal_task_definition_retry_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_definition_received = True + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "normal_task" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert event["taskDefinitionEvent"]["taskName"] == "normal_task" + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 1.0 + } + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + if event["taskDefinitionEvent"]["taskAttempt"] == 0: + normal_task_definition_received = True + else: + assert event["taskDefinitionEvent"]["taskAttempt"] == 1 + normal_task_definition_retry_received = True + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert driver_task_definition_received + assert normal_task_definition_received + assert normal_task_definition_retry_received + + # Check execution events + expected_driver_task_states = {"8", "11"} + expected_normal_task_states = {"1", "2", "5", "8", "12"} + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (normal_task_id, 0): expected_normal_task_states, + (normal_task_id, 1): expected_normal_task_states, + } + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "test error", + }, + (normal_task_id, 1): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "test error", + }, + } + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + run_driver_script_and_wait_for_events( + script, httpserver, ray_start_cluster_head_with_env_vars, validate_events + ) + + @_cluster_with_aggregator_target + def test_task_failed_due_to_node_failure( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + cluster = ray_start_cluster_head_with_env_vars + node = cluster.add_node(num_cpus=2) + + script = """ +import ray +ray.init() + +@ray.remote(num_cpus=2, max_retries=0) +def sleep(): + import time + time.sleep(999) + +x = sleep.options(name="node-killed").remote() +try: + ray.get(x) +except Exception as e: + pass + """ + # Run the driver script and wait for the sleep task to be executing + def validate_task_running(events: json): + # Obtain the task id of the sleep task + normal_task_id = None + for event in events: + if ( + event["eventType"] == "TASK_DEFINITION_EVENT" + and event["taskDefinitionEvent"]["taskType"] == "NORMAL_TASK" + ): + normal_task_id = event["taskDefinitionEvent"]["taskId"] + break + assert normal_task_id is not None + + # Check whether the task execution eventhas running state + for event in events: + if ( + event["eventType"] == "TASK_EXECUTION_EVENT" + and event["taskExecutionEvent"]["taskId"] == normal_task_id + ): + for state in event["taskExecutionEvent"]["taskState"]: + if state == "8": + return + assert False + + run_driver_script_and_wait_for_events( + script, + httpserver, + ray_start_cluster_head_with_env_vars, + validate_task_running, + ) + + # Kill the node + cluster.remove_node(node) + + # Wait and verify the task events + def validate_task_killed(events: json): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + # Check the task definition events + driver_task_definition_received = False + normal_task_definition_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_definition_received = True + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "sleep" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert event["taskDefinitionEvent"]["taskName"] == "node-killed" + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 2.0 + } + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert driver_task_definition_received + assert normal_task_definition_received + + # Check the task execution events + expected_driver_task_states = {"8", "11"} + expected_normal_task_states = {"1", "2", "5", "8", "12"} + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (normal_task_id, 0): expected_normal_task_states, + } + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "errorType": "NODE_DIED", + "errorMessage": "Task failed due to the node (where this task was running) was dead or unavailable", + } + } + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + get_and_validate_events(httpserver, validate_task_killed) + + +class TestActorTaskEvents: + @_cluster_with_aggregator_target + def test_actor_creation_succeed( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + script = """ +import ray +ray.init() + +@ray.remote(num_cpus=1) +class Actor: + def __init__(self): + pass + + def task(self, arg): + pass + +actor = Actor.remote() +obj = ray.put("test") +ray.get(actor.task.remote(obj)) + """ + + def validate_events(events: json): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + driver_task_definition_received = False + actor_creation_task_definition_received = False + actor_task_definition_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" + ) + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 1.0 + } + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True + actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] + assert actor_task_id is not None + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "task" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["actorTaskDefinitionEvent"]["actorTaskName"] + == "Actor.task" + ) + assert event["actorTaskDefinitionEvent"]["requiredResources"] == {} + assert ( + event["actorTaskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["actorTaskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 0 + assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + + assert driver_task_definition_received + assert actor_creation_task_definition_received + assert actor_task_definition_received + + expected_driver_task_states = {"8", "11"} + expected_actor_creation_task_states = {"1", "2", "8", "11"} + expected_actor_task_states = {"1", "2", "5", "6", "7", "8", "11"} + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (actor_creation_task_id, 0): expected_actor_creation_task_states, + (actor_task_id, 0): expected_actor_task_states, + } + expected_task_id_error_info_dict = {} + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + run_driver_script_and_wait_for_events( + script, httpserver, ray_start_cluster_head_with_env_vars, validate_events + ) + + @_cluster_with_aggregator_target + def test_actor_creation_failed_with_actor_task_retry( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + script = """ +import ray +ray.init() + +@ray.remote(num_cpus=1) +class Actor: + def __init__(self): + raise Exception("actor creation error") + + def task(self, arg): + pass + +actor = Actor.remote() +obj = ray.put("test") +ray.get(actor.task.options(max_task_retries=1, retry_exceptions=[Exception]).remote(obj)) + """ + + def validate_events(events: json): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + driver_task_definition_received = False + actor_creation_task_definition_received = False + actor_task_definition_received = False + actor_task_definition_retry_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" + ) + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 1.0 + } + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True + actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] + assert actor_task_id is not None + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "task" + ) + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["actorTaskDefinitionEvent"]["actorTaskName"] + == "Actor.task" + ) + assert event["actorTaskDefinitionEvent"]["requiredResources"] == {} + assert ( + event["actorTaskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["actorTaskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + if event["actorTaskDefinitionEvent"]["taskAttempt"] == 0: + actor_task_definition_received = True + else: + assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 1 + actor_task_definition_retry_received = True + assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert driver_task_definition_received + assert actor_creation_task_definition_received + assert actor_task_definition_received + assert actor_task_definition_retry_received + + expected_driver_task_states = {"8", "11"} + expected_actor_creation_task_states = {"1", "2", "8", "12"} + expected_actor_task_states = {"1", "2", "5", "12"} + expected_actor_task_states_retry = {"1", "12"} + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (actor_creation_task_id, 0): expected_actor_creation_task_states, + (actor_task_id, 0): expected_actor_task_states, + (actor_task_id, 1): expected_actor_task_states_retry, + } + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "CreationTaskError: Exception raised from an actor init method.", + }, + (actor_task_id, 0): { + "errorType": "ACTOR_DIED", + "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + (actor_task_id, 1): { + "errorType": "ACTOR_DIED", + "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + } + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + run_driver_script_and_wait_for_events( + script, httpserver, ray_start_cluster_head_with_env_vars, validate_events + ) + + @_cluster_with_aggregator_target + def test_actor_creation_canceled( + self, ray_start_cluster_head_with_env_vars, httpserver + ): + script = """ +import ray +ray.init() + +@ray.remote(num_cpus=2) +class Actor: + def __init__(self): + pass + + def task(self): + pass + +actor = Actor.remote() +ray.kill(actor) + """ + + def validate_events(events: json): + ( + driver_script_job_id, + driver_task_id, + ) = get_job_ids_and_driver_script_task_ids_from_events(events) + + driver_task_definition_received = False + actor_creation_task_definition_received = False + for event in events: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" + ) + assert event["taskDefinitionEvent"]["requiredResources"] == { + "CPU": 2.0 + } + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_EXECUTION_EVENT" + + assert driver_task_definition_received + assert actor_creation_task_definition_received + + expected_driver_task_states = {"8", "11"} + expected_actor_creation_task_states = {"1", "2", "12"} + expected_task_id_states_dict = { + (driver_task_id, 0): expected_driver_task_states, + (actor_creation_task_id, 0): expected_actor_creation_task_states, + } + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "errorType": "WORKER_DIED", + "errorMessage": "", + } + } + check_task_execution_event_states_and_error_info( + events, expected_task_id_states_dict, expected_task_id_error_info_dict + ) + + run_driver_script_and_wait_for_events( + script, httpserver, ray_start_cluster_head_with_env_vars, validate_events + ) + + +if __name__ == "__main__": + pytest.main(["-vv", __file__]) From 83a1a20b64e62345a94c9f406e9e4d9beb71d435 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Sun, 19 Oct 2025 23:37:56 -0700 Subject: [PATCH 02/13] fix the tests based on the task event schema update Signed-off-by: Mengjin Yan --- .../test_ray_event_export_task_events.py | 131 ++++++++++++------ 1 file changed, 89 insertions(+), 42 deletions(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index ec64f00655f7..3fe36bfa9781 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -73,7 +73,7 @@ def check_task_event_base_fields(event: json): assert event["sessionName"] is not None -def check_task_execution_event_states_and_error_info( +def check_task_lifecycle_event_states_and_error_info( events: json, expected_task_id_states_dict: dict, expected_task_id_error_info_dict: dict, @@ -82,17 +82,17 @@ def check_task_execution_event_states_and_error_info( task_id_states_dict = {} task_id_error_info_dict = {} for event in events: - if event["eventType"] == "TASK_EXECUTION_EVENT": - task_id = event["taskExecutionEvent"]["taskId"] - task_attempt = event["taskExecutionEvent"]["taskAttempt"] + if event["eventType"] == "TASK_LIFECYCLE_EVENT": + task_id = event["taskLifecycleEvent"]["taskId"] + task_attempt = event["taskLifecycleEvent"]["taskAttempt"] if (task_id, task_attempt) not in task_id_states_dict: task_id_states_dict[(task_id, task_attempt)] = set() - for state in event["taskExecutionEvent"]["taskState"]: - task_id_states_dict[(task_id, task_attempt)].add(state) - if "rayErrorInfo" in event["taskExecutionEvent"]: + for state in event["taskLifecycleEvent"]["stateTransitions"]: + task_id_states_dict[(task_id, task_attempt)].add(state["state"]) + if "rayErrorInfo" in event["taskLifecycleEvent"]: task_id_error_info_dict[(task_id, task_attempt)] = event[ - "taskExecutionEvent" + "taskLifecycleEvent" ]["rayErrorInfo"] for ( @@ -161,8 +161,14 @@ def validate_events(events): driver_task_id, ) = get_job_ids_and_driver_script_task_ids_from_events(events) - expected_driver_task_states = {"8", "11"} - expected_normal_task_states = {"1", "2", "5", "8", "11"} + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_normal_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "SUBMITTED_TO_WORKER", + "RUNNING", + "FINISHED", + } # Check definition events driver_task_definition_received = False @@ -220,18 +226,18 @@ def validate_events(events): assert event["taskDefinitionEvent"]["taskAttempt"] == 0 assert event["taskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received - # Check execution events + # Check lifecycle events expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (normal_task_id, 0): expected_normal_task_states, } expected_task_id_error_info_dict = {} - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) @@ -324,14 +330,20 @@ def validate_events(events: json): normal_task_definition_retry_received = True assert event["taskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received assert normal_task_definition_retry_received # Check execution events - expected_driver_task_states = {"8", "11"} - expected_normal_task_states = {"1", "2", "5", "8", "12"} + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_normal_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "SUBMITTED_TO_WORKER", + "RUNNING", + "FAILED", + } expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (normal_task_id, 0): expected_normal_task_states, @@ -347,7 +359,7 @@ def validate_events(events: json): "errorMessage": "test error", }, } - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) @@ -390,14 +402,16 @@ def validate_task_running(events: json): break assert normal_task_id is not None - # Check whether the task execution eventhas running state + # Check whether the task lifecycle event has running state for event in events: if ( - event["eventType"] == "TASK_EXECUTION_EVENT" - and event["taskExecutionEvent"]["taskId"] == normal_task_id + event["eventType"] == "TASK_LIFECYCLE_EVENT" + and event["taskLifecycleEvent"]["taskId"] == normal_task_id ): - for state in event["taskExecutionEvent"]["taskState"]: - if state == "8": + for state_transition in event["taskLifecycleEvent"][ + "stateTransitions" + ]: + if state_transition["state"] == "RUNNING": return assert False @@ -474,13 +488,19 @@ def validate_task_killed(events: json): assert event["taskDefinitionEvent"]["taskAttempt"] == 0 assert event["taskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received - # Check the task execution events - expected_driver_task_states = {"8", "11"} - expected_normal_task_states = {"1", "2", "5", "8", "12"} + # Check the task lifecycle events + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_normal_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "SUBMITTED_TO_WORKER", + "RUNNING", + "FAILED", + } expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (normal_task_id, 0): expected_normal_task_states, @@ -491,7 +511,7 @@ def validate_task_killed(events: json): "errorMessage": "Task failed due to the node (where this task was running) was dead or unavailable", } } - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) @@ -632,22 +652,35 @@ def validate_events(events: json): assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received assert actor_task_definition_received - expected_driver_task_states = {"8", "11"} - expected_actor_creation_task_states = {"1", "2", "8", "11"} - expected_actor_task_states = {"1", "2", "5", "6", "7", "8", "11"} + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_actor_creation_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "RUNNING", + "FINISHED", + } + expected_actor_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "SUBMITTED_TO_WORKER", + "PENDING_ACTOR_TASK_ARGS_FETCH", + "PENDING_ACTOR_TASK_ORDERING_OR_CONCURRENCY", + "RUNNING", + "FINISHED", + } expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (actor_creation_task_id, 0): expected_actor_creation_task_states, (actor_task_id, 0): expected_actor_task_states, } expected_task_id_error_info_dict = {} - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) @@ -791,16 +824,26 @@ def validate_events(events: json): actor_task_definition_retry_received = True assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received assert actor_task_definition_received assert actor_task_definition_retry_received - expected_driver_task_states = {"8", "11"} - expected_actor_creation_task_states = {"1", "2", "8", "12"} - expected_actor_task_states = {"1", "2", "5", "12"} - expected_actor_task_states_retry = {"1", "12"} + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_actor_creation_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "RUNNING", + "FAILED", + } + expected_actor_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "SUBMITTED_TO_WORKER", + "FAILED", + } + expected_actor_task_states_retry = {"PENDING_ARGS_AVAIL", "FAILED"} expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (actor_creation_task_id, 0): expected_actor_creation_task_states, @@ -821,7 +864,7 @@ def validate_events(events: json): "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", }, } - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) @@ -915,13 +958,17 @@ def validate_events(events: json): assert event["taskDefinitionEvent"]["taskAttempt"] == 0 assert event["taskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["eventType"] == "TASK_EXECUTION_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received - expected_driver_task_states = {"8", "11"} - expected_actor_creation_task_states = {"1", "2", "12"} + expected_driver_task_states = {"RUNNING", "FINISHED"} + expected_actor_creation_task_states = { + "PENDING_ARGS_AVAIL", + "PENDING_NODE_ASSIGNMENT", + "FAILED", + } expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (actor_creation_task_id, 0): expected_actor_creation_task_states, @@ -932,7 +979,7 @@ def validate_events(events: json): "errorMessage": "", } } - check_task_execution_event_states_and_error_info( + check_task_lifecycle_event_states_and_error_info( events, expected_task_id_states_dict, expected_task_id_error_info_dict ) From c232920ec9f00c16826854ae0210a832d9aa5600 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Mon, 20 Oct 2025 10:48:51 -0700 Subject: [PATCH 03/13] move the test to medium size test list Signed-off-by: Mengjin Yan --- python/ray/tests/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 88463833c587..fa229bd04a68 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -345,6 +345,7 @@ py_test_module_list( "test_placement_group_4.py", "test_placement_group_failover.py", "test_ray_debugger.py", + "test_ray_event_export_task_events.py", "test_ray_init.py", "test_ray_init_2.py", "test_ray_shutdown.py", @@ -567,7 +568,6 @@ py_test_module_list( "test_placement_group_metrics.py", "test_protobuf_compatibility.py", "test_queue.py", - "test_ray_event_export_task_events.py", "test_raylet_output.py", "test_reconstruction_stress.py", "test_reconstruction_stress_spill.py", From 1239b26b40ed4b16619eda1c0f68ca844dda5e6d Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 21 Oct 2025 09:33:07 -0700 Subject: [PATCH 04/13] add logic to test the preserving proto field option Signed-off-by: Mengjin Yan --- .../test_ray_event_export_task_events.py | 1628 ++++++++++++----- 1 file changed, 1160 insertions(+), 468 deletions(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 3fe36bfa9781..334c844c3e26 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -1,5 +1,6 @@ import json import logging +from typing import Union import grpc import pytest @@ -24,17 +25,24 @@ def httpserver_listen_address(): _cluster_with_aggregator_target = pytest.mark.parametrize( - "ray_start_cluster_head_with_env_vars", + ("preserve_proto_field_name", "ray_start_cluster_head_with_env_vars"), [ - { - "env_vars": { - "RAY_task_events_report_interval_ms": 100, - "RAY_enable_core_worker_ray_event_to_aggregator": "1", - "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR, + pytest.param( + preserve_proto_field_name, + { + "env_vars": { + "RAY_task_events_report_interval_ms": 100, + "RAY_enable_core_worker_ray_event_to_aggregator": "1", + "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR, + "RAY_DASHBOARD_AGGREGATOR_AGENT_PRESERVE_PROTO_FIELD_NAME": ( + "1" if preserve_proto_field_name is True else "0" + ), + }, }, - }, + ) + for preserve_proto_field_name in [True, False] ], - indirect=True, + indirect=["ray_start_cluster_head_with_env_vars"], ) @@ -47,53 +55,86 @@ def wait_until_grpc_channel_ready(target: str, timeout: int = 10): return False -def get_job_ids_and_driver_script_task_ids_from_events(events: json) -> tuple[str, str]: +def get_job_ids_and_driver_script_task_ids_from_events( + events: json, preserve_proto_field_name: bool +) -> tuple[Union[str, None], Union[str, None]]: test_job_id = ray.get_runtime_context().get_job_id() driver_script_job_id = None driver_task_id = None for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - if ( - event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK" - and event["taskDefinitionEvent"]["jobId"] != test_job_id - ): - driver_task_id = event["taskDefinitionEvent"]["taskId"] - driver_script_job_id = event["taskDefinitionEvent"]["jobId"] - assert driver_task_id is not None - assert driver_script_job_id is not None + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + if ( + event["task_definition_event"]["task_type"] == "DRIVER_TASK" + and event["task_definition_event"]["job_id"] != test_job_id + ): + driver_task_id = event["task_definition_event"]["task_id"] + driver_script_job_id = event["task_definition_event"]["job_id"] + assert driver_task_id is not None + assert driver_script_job_id is not None + else: + if event["eventType"] == "TASK_DEFINITION_EVENT": + if ( + event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK" + and event["taskDefinitionEvent"]["jobId"] != test_job_id + ): + driver_task_id = event["taskDefinitionEvent"]["taskId"] + driver_script_job_id = event["taskDefinitionEvent"]["jobId"] + assert driver_task_id is not None + assert driver_script_job_id is not None return driver_script_job_id, driver_task_id -def check_task_event_base_fields(event: json): - assert event["eventId"] is not None - assert event["sourceType"] == "CORE_WORKER" +def check_task_event_base_fields(event: json, preserve_proto_field_name: bool): assert event["timestamp"] is not None assert event["severity"] == "INFO" - assert event["sessionName"] is not None + if preserve_proto_field_name: + assert event["event_id"] is not None + assert event["source_type"] == "CORE_WORKER" + assert event["session_name"] is not None + else: + assert event["eventId"] is not None + assert event["sourceType"] == "CORE_WORKER" + assert event["sessionName"] is not None def check_task_lifecycle_event_states_and_error_info( events: json, expected_task_id_states_dict: dict, expected_task_id_error_info_dict: dict, + preserve_proto_field_name: bool, ): task_id_states_dict = {} task_id_error_info_dict = {} for event in events: - if event["eventType"] == "TASK_LIFECYCLE_EVENT": - task_id = event["taskLifecycleEvent"]["taskId"] - task_attempt = event["taskLifecycleEvent"]["taskAttempt"] - if (task_id, task_attempt) not in task_id_states_dict: - task_id_states_dict[(task_id, task_attempt)] = set() - - for state in event["taskLifecycleEvent"]["stateTransitions"]: - task_id_states_dict[(task_id, task_attempt)].add(state["state"]) - if "rayErrorInfo" in event["taskLifecycleEvent"]: - task_id_error_info_dict[(task_id, task_attempt)] = event[ - "taskLifecycleEvent" - ]["rayErrorInfo"] + if preserve_proto_field_name: + if event["event_type"] == "TASK_LIFECYCLE_EVENT": + task_id = event["task_lifecycle_event"]["task_id"] + task_attempt = event["task_lifecycle_event"]["task_attempt"] + if (task_id, task_attempt) not in task_id_states_dict: + task_id_states_dict[(task_id, task_attempt)] = set() + + for state in event["task_lifecycle_event"]["state_transitions"]: + task_id_states_dict[(task_id, task_attempt)].add(state["state"]) + if "ray_error_info" in event["task_lifecycle_event"]: + task_id_error_info_dict[(task_id, task_attempt)] = event[ + "task_lifecycle_event" + ]["ray_error_info"] + else: + if event["eventType"] == "TASK_LIFECYCLE_EVENT": + task_id = event["taskLifecycleEvent"]["taskId"] + task_attempt = event["taskLifecycleEvent"]["taskAttempt"] + if (task_id, task_attempt) not in task_id_states_dict: + task_id_states_dict[(task_id, task_attempt)] = set() + + for state in event["taskLifecycleEvent"]["stateTransitions"]: + task_id_states_dict[(task_id, task_attempt)].add(state["state"]) + if "rayErrorInfo" in event["taskLifecycleEvent"]: + task_id_error_info_dict[(task_id, task_attempt)] = event[ + "taskLifecycleEvent" + ]["rayErrorInfo"] for ( expected_task_id_attempt, @@ -107,14 +148,24 @@ def check_task_lifecycle_event_states_and_error_info( expected_error_info, ) in expected_task_id_error_info_dict.items(): assert expected_task_id_attempt in task_id_error_info_dict - assert ( - task_id_error_info_dict[expected_task_id_attempt]["errorType"] - == expected_error_info["errorType"] - ) - assert ( - expected_error_info["errorMessage"] - in task_id_error_info_dict[expected_task_id_attempt]["errorMessage"] - ) + if preserve_proto_field_name: + assert ( + task_id_error_info_dict[expected_task_id_attempt]["error_type"] + == expected_error_info["error_type"] + ) + assert ( + expected_error_info["error_message"] + in task_id_error_info_dict[expected_task_id_attempt]["error_message"] + ) + else: + assert ( + task_id_error_info_dict[expected_task_id_attempt]["errorType"] + == expected_error_info["errorType"] + ) + assert ( + expected_error_info["errorMessage"] + in task_id_error_info_dict[expected_task_id_attempt]["errorMessage"] + ) def get_and_validate_events(httpserver, validation_func): @@ -137,13 +188,18 @@ def run_driver_script_and_wait_for_events(script, httpserver, cluster, validatio "127.0.0.1:" + str(cluster.head_node.dashboard_agent_listen_port) ) run_string_as_driver_nonblocking(script) - wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func)) + wait_for_condition( + lambda: get_and_validate_events(httpserver, validation_func), timeout=300 + ) class TestNormalTaskEvents: @_cluster_with_aggregator_target def test_normal_task_succeed( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): script = """ import ray @@ -159,7 +215,9 @@ def validate_events(events): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) expected_driver_task_states = {"RUNNING", "FINISHED"} expected_normal_task_states = { @@ -174,59 +232,127 @@ def validate_events(events): driver_task_definition_received = False normal_task_definition_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) - - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - if event["taskDefinitionEvent"]["taskId"] != driver_task_id: - continue - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + if ( + event["task_definition_event"]["task_id"] + != driver_task_id + ): + continue + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + else: + normal_task_definition_received = True + normal_task_id = event["task_definition_event"]["task_id"] + assert normal_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "normal_task" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "normal_task" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 1.0} + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) else: - normal_task_definition_received = True - normal_task_id = event["taskDefinitionEvent"]["taskId"] - assert normal_task_id is not None - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "normal_task" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert event["taskDefinitionEvent"]["taskName"] == "normal_task" - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 1.0 - } - assert ( - event["taskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_definition_received = True + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "normal_task" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "normal_task" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 1.0} + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received @@ -238,7 +364,10 @@ def validate_events(events): } expected_task_id_error_info_dict = {} check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) run_driver_script_and_wait_for_events( @@ -247,7 +376,10 @@ def validate_events(events): @_cluster_with_aggregator_target def test_normal_task_execution_failure_with_retry( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): script = """ import ray @@ -267,70 +399,144 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) # Check definition events driver_task_definition_received = False normal_task_definition_received = False normal_task_definition_retry_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) - - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - if event["taskDefinitionEvent"]["taskId"] != driver_task_id: - continue - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" - else: - normal_task_definition_received = True - normal_task_id = event["taskDefinitionEvent"]["taskId"] - assert normal_task_id is not None - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "normal_task" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert event["taskDefinitionEvent"]["taskName"] == "normal_task" - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 1.0 - } - assert ( - event["taskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - if event["taskDefinitionEvent"]["taskAttempt"] == 0: - normal_task_definition_received = True + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + if ( + event["task_definition_event"]["task_id"] + != driver_task_id + ): + continue + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) else: - assert event["taskDefinitionEvent"]["taskAttempt"] == 1 - normal_task_definition_retry_received = True - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + normal_task_id = event["task_definition_event"]["task_id"] + assert normal_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "normal_task" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "normal_task" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 1.0} + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + if event["task_definition_event"]["task_attempt"] == 0: + normal_task_definition_received = True + else: + assert ( + event["task_definition_event"]["task_attempt"] == 1 + ) + normal_task_definition_retry_received = True + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + else: + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "normal_task" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "normal_task" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 1.0} + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + if event["taskDefinitionEvent"]["taskAttempt"] == 0: + normal_task_definition_received = True + else: + assert event["taskDefinitionEvent"]["taskAttempt"] == 1 + normal_task_definition_retry_received = True + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received assert normal_task_definition_retry_received @@ -349,18 +555,33 @@ def validate_events(events: json): (normal_task_id, 0): expected_normal_task_states, (normal_task_id, 1): expected_normal_task_states, } - expected_task_id_error_info_dict = { - (normal_task_id, 0): { - "errorType": "TASK_EXECUTION_EXCEPTION", - "errorMessage": "test error", - }, - (normal_task_id, 1): { - "errorType": "TASK_EXECUTION_EXCEPTION", - "errorMessage": "test error", - }, - } + if preserve_proto_field_name: + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "error_type": "TASK_EXECUTION_EXCEPTION", + "error_message": "test error", + }, + (normal_task_id, 1): { + "error_type": "TASK_EXECUTION_EXCEPTION", + "error_message": "test error", + }, + } + else: + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "test error", + }, + (normal_task_id, 1): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "test error", + }, + } check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) run_driver_script_and_wait_for_events( @@ -369,7 +590,10 @@ def validate_events(events: json): @_cluster_with_aggregator_target def test_task_failed_due_to_node_failure( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): cluster = ray_start_cluster_head_with_env_vars node = cluster.add_node(num_cpus=2) @@ -394,25 +618,44 @@ def validate_task_running(events: json): # Obtain the task id of the sleep task normal_task_id = None for event in events: - if ( - event["eventType"] == "TASK_DEFINITION_EVENT" - and event["taskDefinitionEvent"]["taskType"] == "NORMAL_TASK" - ): - normal_task_id = event["taskDefinitionEvent"]["taskId"] - break + if preserve_proto_field_name: + if ( + event["event_type"] == "TASK_DEFINITION_EVENT" + and event["task_definition_event"]["task_type"] == "NORMAL_TASK" + ): + normal_task_id = event["task_definition_event"]["task_id"] + break + else: + if ( + event["eventType"] == "TASK_DEFINITION_EVENT" + and event["taskDefinitionEvent"]["taskType"] == "NORMAL_TASK" + ): + normal_task_id = event["taskDefinitionEvent"]["taskId"] + break assert normal_task_id is not None # Check whether the task lifecycle event has running state for event in events: - if ( - event["eventType"] == "TASK_LIFECYCLE_EVENT" - and event["taskLifecycleEvent"]["taskId"] == normal_task_id - ): - for state_transition in event["taskLifecycleEvent"][ - "stateTransitions" - ]: - if state_transition["state"] == "RUNNING": - return + if preserve_proto_field_name: + if ( + event["event_type"] == "TASK_LIFECYCLE_EVENT" + and event["task_lifecycle_event"]["task_id"] == normal_task_id + ): + for state_transition in event["task_lifecycle_event"][ + "state_transitions" + ]: + if state_transition["state"] == "RUNNING": + return + else: + if ( + event["eventType"] == "TASK_LIFECYCLE_EVENT" + and event["taskLifecycleEvent"]["taskId"] == normal_task_id + ): + for state_transition in event["taskLifecycleEvent"][ + "stateTransitions" + ]: + if state_transition["state"] == "RUNNING": + return assert False run_driver_script_and_wait_for_events( @@ -430,65 +673,135 @@ def validate_task_killed(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) # Check the task definition events driver_task_definition_received = False normal_task_definition_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) - - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - if event["taskDefinitionEvent"]["taskId"] != driver_task_id: - continue - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + if ( + event["task_definition_event"]["task_id"] + != driver_task_id + ): + continue + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + else: + normal_task_definition_received = True + normal_task_id = event["task_definition_event"]["task_id"] + assert normal_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "sleep" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "node-killed" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 2.0} + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) else: - normal_task_definition_received = True - normal_task_id = event["taskDefinitionEvent"]["taskId"] - assert normal_task_id is not None - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "sleep" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert event["taskDefinitionEvent"]["taskName"] == "node-killed" - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 2.0 - } - assert ( - event["taskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + if event["taskDefinitionEvent"]["taskId"] != driver_task_id: + continue + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + normal_task_definition_received = True + normal_task_id = event["taskDefinitionEvent"]["taskId"] + assert normal_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "sleep" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "node-killed" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 2.0} + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert normal_task_definition_received @@ -505,23 +818,40 @@ def validate_task_killed(events: json): (driver_task_id, 0): expected_driver_task_states, (normal_task_id, 0): expected_normal_task_states, } - expected_task_id_error_info_dict = { - (normal_task_id, 0): { - "errorType": "NODE_DIED", - "errorMessage": "Task failed due to the node (where this task was running) was dead or unavailable", + if preserve_proto_field_name: + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "error_type": "NODE_DIED", + "error_message": "Task failed due to the node (where this task was running) was dead or unavailable", + } + } + else: + expected_task_id_error_info_dict = { + (normal_task_id, 0): { + "errorType": "NODE_DIED", + "errorMessage": "Task failed due to the node (where this task was running) was dead or unavailable", + } } - } check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) - get_and_validate_events(httpserver, validate_task_killed) + wait_for_condition( + lambda: get_and_validate_events(httpserver, validate_task_killed), + timeout=300, + ) class TestActorTaskEvents: @_cluster_with_aggregator_target def test_actor_creation_succeed( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): script = """ import ray @@ -544,115 +874,239 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) driver_task_definition_received = False actor_creation_task_definition_received = False actor_task_definition_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) - else: + else: + assert ( + event["task_definition_event"]["task_type"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["task_definition_event"][ + "task_id" + ] + assert actor_creation_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "Actor" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "__init__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "Actor.__init__" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 1.0} + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + + elif event["event_type"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True + actor_task_id = event["actor_task_definition_event"]["task_id"] + assert actor_task_id is not None + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["class_name"] + == "Actor" + ) + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["function_name"] + == "task" + ) + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["actor_task_definition_event"]["actor_task_name"] + == "Actor.task" + ) + assert ( + event["actor_task_definition_event"]["required_resources"] + == {} + ) assert ( - event["taskDefinitionEvent"]["taskType"] - == "ACTOR_CREATION_TASK" + event["actor_task_definition_event"]["job_id"] + == driver_script_job_id ) - actor_creation_task_definition_received = True - actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] - assert actor_creation_task_id is not None assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actor_task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert event["actor_task_definition_event"]["task_attempt"] == 0 + assert ( + event["actor_task_definition_event"]["language"] == "PYTHON" + ) + + else: + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" + else: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"][ + "taskId" + ] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "Actor.__init__" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 1.0} + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True + actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] + assert actor_task_id is not None + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["moduleName"] == "__main__" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["className"] == "Actor" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["functionName"] - == "__init__" + == "task" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["functionHash"] is not None ) assert ( - event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" + event["actorTaskDefinitionEvent"]["actorTaskName"] + == "Actor.task" ) - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 1.0 - } assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id + event["actorTaskDefinitionEvent"]["requiredResources"] == {} ) assert ( - event["taskDefinitionEvent"]["jobId"] + event["actorTaskDefinitionEvent"]["jobId"] == driver_script_job_id ) - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" - - elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": - actor_task_definition_received = True - actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] - assert actor_task_id is not None - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "Actor" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "task" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert ( - event["actorTaskDefinitionEvent"]["actorTaskName"] - == "Actor.task" - ) - assert event["actorTaskDefinitionEvent"]["requiredResources"] == {} - assert ( - event["actorTaskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert ( - event["actorTaskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 0 - assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" + assert ( + event["actorTaskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 0 + assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" - else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + else: + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received @@ -681,7 +1135,10 @@ def validate_events(events: json): } expected_task_id_error_info_dict = {} check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) run_driver_script_and_wait_for_events( @@ -690,7 +1147,10 @@ def validate_events(events: json): @_cluster_with_aggregator_target def test_actor_creation_failed_with_actor_task_retry( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): script = """ import ray @@ -713,118 +1173,245 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) driver_task_definition_received = False actor_creation_task_definition_received = False actor_task_definition_received = False actor_task_definition_retry_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) - else: + else: + assert ( + event["task_definition_event"]["task_type"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["task_definition_event"][ + "task_id" + ] + assert actor_creation_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "Actor" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "__init__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "Actor.__init__" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 1.0} + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + elif event["event_type"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_id = event["actor_task_definition_event"]["task_id"] + assert actor_task_id is not None + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) assert ( - event["taskDefinitionEvent"]["taskType"] - == "ACTOR_CREATION_TASK" + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["class_name"] + == "Actor" + ) + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["function_name"] + == "task" + ) + assert ( + event["actor_task_definition_event"]["actor_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["actor_task_definition_event"]["actor_task_name"] + == "Actor.task" ) - actor_creation_task_definition_received = True - actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] - assert actor_creation_task_id is not None assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actor_task_definition_event"]["required_resources"] + == {} + ) + assert ( + event["actor_task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert ( + event["actor_task_definition_event"]["parent_task_id"] + == driver_task_id + ) + if event["actor_task_definition_event"]["task_attempt"] == 0: + actor_task_definition_received = True + else: + assert ( + event["actor_task_definition_event"]["task_attempt"] + == 1 + ) + actor_task_definition_retry_received = True + assert ( + event["actor_task_definition_event"]["language"] == "PYTHON" + ) + else: + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" + else: + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"][ + "taskId" + ] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "Actor.__init__" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 1.0} + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] + assert actor_task_id is not None + assert ( + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["moduleName"] == "__main__" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["className"] == "Actor" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["functionName"] - == "__init__" + == "task" ) assert ( - event["taskDefinitionEvent"]["taskFunc"][ + event["actorTaskDefinitionEvent"]["actorFunc"][ "pythonFunctionDescriptor" ]["functionHash"] is not None ) assert ( - event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" + event["actorTaskDefinitionEvent"]["actorTaskName"] + == "Actor.task" ) - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 1.0 - } assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id + event["actorTaskDefinitionEvent"]["requiredResources"] == {} ) assert ( - event["taskDefinitionEvent"]["jobId"] + event["actorTaskDefinitionEvent"]["jobId"] == driver_script_job_id ) - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" - elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": - actor_task_definition_received = True - actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] - assert actor_task_id is not None - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "Actor" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "task" - ) - assert ( - event["actorTaskDefinitionEvent"]["actorFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert ( - event["actorTaskDefinitionEvent"]["actorTaskName"] - == "Actor.task" - ) - assert event["actorTaskDefinitionEvent"]["requiredResources"] == {} - assert ( - event["actorTaskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert ( - event["actorTaskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - if event["actorTaskDefinitionEvent"]["taskAttempt"] == 0: - actor_task_definition_received = True + assert ( + event["actorTaskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + if event["actorTaskDefinitionEvent"]["taskAttempt"] == 0: + actor_task_definition_received = True + else: + assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 1 + actor_task_definition_retry_received = True + assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" else: - assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 1 - actor_task_definition_retry_received = True - assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" - else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received assert actor_task_definition_received @@ -850,22 +1437,41 @@ def validate_events(events: json): (actor_task_id, 0): expected_actor_task_states, (actor_task_id, 1): expected_actor_task_states_retry, } - expected_task_id_error_info_dict = { - (actor_creation_task_id, 0): { - "errorType": "TASK_EXECUTION_EXCEPTION", - "errorMessage": "CreationTaskError: Exception raised from an actor init method.", - }, - (actor_task_id, 0): { - "errorType": "ACTOR_DIED", - "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", - }, - (actor_task_id, 1): { - "errorType": "ACTOR_DIED", - "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", - }, - } + if preserve_proto_field_name: + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "error_type": "TASK_EXECUTION_EXCEPTION", + "error_message": "CreationTaskError: Exception raised from an actor init method.", + }, + (actor_task_id, 0): { + "error_type": "ACTOR_DIED", + "error_message": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + (actor_task_id, 1): { + "error_type": "ACTOR_DIED", + "error_message": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + } + else: + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "errorType": "TASK_EXECUTION_EXCEPTION", + "errorMessage": "CreationTaskError: Exception raised from an actor init method.", + }, + (actor_task_id, 0): { + "errorType": "ACTOR_DIED", + "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + (actor_task_id, 1): { + "errorType": "ACTOR_DIED", + "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", + }, + } check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) run_driver_script_and_wait_for_events( @@ -874,7 +1480,10 @@ def validate_events(events: json): @_cluster_with_aggregator_target def test_actor_creation_canceled( - self, ray_start_cluster_head_with_env_vars, httpserver + self, + ray_start_cluster_head_with_env_vars, + httpserver, + preserve_proto_field_name, ): script = """ import ray @@ -896,69 +1505,141 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events(events) + ) = get_job_ids_and_driver_script_task_ids_from_events( + events, preserve_proto_field_name + ) driver_task_definition_received = False actor_creation_task_definition_received = False for event in events: - if event["eventType"] == "TASK_DEFINITION_EVENT": - check_task_event_base_fields(event) + if preserve_proto_field_name: + if event["event_type"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) - if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": - driver_task_definition_received = True - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + if event["task_definition_event"]["task_type"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) + else: + assert ( + event["task_definition_event"]["task_type"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["task_definition_event"][ + "task_id" + ] + assert actor_creation_task_id is not None + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["module_name"] + == "__main__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["class_name"] + == "Actor" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_name"] + == "__init__" + ) + assert ( + event["task_definition_event"]["task_func"][ + "python_function_descriptor" + ]["function_hash"] + is not None + ) + assert ( + event["task_definition_event"]["task_name"] + == "Actor.__init__" + ) + assert event["task_definition_event"][ + "required_resources" + ] == {"CPU": 2.0} + assert ( + event["task_definition_event"]["parent_task_id"] + == driver_task_id + ) + assert ( + event["task_definition_event"]["job_id"] + == driver_script_job_id + ) + assert event["task_definition_event"]["task_attempt"] == 0 + assert ( + event["task_definition_event"]["language"] == "PYTHON" + ) else: - assert ( - event["taskDefinitionEvent"]["taskType"] - == "ACTOR_CREATION_TASK" - ) - actor_creation_task_definition_received = True - actor_creation_task_id = event["taskDefinitionEvent"]["taskId"] - assert actor_creation_task_id is not None - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["moduleName"] - == "__main__" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["className"] - == "Actor" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionName"] - == "__init__" - ) - assert ( - event["taskDefinitionEvent"]["taskFunc"][ - "pythonFunctionDescriptor" - ]["functionHash"] - is not None - ) - assert ( - event["taskDefinitionEvent"]["taskName"] == "Actor.__init__" - ) - assert event["taskDefinitionEvent"]["requiredResources"] == { - "CPU": 2.0 - } - assert ( - event["taskDefinitionEvent"]["parentTaskId"] - == driver_task_id - ) - assert ( - event["taskDefinitionEvent"]["jobId"] - == driver_script_job_id - ) - assert event["taskDefinitionEvent"]["taskAttempt"] == 0 - assert event["taskDefinitionEvent"]["language"] == "PYTHON" + assert event["event_type"] == "TASK_LIFECYCLE_EVENT" else: - assert event["eventType"] == "TASK_LIFECYCLE_EVENT" + if event["eventType"] == "TASK_DEFINITION_EVENT": + check_task_event_base_fields(event, preserve_proto_field_name) + + if event["taskDefinitionEvent"]["taskType"] == "DRIVER_TASK": + driver_task_definition_received = True + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + + else: + assert ( + event["taskDefinitionEvent"]["taskType"] + == "ACTOR_CREATION_TASK" + ) + actor_creation_task_definition_received = True + actor_creation_task_id = event["taskDefinitionEvent"][ + "taskId" + ] + assert actor_creation_task_id is not None + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["moduleName"] + == "__main__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["className"] + == "Actor" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionName"] + == "__init__" + ) + assert ( + event["taskDefinitionEvent"]["taskFunc"][ + "pythonFunctionDescriptor" + ]["functionHash"] + is not None + ) + assert ( + event["taskDefinitionEvent"]["taskName"] + == "Actor.__init__" + ) + assert event["taskDefinitionEvent"][ + "requiredResources" + ] == {"CPU": 2.0} + assert ( + event["taskDefinitionEvent"]["parentTaskId"] + == driver_task_id + ) + assert ( + event["taskDefinitionEvent"]["jobId"] + == driver_script_job_id + ) + assert event["taskDefinitionEvent"]["taskAttempt"] == 0 + assert event["taskDefinitionEvent"]["language"] == "PYTHON" + else: + assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received @@ -973,14 +1654,25 @@ def validate_events(events: json): (driver_task_id, 0): expected_driver_task_states, (actor_creation_task_id, 0): expected_actor_creation_task_states, } - expected_task_id_error_info_dict = { - (actor_creation_task_id, 0): { - "errorType": "WORKER_DIED", - "errorMessage": "", + if preserve_proto_field_name: + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "error_type": "WORKER_DIED", + "error_message": "", + } + } + else: + expected_task_id_error_info_dict = { + (actor_creation_task_id, 0): { + "errorType": "WORKER_DIED", + "errorMessage": "", + } } - } check_task_lifecycle_event_states_and_error_info( - events, expected_task_id_states_dict, expected_task_id_error_info_dict + events, + expected_task_id_states_dict, + expected_task_id_error_info_dict, + preserve_proto_field_name, ) run_driver_script_and_wait_for_events( From ee2869b71ab2bf37b34d2ee95cfdb10246dc38df Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 21 Oct 2025 10:58:09 -0700 Subject: [PATCH 05/13] move the test to large Signed-off-by: Mengjin Yan --- python/ray/tests/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 577af928df56..a9fd9c24a608 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -345,7 +345,6 @@ py_test_module_list( "test_placement_group_4.py", "test_placement_group_failover.py", "test_ray_debugger.py", - "test_ray_event_export_task_events.py", "test_ray_init.py", "test_ray_init_2.py", "test_ray_shutdown.py", @@ -916,6 +915,7 @@ py_test_module_list( "test_object_manager_fault_tolerance.py", "test_placement_group_3.py", "test_placement_group_5.py", + "test_ray_event_export_task_events.py", "test_raylet_fault_tolerance.py", "test_reconstruction.py", "test_reconstruction_2.py", From 855b9e440e2ed4220840d75b8e915eae2963a021 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 21 Oct 2025 11:19:09 -0700 Subject: [PATCH 06/13] fix cursor comment Signed-off-by: Mengjin Yan --- .../tests/test_ray_event_export_task_events.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 334c844c3e26..448f2a82a974 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -118,10 +118,11 @@ def check_task_lifecycle_event_states_and_error_info( for state in event["task_lifecycle_event"]["state_transitions"]: task_id_states_dict[(task_id, task_attempt)].add(state["state"]) - if "ray_error_info" in event["task_lifecycle_event"]: - task_id_error_info_dict[(task_id, task_attempt)] = event[ - "task_lifecycle_event" - ]["ray_error_info"] + + if "ray_error_info" in event["task_lifecycle_event"]: + task_id_error_info_dict[(task_id, task_attempt)] = event[ + "task_lifecycle_event" + ]["ray_error_info"] else: if event["eventType"] == "TASK_LIFECYCLE_EVENT": task_id = event["taskLifecycleEvent"]["taskId"] @@ -131,10 +132,11 @@ def check_task_lifecycle_event_states_and_error_info( for state in event["taskLifecycleEvent"]["stateTransitions"]: task_id_states_dict[(task_id, task_attempt)].add(state["state"]) - if "rayErrorInfo" in event["taskLifecycleEvent"]: - task_id_error_info_dict[(task_id, task_attempt)] = event[ - "taskLifecycleEvent" - ]["rayErrorInfo"] + + if "rayErrorInfo" in event["taskLifecycleEvent"]: + task_id_error_info_dict[(task_id, task_attempt)] = event[ + "taskLifecycleEvent" + ]["rayErrorInfo"] for ( expected_task_id_attempt, From 0024526cae86cca6290271dd307695b51b35348b Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 22 Oct 2025 14:19:17 -0700 Subject: [PATCH 07/13] improve the test logic Signed-off-by: Mengjin Yan --- python/ray/tests/BUILD.bazel | 2 +- .../test_ray_event_export_task_events.py | 106 ++++++++++-------- 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index a9fd9c24a608..097fb4ddb282 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -567,6 +567,7 @@ py_test_module_list( "test_placement_group_metrics.py", "test_protobuf_compatibility.py", "test_queue.py", + "test_ray_event_export_task_events.py", "test_raylet_output.py", "test_reconstruction_stress.py", "test_reconstruction_stress_spill.py", @@ -915,7 +916,6 @@ py_test_module_list( "test_object_manager_fault_tolerance.py", "test_placement_group_3.py", "test_placement_group_5.py", - "test_ray_event_export_task_events.py", "test_raylet_fault_tolerance.py", "test_reconstruction.py", "test_reconstruction_2.py", diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 448f2a82a974..ca7cc8c76bd2 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -1,13 +1,17 @@ import json import logging +import time from typing import Union import grpc import pytest import ray +import ray.dashboard.consts as dashboard_consts from ray._common.test_utils import wait_for_condition +from ray._private import ray_constants from ray._private.test_utils import find_free_port, run_string_as_driver_nonblocking +from ray._raylet import GcsClient logger = logging.getLogger(__name__) @@ -31,7 +35,7 @@ def httpserver_listen_address(): preserve_proto_field_name, { "env_vars": { - "RAY_task_events_report_interval_ms": 100, + "RAY_task_events_report_interval_ms": 50, "RAY_enable_core_worker_ray_event_to_aggregator": "1", "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR, "RAY_DASHBOARD_AGGREGATOR_AGENT_PRESERVE_PROTO_FIELD_NAME": ( @@ -46,13 +50,37 @@ def httpserver_listen_address(): ) -def wait_until_grpc_channel_ready(target: str, timeout: int = 10): - channel = grpc.insecure_channel(target) - try: - grpc.channel_ready_future(channel).result(timeout=timeout) - return True - except grpc.FutureTimeoutError: - return False +def wait_until_grpc_channel_ready( + gcs_address: str, node_ids: list[str], timeout: int = 5 +): + # get the grpc port + gcs_client = GcsClient(address=gcs_address) + + def get_dashboard_agent_address(node_id: str): + return gcs_client.internal_kv_get( + f"{ray.dashboard.consts.DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{node_id}".encode(), + namespace=ray_constants.KV_NAMESPACE_DASHBOARD, + timeout=dashboard_consts.GCS_RPC_TIMEOUT_SECONDS, + ) + + wait_for_condition( + lambda: all( + get_dashboard_agent_address(node_id) is not None for node_id in node_ids + ) + ) + grpc_ports = [ + json.loads(get_dashboard_agent_address(node_id))[2] for node_id in node_ids + ] + targets = [f"127.0.0.1:{grpc_port}" for grpc_port in grpc_ports] + + # wait for the dashboard agent grpc port to be ready + for target in targets: + channel = grpc.insecure_channel(target) + try: + grpc.channel_ready_future(channel).result(timeout=timeout) + except grpc.FutureTimeoutError: + return False + return True def get_job_ids_and_driver_script_task_ids_from_events( @@ -186,13 +214,20 @@ def get_and_validate_events(httpserver, validation_func): def run_driver_script_and_wait_for_events(script, httpserver, cluster, validation_func): httpserver.expect_request("/", method="POST").respond_with_data("", status=200) - wait_until_grpc_channel_ready( - "127.0.0.1:" + str(cluster.head_node.dashboard_agent_listen_port) - ) + node_ids = [node.node_id for node in cluster.list_all_nodes()] + assert wait_until_grpc_channel_ready(cluster.gcs_address, node_ids) + # The sleep is added here to give time for the worker grpc client to estabilish + # the connection to the aggregator agent. Due to the start up sequence, it could be + # possible that when the worker starts to connect to the dashboard agent grpc + # server, the server is not yet ready to accept the connection. The connection + # retry backoff strategy kicks and the connection won't be retried until the + # interval is reached. This sleep is a workaround to ensure the connections from + # the workers to the aggregator agent are estabilished with the backoff strategy + # before start the driver script. A longer term fix is to improve the start up + # sequence of the dashboard agent and the workers. + time.sleep(3) run_string_as_driver_nonblocking(script) - wait_for_condition( - lambda: get_and_validate_events(httpserver, validation_func), timeout=300 - ) + wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func)) class TestNormalTaskEvents: @@ -843,7 +878,6 @@ def validate_task_killed(events: json): wait_for_condition( lambda: get_and_validate_events(httpserver, validate_task_killed), - timeout=300, ) @@ -1148,7 +1182,7 @@ def validate_events(events: json): ) @_cluster_with_aggregator_target - def test_actor_creation_failed_with_actor_task_retry( + def test_actor_creation_failed( self, ray_start_cluster_head_with_env_vars, httpserver, @@ -1156,19 +1190,21 @@ def test_actor_creation_failed_with_actor_task_retry( ): script = """ import ray -ray.init() +import ray.util.state +from ray._common.test_utils import wait_for_condition +import time @ray.remote(num_cpus=1) class Actor: def __init__(self): raise Exception("actor creation error") - def task(self, arg): + def task(self): pass actor = Actor.remote() -obj = ray.put("test") -ray.get(actor.task.options(max_task_retries=1, retry_exceptions=[Exception]).remote(obj)) +wait_for_condition(lambda: ray.util.state.list_actors(filters=[("class_name", "=", "Actor")])[0]["state"] == "DEAD") +ray.get(actor.task.options().remote()) """ def validate_events(events: json): @@ -1182,7 +1218,6 @@ def validate_events(events: json): driver_task_definition_received = False actor_creation_task_definition_received = False actor_task_definition_received = False - actor_task_definition_retry_received = False for event in events: if preserve_proto_field_name: if event["event_type"] == "TASK_DEFINITION_EVENT": @@ -1249,6 +1284,7 @@ def validate_events(events: json): event["task_definition_event"]["language"] == "PYTHON" ) elif event["event_type"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True actor_task_id = event["actor_task_definition_event"]["task_id"] assert actor_task_id is not None assert ( @@ -1291,14 +1327,7 @@ def validate_events(events: json): event["actor_task_definition_event"]["parent_task_id"] == driver_task_id ) - if event["actor_task_definition_event"]["task_attempt"] == 0: - actor_task_definition_received = True - else: - assert ( - event["actor_task_definition_event"]["task_attempt"] - == 1 - ) - actor_task_definition_retry_received = True + assert event["actor_task_definition_event"]["task_attempt"] == 0 assert ( event["actor_task_definition_event"]["language"] == "PYTHON" ) @@ -1365,6 +1394,7 @@ def validate_events(events: json): assert event["taskDefinitionEvent"]["taskAttempt"] == 0 assert event["taskDefinitionEvent"]["language"] == "PYTHON" elif event["eventType"] == "ACTOR_TASK_DEFINITION_EVENT": + actor_task_definition_received = True actor_task_id = event["actorTaskDefinitionEvent"]["taskId"] assert actor_task_id is not None assert ( @@ -1406,18 +1436,13 @@ def validate_events(events: json): event["actorTaskDefinitionEvent"]["parentTaskId"] == driver_task_id ) - if event["actorTaskDefinitionEvent"]["taskAttempt"] == 0: - actor_task_definition_received = True - else: - assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 1 - actor_task_definition_retry_received = True + assert event["actorTaskDefinitionEvent"]["taskAttempt"] == 0 assert event["actorTaskDefinitionEvent"]["language"] == "PYTHON" else: assert event["eventType"] == "TASK_LIFECYCLE_EVENT" assert driver_task_definition_received assert actor_creation_task_definition_received assert actor_task_definition_received - assert actor_task_definition_retry_received expected_driver_task_states = {"RUNNING", "FINISHED"} expected_actor_creation_task_states = { @@ -1429,15 +1454,12 @@ def validate_events(events: json): expected_actor_task_states = { "PENDING_ARGS_AVAIL", "PENDING_NODE_ASSIGNMENT", - "SUBMITTED_TO_WORKER", "FAILED", } - expected_actor_task_states_retry = {"PENDING_ARGS_AVAIL", "FAILED"} expected_task_id_states_dict = { (driver_task_id, 0): expected_driver_task_states, (actor_creation_task_id, 0): expected_actor_creation_task_states, (actor_task_id, 0): expected_actor_task_states, - (actor_task_id, 1): expected_actor_task_states_retry, } if preserve_proto_field_name: expected_task_id_error_info_dict = { @@ -1449,10 +1471,6 @@ def validate_events(events: json): "error_type": "ACTOR_DIED", "error_message": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", }, - (actor_task_id, 1): { - "error_type": "ACTOR_DIED", - "error_message": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", - }, } else: expected_task_id_error_info_dict = { @@ -1464,10 +1482,6 @@ def validate_events(events: json): "errorType": "ACTOR_DIED", "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", }, - (actor_task_id, 1): { - "errorType": "ACTOR_DIED", - "errorMessage": "ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task", - }, } check_task_lifecycle_event_states_and_error_info( events, From 83544c0cb33bc68e7074148916f32ecec0ed5f07 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 22 Oct 2025 15:09:12 -0700 Subject: [PATCH 08/13] add followup issues Signed-off-by: Mengjin Yan --- python/ray/tests/test_ray_event_export_task_events.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index ca7cc8c76bd2..3f81a6856e1c 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -225,6 +225,7 @@ def run_driver_script_and_wait_for_events(script, httpserver, cluster, validatio # the workers to the aggregator agent are estabilished with the backoff strategy # before start the driver script. A longer term fix is to improve the start up # sequence of the dashboard agent and the workers. + # Followup issue: https://github.com/ray-project/ray/issues/58007 time.sleep(3) run_string_as_driver_nonblocking(script) wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func)) @@ -625,6 +626,10 @@ def validate_events(events: json): script, httpserver, ray_start_cluster_head_with_env_vars, validate_events ) + @pytest.mark.skipif( + True, + reason="Disabled till https://github.com/ray-project/ray/issues/58016 is fixed", + ) @_cluster_with_aggregator_target def test_task_failed_due_to_node_failure( self, From 7ef3bed14db5ef2b7b813a5d5dcd4f9a81ba073f Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 22 Oct 2025 17:54:39 -0700 Subject: [PATCH 09/13] move the test to medium size Signed-off-by: Mengjin Yan --- python/ray/tests/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 097fb4ddb282..963f4609bd0c 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -64,6 +64,7 @@ py_test_module_list( "test_multiprocessing_standalone.py", "test_node_label_scheduling_strategy.py", "test_object_spilling_2.py", + "test_ray_event_export_task_events.py", "test_reference_counting_2.py", "test_reference_counting_standalone.py", "test_runtime_env_agent.py", @@ -567,7 +568,6 @@ py_test_module_list( "test_placement_group_metrics.py", "test_protobuf_compatibility.py", "test_queue.py", - "test_ray_event_export_task_events.py", "test_raylet_output.py", "test_reconstruction_stress.py", "test_reconstruction_stress_spill.py", From a53c986886a43714190703826fd759fa2f87ad2f Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 23 Oct 2025 18:23:43 -0700 Subject: [PATCH 10/13] fix tests Signed-off-by: Mengjin Yan --- .../test_ray_event_export_task_events.py | 23 ++++++++----------- src/ray/core_worker/task_event_buffer.cc | 10 +++++--- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 3f81a6856e1c..e5229b625524 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -1,6 +1,6 @@ +import base64 import json import logging -import time from typing import Union import grpc @@ -35,7 +35,7 @@ def httpserver_listen_address(): preserve_proto_field_name, { "env_vars": { - "RAY_task_events_report_interval_ms": 50, + "RAY_task_events_report_interval_ms": 100, "RAY_enable_core_worker_ray_event_to_aggregator": "1", "RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR": _EVENT_AGGREGATOR_AGENT_TARGET_ADDR, "RAY_DASHBOARD_AGGREGATOR_AGENT_PRESERVE_PROTO_FIELD_NAME": ( @@ -86,7 +86,9 @@ def get_dashboard_agent_address(node_id: str): def get_job_ids_and_driver_script_task_ids_from_events( events: json, preserve_proto_field_name: bool ) -> tuple[Union[str, None], Union[str, None]]: - test_job_id = ray.get_runtime_context().get_job_id() + test_job_id = base64.b64encode( + ray.JobID.from_hex(ray.get_runtime_context().get_job_id()).binary() + ).decode() driver_script_job_id = None driver_task_id = None for event in events: @@ -215,18 +217,10 @@ def get_and_validate_events(httpserver, validation_func): def run_driver_script_and_wait_for_events(script, httpserver, cluster, validation_func): httpserver.expect_request("/", method="POST").respond_with_data("", status=200) node_ids = [node.node_id for node in cluster.list_all_nodes()] + # Here we wait for the dashboard agent grpc server to be ready before running the + # driver script. Ideally, the startup sequence should guarantee that. Created an + # issue to track this: https://github.com/ray-project/ray/issues/58007 assert wait_until_grpc_channel_ready(cluster.gcs_address, node_ids) - # The sleep is added here to give time for the worker grpc client to estabilish - # the connection to the aggregator agent. Due to the start up sequence, it could be - # possible that when the worker starts to connect to the dashboard agent grpc - # server, the server is not yet ready to accept the connection. The connection - # retry backoff strategy kicks and the connection won't be retried until the - # interval is reached. This sleep is a workaround to ensure the connections from - # the workers to the aggregator agent are estabilished with the backoff strategy - # before start the driver script. A longer term fix is to improve the start up - # sequence of the dashboard agent and the workers. - # Followup issue: https://github.com/ray-project/ray/issues/58007 - time.sleep(3) run_string_as_driver_nonblocking(script) wait_for_condition(lambda: get_and_validate_events(httpserver, validation_func)) @@ -1202,6 +1196,7 @@ def test_actor_creation_failed( @ray.remote(num_cpus=1) class Actor: def __init__(self): + time.sleep(1) raise Exception("actor creation error") def task(self): diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index 3cf80d71d0a9..dc0d4d14a85c 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -816,9 +816,13 @@ void TaskEventBufferImpl::SendRayEventsToAggregator( event_aggregator_grpc_in_progress_ = false; }; - rpc::events::AddEventsRequest request; - *request.mutable_events_data() = std::move(*data); - event_aggregator_client_->AddEvents(request, on_complete); + if (num_task_events_to_send == 0 && num_dropped_task_attempts_to_send == 0) { + event_aggregator_grpc_in_progress_ = false; + } else { + rpc::events::AddEventsRequest request; + *request.mutable_events_data() = std::move(*data); + event_aggregator_client_->AddEvents(request, on_complete); + } } void TaskEventBufferImpl::FlushEvents(bool forced) { From 18693ace36d20521d0fc21b50a2e9e4b84407603 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 24 Oct 2025 12:26:40 -0700 Subject: [PATCH 11/13] Update python/ray/tests/test_ray_event_export_task_events.py Co-authored-by: Jiajun Yao Signed-off-by: Mengjin Yan --- python/ray/tests/test_ray_event_export_task_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index e5229b625524..3d199babdc3f 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -83,7 +83,7 @@ def get_dashboard_agent_address(node_id: str): return True -def get_job_ids_and_driver_script_task_ids_from_events( +def get_job_id_and_driver_script_task_id_from_events( events: json, preserve_proto_field_name: bool ) -> tuple[Union[str, None], Union[str, None]]: test_job_id = base64.b64encode( From 88648c16cc9854428e10cb23045ca48481d26eb6 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 24 Oct 2025 12:29:07 -0700 Subject: [PATCH 12/13] Update python/ray/tests/test_ray_event_export_task_events.py Co-authored-by: Jiajun Yao Signed-off-by: Mengjin Yan --- python/ray/tests/test_ray_event_export_task_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 3d199babdc3f..434303dfc0a3 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -85,7 +85,7 @@ def get_dashboard_agent_address(node_id: str): def get_job_id_and_driver_script_task_id_from_events( events: json, preserve_proto_field_name: bool -) -> tuple[Union[str, None], Union[str, None]]: +) -> tuple[Optional[str], Optional[str]]: test_job_id = base64.b64encode( ray.JobID.from_hex(ray.get_runtime_context().get_job_id()).binary() ).decode() From de4d6640cef15b9f00ea80dc5982786b931b66ec Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 24 Oct 2025 16:06:39 -0700 Subject: [PATCH 13/13] fix lint failure Signed-off-by: Mengjin Yan --- .../ray/tests/test_ray_event_export_task_events.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/tests/test_ray_event_export_task_events.py b/python/ray/tests/test_ray_event_export_task_events.py index 434303dfc0a3..c7ea5f9949b2 100644 --- a/python/ray/tests/test_ray_event_export_task_events.py +++ b/python/ray/tests/test_ray_event_export_task_events.py @@ -1,7 +1,7 @@ import base64 import json import logging -from typing import Union +from typing import Optional import grpc import pytest @@ -247,7 +247,7 @@ def validate_events(events): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name ) @@ -431,7 +431,7 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name ) @@ -709,7 +709,7 @@ def validate_task_killed(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name ) @@ -909,7 +909,7 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name ) @@ -1211,7 +1211,7 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name ) @@ -1521,7 +1521,7 @@ def validate_events(events: json): ( driver_script_job_id, driver_task_id, - ) = get_job_ids_and_driver_script_task_ids_from_events( + ) = get_job_id_and_driver_script_task_id_from_events( events, preserve_proto_field_name )