From 00a9b030d7fa97ba88ba5362cee4e9ca215eb36f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 7 Oct 2022 18:13:30 -0700 Subject: [PATCH 1/5] Revert "Revert "[Cluster Events] Add basic job events. (#29164)" (#29195)" This reverts commit 75a0c491e8ee4009e8828081059cfc98543c4d51. --- dashboard/modules/event/tests/test_event.py | 85 +++++++++++++++---- dashboard/modules/job/job_agent.py | 4 +- dashboard/modules/job/job_head.py | 4 +- dashboard/modules/job/job_manager.py | 32 +++++-- .../modules/job/tests/test_job_manager.py | 8 +- python/ray/experimental/state/common.py | 1 + 6 files changed, 108 insertions(+), 26 deletions(-) diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py index 3adea4280df5..a072d926cac4 100644 --- a/dashboard/modules/event/tests/test_event.py +++ b/dashboard/modules/event/tests/test_event.py @@ -27,6 +27,8 @@ from ray.dashboard.modules.event.event_utils import ( monitor_events, ) +from ray.experimental.state.api import list_cluster_events +from ray.job_submission import JobSubmissionClient logger = logging.getLogger(__name__) @@ -303,21 +305,74 @@ async def _check_events(expect_events, read_events, timeout=10): # assert infeasible_event["source_type"] == "AUTOSCALER" -# def test_jobs_cluster_events(shutdown_only): -# ray.init() -# address = ray._private.worker._global_node.webui_url -# address = format_web_url(address) -# client = JobSubmissionClient(address) -# client.submit_job(entrypoint="ls") - -# def verify(): -# assert len(list_cluster_events()) == 3 -# for e in list_cluster_events(): -# e["source_type"] = "JOBS" -# return True - -# wait_for_condition(verify) -# print(list_cluster_events()) +def test_jobs_cluster_events(shutdown_only): + ray.init() + address = ray._private.worker._global_node.webui_url + address = format_web_url(address) + client = JobSubmissionClient(address) + submission_id = client.submit_job(entrypoint="ls") + + def verify(): + events = list_cluster_events() + print(events) + assert len(list_cluster_events()) == 2 + start_event = events[0] + completed_event = events[1] + + assert start_event["source_type"] == "JOBS" + assert f"Started a ray job {submission_id}" in start_event["message"] + assert start_event["severity"] == "INFO" + assert completed_event["source_type"] == "JOBS" + assert ( + f"Completed a ray job {submission_id} with a status SUCCEEDED." + == completed_event["message"] + ) + assert completed_event["severity"] == "INFO" + return True + + print("Test successful job run.") + wait_for_condition(verify) + + # Test the failure case. In this part, job fails because the runtime env + # creation fails. + submission_id = client.submit_job( + entrypoint="ls", + runtime_env={"working_dir": "s3://does_not_exist.zip"}, + ) + + def verify(): + events = list_cluster_events(detail=True) + failed_events = [] + + for e in events: + if ( + "submission_id" in e["custom_fields"] + and e["custom_fields"]["submission_id"] == submission_id + ): + failed_events.append(e) + + assert len(failed_events) == 2 + failed_start = failed_events[0] + failed_completed = failed_events[1] + + assert failed_start["source_type"] == "JOBS" + assert f"Started a ray job {submission_id}" in failed_start["message"] + assert failed_completed["source_type"] == "JOBS" + assert ( + f"Completed a ray job {submission_id} with a status FAILED." + in failed_completed["message"] + ) + print(failed_completed["message"]) + # TODO(sang): Reenable it. + # # Make sure the error message is included. 
+ # assert ( + # "An error occurred (ExpiredToken) when calling the " + # "GetObject operation:" in failed_completed["message"] + # ) + return True + + print("Test failed (runtime_env failure) job run.") + wait_for_condition(verify) if __name__ == "__main__": diff --git a/dashboard/modules/job/job_agent.py b/dashboard/modules/job/job_agent.py index b0a1eed68018..3374f44e6760 100644 --- a/dashboard/modules/job/job_agent.py +++ b/dashboard/modules/job/job_agent.py @@ -156,7 +156,9 @@ async def tail_job_logs(self, req: Request) -> Response: def get_job_manager(self): if not self._job_manager: - self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client) + self._job_manager = JobManager( + self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir + ) return self._job_manager async def run(self, server): diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index 1cd782c72214..9b862d77ae7f 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -357,7 +357,9 @@ async def tail_job_logs(self, req: Request) -> Response: async def run(self, server): if not self._job_manager: - self._job_manager = JobManager(self._dashboard_head.gcs_aio_client) + self._job_manager = JobManager( + self._dashboard_head.gcs_aio_client, self._dashboard_head.log_dir + ) @staticmethod def is_minimal_module(): diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index 16b07ecea68f..67389c4a56a8 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -30,6 +30,8 @@ from ray.exceptions import RuntimeEnvSetupError from ray.job_submission import JobStatus from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +from ray._private.event.event_logger import get_event_logger +from ray.core.generated.event_pb2 import Event logger = logging.getLogger(__name__) @@ -385,12 +387,16 @@ class JobManager: LOG_TAIL_SLEEP_S = 1 JOB_MONITOR_LOOP_PERIOD_S = 1 - def __init__(self, gcs_aio_client: GcsAioClient): + def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): self._gcs_aio_client = gcs_aio_client self._job_info_client = JobInfoStorageClient(gcs_aio_client) self._gcs_address = gcs_aio_client._channel._gcs_address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) + try: + self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir) + except Exception: + self.event_logger = None create_task(self._recover_running_jobs()) @@ -442,27 +448,39 @@ async def _monitor_job( except Exception as e: is_alive = False job_status = await self._job_info_client.get_status(job_id) + job_error_message = None if job_status.is_terminal(): # If the job is already in a terminal state, then the actor # exiting is expected. pass elif isinstance(e, RuntimeEnvSetupError): logger.info(f"Failed to set up runtime_env for job {job_id}.") + job_error_message = f"runtime_env setup failed: {e}" + job_status = JobStatus.FAILED await self._job_info_client.put_status( job_id, - JobStatus.FAILED, - message=f"runtime_env setup failed: {e}", + job_status, + message=job_error_message, ) else: logger.warning( f"Job supervisor for job {job_id} failed unexpectedly: {e}." 
) + job_error_message = f"Unexpected error occurred: {e}" + job_status = JobStatus.FAILED await self._job_info_client.put_status( job_id, - JobStatus.FAILED, - message=f"Unexpected error occurred: {e}", + job_status, + message=job_error_message, ) + # Log events + event_log = f"Completed a ray job {job_id} with a status {job_status}." + if job_error_message: + event_log += f" {job_error_message}" + if self.event_logger: + self.event_logger.info(event_log, submission_id=job_id) + # Kill the actor defensively to avoid leaking actors in unexpected error cases. if job_supervisor is not None: ray.kill(job_supervisor, no_restart=True) @@ -584,6 +602,10 @@ async def submit_job( node_id=ray.get_runtime_context().node_id, soft=False, ) + if self.event_logger: + self.event_logger.info( + f"Started a ray job {submission_id}.", submission_id=submission_id + ) supervisor = self._supervisor_actor_cls.options( lifetime="detached", name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id), diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 9ded996f1d18..e30b7c2f7c06 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -31,14 +31,14 @@ ["""ray start --head --resources={"TestResourceKey":123}"""], indirect=True, ) -async def test_submit_no_ray_address(call_ray_start): # noqa: F811 +async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811 """Test that a job script with an unspecified Ray address works.""" address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - job_manager = JobManager(gcs_aio_client) + job_manager = JobManager(gcs_aio_client, tmp_path) init_ray_no_address_script = """ import ray @@ -78,12 +78,12 @@ def shared_ray_instance(): @pytest.mark.asyncio @pytest.fixture -async def job_manager(shared_ray_instance): +async def job_manager(shared_ray_instance, tmp_path): address_info = shared_ray_instance gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - yield JobManager(gcs_aio_client) + yield JobManager(gcs_aio_client, tmp_path) def _driver_script_path(file_name: str) -> str: diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index 901c3e689aaa..999db5da47ad 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -448,6 +448,7 @@ class ClusterEventState(StateSchema): source_type: str = state_column(filterable=True) message: str = state_column(filterable=False) event_id: int = state_column(filterable=True) + custom_fields: dict = state_column(filterable=False, detail=True) @dataclass(init=True) From de20e716598c842c7818c8a527f0dc31f5626881 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 2 Nov 2022 01:16:32 -0700 Subject: [PATCH 2/5] Lint Signed-off-by: SangBin Cho --- dashboard/modules/event/tests/test_event.py | 1 - dashboard/modules/job/job_manager.py | 1 - 2 files changed, 2 deletions(-) diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py index 17a586cd6ab3..a3aea0da5f70 100644 --- a/dashboard/modules/event/tests/test_event.py +++ b/dashboard/modules/event/tests/test_event.py @@ -29,7 +29,6 @@ from ray.dashboard.modules.event.event_utils import ( monitor_events, ) -from ray.experimental.state.api import list_cluster_events from ray.job_submission import JobSubmissionClient 
logger = logging.getLogger(__name__) diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index 0ec00ae50562..d263f645b0c8 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -33,7 +33,6 @@ from ray.dashboard.modules.job.utils import file_tail_iterator from ray.exceptions import RuntimeEnvSetupError from ray.job_submission import JobStatus -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray._private.event.event_logger import get_event_logger from ray.core.generated.event_pb2 import Event From 6cc0320439cab0dba4a620418ff5cb4624e17c93 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 2 Nov 2022 06:43:05 -0700 Subject: [PATCH 3/5] Fix test failures. Signed-off-by: SangBin Cho --- dashboard/modules/event/tests/test_event.py | 19 +++++++-------- dashboard/modules/job/job_manager.py | 26 +++++++++++++++++---- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py index a3aea0da5f70..051f401f3608 100644 --- a/dashboard/modules/event/tests/test_event.py +++ b/dashboard/modules/event/tests/test_event.py @@ -30,6 +30,7 @@ monitor_events, ) from ray.job_submission import JobSubmissionClient +from pprint import pprint logger = logging.getLogger(__name__) @@ -399,7 +400,6 @@ def test_jobs_cluster_events(shutdown_only): def verify(): events = list_cluster_events() - print(events) assert len(list_cluster_events()) == 2 start_event = events[0] completed_event = events[1] @@ -417,12 +417,13 @@ def verify(): print("Test successful job run.") wait_for_condition(verify) + pprint(list_cluster_events()) # Test the failure case. In this part, job fails because the runtime env # creation fails. submission_id = client.submit_job( entrypoint="ls", - runtime_env={"working_dir": "s3://does_not_exist.zip"}, + runtime_env={"pip": ["nonexistent_dep"]}, ) def verify(): @@ -443,21 +444,19 @@ def verify(): assert failed_start["source_type"] == "JOBS" assert f"Started a ray job {submission_id}" in failed_start["message"] assert failed_completed["source_type"] == "JOBS" + assert failed_completed["severity"] == "ERROR" assert ( f"Completed a ray job {submission_id} with a status FAILED." in failed_completed["message"] ) - print(failed_completed["message"]) - # TODO(sang): Reenable it. - # # Make sure the error message is included. - # assert ( - # "An error occurred (ExpiredToken) when calling the " - # "GetObject operation:" in failed_completed["message"] - # ) + + # Make sure the error message is included. 
+ assert "ERROR: No matching distribution found" in failed_completed["message"] return True print("Test failed (runtime_env failure) job run.") - wait_for_condition(verify) + wait_for_condition(verify, timeout=30) + pprint(list_cluster_events()) if __name__ == "__main__": diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index d263f645b0c8..262fe54ffce7 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -417,6 +417,7 @@ def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): self._gcs_address = gcs_aio_client._channel._gcs_address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) + self.monitored_jobs = set() try: self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir) except Exception: @@ -452,6 +453,19 @@ async def _monitor_job( This is necessary because we need to handle the case where the JobSupervisor dies unexpectedly. """ + if job_id in self.monitored_jobs: + logger.debug(f"Job {job_id} is already being monitored.") + return + + self.monitored_jobs.add(job_id) + try: + await self._monitor_job_internal(job_id, job_supervisor) + finally: + self.monitored_jobs.remove(job_id) + + async def _monitor_job_internal( + self, job_id: str, job_supervisor: Optional[ActorHandle] = None + ): is_alive = True if job_supervisor is None: job_supervisor = self._get_actor_for_job(job_id) @@ -499,11 +513,15 @@ async def _monitor_job( ) # Log events - event_log = f"Completed a ray job {job_id} with a status {job_status}." - if job_error_message: - event_log += f" {job_error_message}" if self.event_logger: - self.event_logger.info(event_log, submission_id=job_id) + event_log = ( + f"Completed a ray job {job_id} with a status {job_status}." + ) + if job_error_message: + event_log += f" {job_error_message}" + self.event_logger.error(event_log, submission_id=job_id) + else: + self.event_logger.info(event_log, submission_id=job_id) # Kill the actor defensively to avoid leaking actors in unexpected error cases. if job_supervisor is not None: From 2bfb0af8bf4d1bcca0b34f52f2ae51080b166463 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 2 Nov 2022 08:14:47 -0700 Subject: [PATCH 4/5] Fix a tes failure. Signed-off-by: SangBin Cho --- dashboard/modules/job/tests/test_job_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 8a318563de77..04934adef670 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -36,14 +36,16 @@ ["""ray start --head"""], indirect=True, ) -async def test_get_scheduling_strategy(call_ray_start, monkeypatch): # noqa: F811 +async def test_get_scheduling_strategy( + call_ray_start, monkeypatch, tmp_path +): # noqa: F811 monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0") address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - job_manager = JobManager(gcs_aio_client) + job_manager = JobManager(gcs_aio_client, tmp_path) # If no head node id is found, we should use "DEFAULT". 
await gcs_aio_client.internal_kv_del( From 03d95d6e56c7c3cdbffe6b672e8667935e8448fd Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 2 Nov 2022 08:19:28 -0700 Subject: [PATCH 5/5] linting Signed-off-by: SangBin Cho --- dashboard/modules/job/tests/test_job_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index 04934adef670..5c3e1cbe3093 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -37,8 +37,8 @@ indirect=True, ) async def test_get_scheduling_strategy( - call_ray_start, monkeypatch, tmp_path -): # noqa: F811 + call_ray_start, monkeypatch, tmp_path # noqa: F811 +): monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0") address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient(
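
Taken together, these patches make the job manager emit two cluster events per submission: a "Started a ray job <submission_id>." event when the job is submitted, and a "Completed a ray job <submission_id> with a status <status>." event when it reaches a terminal state (INFO severity on success, ERROR on failure with the failure message appended). Both events have source_type "JOBS" and carry the submission id in the new custom_fields column. The snippet below is a minimal sketch of how a driver could observe these events once the patches are applied; it is not part of the patches themselves, mirrors the flow in test_jobs_cluster_events, and assumes a local cluster with the dashboard running (format_web_url and wait_for_condition are the test utilities used by these tests, not public API).

    import ray
    from ray._private.test_utils import format_web_url, wait_for_condition
    from ray.experimental.state.api import list_cluster_events
    from ray.job_submission import JobSubmissionClient

    ray.init()

    # Point a JobSubmissionClient at the local dashboard and submit a trivial job.
    address = format_web_url(ray._private.worker._global_node.webui_url)
    client = JobSubmissionClient(address)
    submission_id = client.submit_job(entrypoint="ls")

    def job_events_emitted():
        # Keep only the JOBS events tagged with this submission id via custom_fields.
        events = [
            e
            for e in list_cluster_events(detail=True)
            if e["source_type"] == "JOBS"
            and (e.get("custom_fields") or {}).get("submission_id") == submission_id
        ]
        # Expect the "Started a ray job ..." / "Completed a ray job ..." pair.
        return len(events) == 2

    wait_for_condition(job_events_emitted, timeout=30)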