diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py index a072d926cac4..3adea4280df5 100644 --- a/dashboard/modules/event/tests/test_event.py +++ b/dashboard/modules/event/tests/test_event.py @@ -27,8 +27,6 @@ from ray.dashboard.modules.event.event_utils import ( monitor_events, ) -from ray.experimental.state.api import list_cluster_events -from ray.job_submission import JobSubmissionClient logger = logging.getLogger(__name__) @@ -305,74 +303,21 @@ async def _check_events(expect_events, read_events, timeout=10): # assert infeasible_event["source_type"] == "AUTOSCALER" -def test_jobs_cluster_events(shutdown_only): - ray.init() - address = ray._private.worker._global_node.webui_url - address = format_web_url(address) - client = JobSubmissionClient(address) - submission_id = client.submit_job(entrypoint="ls") - - def verify(): - events = list_cluster_events() - print(events) - assert len(list_cluster_events()) == 2 - start_event = events[0] - completed_event = events[1] - - assert start_event["source_type"] == "JOBS" - assert f"Started a ray job {submission_id}" in start_event["message"] - assert start_event["severity"] == "INFO" - assert completed_event["source_type"] == "JOBS" - assert ( - f"Completed a ray job {submission_id} with a status SUCCEEDED." - == completed_event["message"] - ) - assert completed_event["severity"] == "INFO" - return True - - print("Test successful job run.") - wait_for_condition(verify) - - # Test the failure case. In this part, job fails because the runtime env - # creation fails. - submission_id = client.submit_job( - entrypoint="ls", - runtime_env={"working_dir": "s3://does_not_exist.zip"}, - ) - - def verify(): - events = list_cluster_events(detail=True) - failed_events = [] - - for e in events: - if ( - "submission_id" in e["custom_fields"] - and e["custom_fields"]["submission_id"] == submission_id - ): - failed_events.append(e) - - assert len(failed_events) == 2 - failed_start = failed_events[0] - failed_completed = failed_events[1] - - assert failed_start["source_type"] == "JOBS" - assert f"Started a ray job {submission_id}" in failed_start["message"] - assert failed_completed["source_type"] == "JOBS" - assert ( - f"Completed a ray job {submission_id} with a status FAILED." - in failed_completed["message"] - ) - print(failed_completed["message"]) - # TODO(sang): Reenable it. - # # Make sure the error message is included. - # assert ( - # "An error occurred (ExpiredToken) when calling the " - # "GetObject operation:" in failed_completed["message"] - # ) - return True - - print("Test failed (runtime_env failure) job run.") - wait_for_condition(verify) +# def test_jobs_cluster_events(shutdown_only): +# ray.init() +# address = ray._private.worker._global_node.webui_url +# address = format_web_url(address) +# client = JobSubmissionClient(address) +# client.submit_job(entrypoint="ls") + +# def verify(): +# assert len(list_cluster_events()) == 3 +# for e in list_cluster_events(): +# e["source_type"] = "JOBS" +# return True + +# wait_for_condition(verify) +# print(list_cluster_events()) if __name__ == "__main__": diff --git a/dashboard/modules/job/job_agent.py b/dashboard/modules/job/job_agent.py index 3374f44e6760..b0a1eed68018 100644 --- a/dashboard/modules/job/job_agent.py +++ b/dashboard/modules/job/job_agent.py @@ -156,9 +156,7 @@ async def tail_job_logs(self, req: Request) -> Response: def get_job_manager(self): if not self._job_manager: - self._job_manager = JobManager( - self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir - ) + self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client) return self._job_manager async def run(self, server): diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index 9b862d77ae7f..1cd782c72214 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -357,9 +357,7 @@ async def tail_job_logs(self, req: Request) -> Response: async def run(self, server): if not self._job_manager: - self._job_manager = JobManager( - self._dashboard_head.gcs_aio_client, self._dashboard_head.log_dir - ) + self._job_manager = JobManager(self._dashboard_head.gcs_aio_client) @staticmethod def is_minimal_module(): diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index 67389c4a56a8..16b07ecea68f 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -30,8 +30,6 @@ from ray.exceptions import RuntimeEnvSetupError from ray.job_submission import JobStatus from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from ray._private.event.event_logger import get_event_logger -from ray.core.generated.event_pb2 import Event logger = logging.getLogger(__name__) @@ -387,16 +385,12 @@ class JobManager: LOG_TAIL_SLEEP_S = 1 JOB_MONITOR_LOOP_PERIOD_S = 1 - def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): + def __init__(self, gcs_aio_client: GcsAioClient): self._gcs_aio_client = gcs_aio_client self._job_info_client = JobInfoStorageClient(gcs_aio_client) self._gcs_address = gcs_aio_client._channel._gcs_address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) - try: - self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir) - except Exception: - self.event_logger = None create_task(self._recover_running_jobs()) @@ -448,39 +442,27 @@ async def _monitor_job( except Exception as e: is_alive = False job_status = await self._job_info_client.get_status(job_id) - job_error_message = None if job_status.is_terminal(): # If the job is already in a terminal state, then the actor # exiting is expected. pass elif isinstance(e, RuntimeEnvSetupError): logger.info(f"Failed to set up runtime_env for job {job_id}.") - job_error_message = f"runtime_env setup failed: {e}" - job_status = JobStatus.FAILED await self._job_info_client.put_status( job_id, - job_status, - message=job_error_message, + JobStatus.FAILED, + message=f"runtime_env setup failed: {e}", ) else: logger.warning( f"Job supervisor for job {job_id} failed unexpectedly: {e}." ) - job_error_message = f"Unexpected error occurred: {e}" - job_status = JobStatus.FAILED await self._job_info_client.put_status( job_id, - job_status, - message=job_error_message, + JobStatus.FAILED, + message=f"Unexpected error occurred: {e}", ) - # Log events - event_log = f"Completed a ray job {job_id} with a status {job_status}." - if job_error_message: - event_log += f" {job_error_message}" - if self.event_logger: - self.event_logger.info(event_log, submission_id=job_id) - # Kill the actor defensively to avoid leaking actors in unexpected error cases. if job_supervisor is not None: ray.kill(job_supervisor, no_restart=True) @@ -602,10 +584,6 @@ async def submit_job( node_id=ray.get_runtime_context().node_id, soft=False, ) - if self.event_logger: - self.event_logger.info( - f"Started a ray job {submission_id}.", submission_id=submission_id - ) supervisor = self._supervisor_actor_cls.options( lifetime="detached", name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id), diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index e30b7c2f7c06..9ded996f1d18 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -31,14 +31,14 @@ ["""ray start --head --resources={"TestResourceKey":123}"""], indirect=True, ) -async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811 +async def test_submit_no_ray_address(call_ray_start): # noqa: F811 """Test that a job script with an unspecified Ray address works.""" address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - job_manager = JobManager(gcs_aio_client, tmp_path) + job_manager = JobManager(gcs_aio_client) init_ray_no_address_script = """ import ray @@ -78,12 +78,12 @@ def shared_ray_instance(): @pytest.mark.asyncio @pytest.fixture -async def job_manager(shared_ray_instance, tmp_path): +async def job_manager(shared_ray_instance): address_info = shared_ray_instance gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - yield JobManager(gcs_aio_client, tmp_path) + yield JobManager(gcs_aio_client) def _driver_script_path(file_name: str) -> str: diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index 999db5da47ad..901c3e689aaa 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -448,7 +448,6 @@ class ClusterEventState(StateSchema): source_type: str = state_column(filterable=True) message: str = state_column(filterable=False) event_id: int = state_column(filterable=True) - custom_fields: dict = state_column(filterable=False, detail=True) @dataclass(init=True)