diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl
index fd844d66d79e..31d5f06b9560 100644
--- a/bazel/ray_deps_setup.bzl
+++ b/bazel/ray_deps_setup.bzl
@@ -181,8 +181,8 @@ def ray_deps_setup():
 
     auto_http_archive(
         name = "com_google_googletest",
-        url = "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
-        sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
+        url = "https://github.com/google/googletest/archive/refs/tags/release-1.12.1.tar.gz",
+        sha256 = "81964fe578e9bd7c94dfdb09c8e4d6e6759e19967e397dbea48d1c10e45d0df2",
     )
 
     auto_http_archive(
diff --git a/dashboard/modules/event/tests/test_event.py b/dashboard/modules/event/tests/test_event.py
index 5c1d21e55aa4..cebd4ffa4e11 100644
--- a/dashboard/modules/event/tests/test_event.py
+++ b/dashboard/modules/event/tests/test_event.py
@@ -31,6 +31,8 @@
 from ray.dashboard.modules.event.event_utils import (
     monitor_events,
 )
+from ray.job_submission import JobSubmissionClient
+from pprint import pprint
 
 logger = logging.getLogger(__name__)
 
@@ -381,21 +383,72 @@ def verify():
     cluster.shutdown()
 
 
-# def test_jobs_cluster_events(shutdown_only):
-#     ray.init()
-#     address = ray._private.worker._global_node.webui_url
-#     address = format_web_url(address)
-#     client = JobSubmissionClient(address)
-#     client.submit_job(entrypoint="ls")
+def test_jobs_cluster_events(shutdown_only):
+    ray.init()
+    address = ray._private.worker._global_node.webui_url
+    address = format_web_url(address)
+    client = JobSubmissionClient(address)
+    submission_id = client.submit_job(entrypoint="ls")
+
+    def verify():
+        events = list_cluster_events()
+        assert len(list_cluster_events()) == 2
+        start_event = events[0]
+        completed_event = events[1]
+
+        assert start_event["source_type"] == "JOBS"
+        assert f"Started a ray job {submission_id}" in start_event["message"]
+        assert start_event["severity"] == "INFO"
+        assert completed_event["source_type"] == "JOBS"
+        assert (
+            f"Completed a ray job {submission_id} with a status SUCCEEDED."
+            == completed_event["message"]
+        )
+        assert completed_event["severity"] == "INFO"
+        return True
+
+    print("Test successful job run.")
+    wait_for_condition(verify)
+    pprint(list_cluster_events())
+
+    # Test the failure case. In this part, job fails because the runtime env
+    # creation fails.
+    submission_id = client.submit_job(
+        entrypoint="ls",
+        runtime_env={"pip": ["nonexistent_dep"]},
+    )
+
+    def verify():
+        events = list_cluster_events(detail=True)
+        failed_events = []
+
+        for e in events:
+            if (
+                "submission_id" in e["custom_fields"]
+                and e["custom_fields"]["submission_id"] == submission_id
+            ):
+                failed_events.append(e)
+
+        assert len(failed_events) == 2
+        failed_start = failed_events[0]
+        failed_completed = failed_events[1]
+
+        assert failed_start["source_type"] == "JOBS"
+        assert f"Started a ray job {submission_id}" in failed_start["message"]
+        assert failed_completed["source_type"] == "JOBS"
+        assert failed_completed["severity"] == "ERROR"
+        assert (
+            f"Completed a ray job {submission_id} with a status FAILED."
+            in failed_completed["message"]
+        )
 
-#     def verify():
-#         assert len(list_cluster_events()) == 3
-#         for e in list_cluster_events():
-#             e["source_type"] = "JOBS"
-#         return True
+        # Make sure the error message is included.
+        assert "ERROR: No matching distribution found" in failed_completed["message"]
+        return True
 
-#     wait_for_condition(verify)
-#     print(list_cluster_events())
+    print("Test failed (runtime_env failure) job run.")
+    wait_for_condition(verify, timeout=30)
+    pprint(list_cluster_events())
 
 
 if __name__ == "__main__":
diff --git a/dashboard/modules/job/job_agent.py b/dashboard/modules/job/job_agent.py
index 47810808f7a3..a35f20bb1549 100644
--- a/dashboard/modules/job/job_agent.py
+++ b/dashboard/modules/job/job_agent.py
@@ -158,7 +158,9 @@ async def tail_job_logs(self, req: Request) -> Response:
 
     def get_job_manager(self):
         if not self._job_manager:
-            self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client)
+            self._job_manager = JobManager(
+                self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir
+            )
         return self._job_manager
 
     async def run(self, server):
diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py
index 79b8dbefbe93..262fe54ffce7 100644
--- a/dashboard/modules/job/job_manager.py
+++ b/dashboard/modules/job/job_manager.py
@@ -33,7 +33,8 @@
 from ray.dashboard.modules.job.utils import file_tail_iterator
 from ray.exceptions import RuntimeEnvSetupError
 from ray.job_submission import JobStatus
-
+from ray._private.event.event_logger import get_event_logger
+from ray.core.generated.event_pb2 import Event
 
 logger = logging.getLogger(__name__)
 
@@ -410,12 +411,17 @@ class JobManager:
     LOG_TAIL_SLEEP_S = 1
     JOB_MONITOR_LOOP_PERIOD_S = 1
 
-    def __init__(self, gcs_aio_client: GcsAioClient):
+    def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
         self._gcs_aio_client = gcs_aio_client
         self._job_info_client = JobInfoStorageClient(gcs_aio_client)
         self._gcs_address = gcs_aio_client._channel._gcs_address
         self._log_client = JobLogStorageClient()
         self._supervisor_actor_cls = ray.remote(JobSupervisor)
+        self.monitored_jobs = set()
+        try:
+            self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
+        except Exception:
+            self.event_logger = None
 
         create_task(self._recover_running_jobs())
 
@@ -447,6 +453,19 @@ async def _monitor_job(
         This is necessary because we need to handle the case where the
         JobSupervisor dies unexpectedly.
         """
+        if job_id in self.monitored_jobs:
+            logger.debug(f"Job {job_id} is already being monitored.")
+            return
+
+        self.monitored_jobs.add(job_id)
+        try:
+            await self._monitor_job_internal(job_id, job_supervisor)
+        finally:
+            self.monitored_jobs.remove(job_id)
+
+    async def _monitor_job_internal(
+        self, job_id: str, job_supervisor: Optional[ActorHandle] = None
+    ):
         is_alive = True
         if job_supervisor is None:
             job_supervisor = self._get_actor_for_job(job_id)
@@ -467,27 +486,43 @@ async def _monitor_job(
             except Exception as e:
                 is_alive = False
                 job_status = await self._job_info_client.get_status(job_id)
+                job_error_message = None
                 if job_status.is_terminal():
                     # If the job is already in a terminal state, then the actor
                     # exiting is expected.
                     pass
                 elif isinstance(e, RuntimeEnvSetupError):
                     logger.info(f"Failed to set up runtime_env for job {job_id}.")
+                    job_error_message = f"runtime_env setup failed: {e}"
+                    job_status = JobStatus.FAILED
                     await self._job_info_client.put_status(
                         job_id,
-                        JobStatus.FAILED,
-                        message=f"runtime_env setup failed: {e}",
+                        job_status,
+                        message=job_error_message,
                     )
                 else:
                     logger.warning(
                         f"Job supervisor for job {job_id} failed unexpectedly: {e}."
                     )
+                    job_error_message = f"Unexpected error occurred: {e}"
+                    job_status = JobStatus.FAILED
                     await self._job_info_client.put_status(
                         job_id,
-                        JobStatus.FAILED,
-                        message=f"Unexpected error occurred: {e}",
+                        job_status,
+                        message=job_error_message,
                     )
 
+                # Log events
+                if self.event_logger:
+                    event_log = (
+                        f"Completed a ray job {job_id} with a status {job_status}."
+                    )
+                    if job_error_message:
+                        event_log += f" {job_error_message}"
+                        self.event_logger.error(event_log, submission_id=job_id)
+                    else:
+                        self.event_logger.info(event_log, submission_id=job_id)
+
                 # Kill the actor defensively to avoid leaking actors in unexpected error cases.
                 if job_supervisor is not None:
                     ray.kill(job_supervisor, no_restart=True)
@@ -631,6 +666,10 @@ async def submit_job(
         # up.
         try:
             scheduling_strategy = await self._get_scheduling_strategy()
+            if self.event_logger:
+                self.event_logger.info(
+                    f"Started a ray job {submission_id}.", submission_id=submission_id
+                )
             supervisor = self._supervisor_actor_cls.options(
                 lifetime="detached",
                 name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py
index 68b5428a4f38..5c3e1cbe3093 100644
--- a/dashboard/modules/job/tests/test_job_manager.py
+++ b/dashboard/modules/job/tests/test_job_manager.py
@@ -36,14 +36,16 @@
     ["""ray start --head"""],
     indirect=True,
 )
-async def test_get_scheduling_strategy(call_ray_start, monkeypatch):  # noqa: F811
+async def test_get_scheduling_strategy(
+    call_ray_start, monkeypatch, tmp_path  # noqa: F811
+):
     monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0")
     address_info = ray.init(address=call_ray_start)
     gcs_aio_client = GcsAioClient(
         address=address_info["gcs_address"], nums_reconnect_retry=0
     )
-    job_manager = JobManager(gcs_aio_client)
+    job_manager = JobManager(gcs_aio_client, tmp_path)
 
     # If no head node id is found, we should use "DEFAULT".
     await gcs_aio_client.internal_kv_del(
@@ -73,14 +75,14 @@ async def test_get_scheduling_strategy(call_ray_start, monkeypatch):  # noqa: F8
     ["""ray start --head --resources={"TestResourceKey":123}"""],
     indirect=True,
 )
-async def test_submit_no_ray_address(call_ray_start):  # noqa: F811
+async def test_submit_no_ray_address(call_ray_start, tmp_path):  # noqa: F811
     """Test that a job script with an unspecified Ray address works."""
     address_info = ray.init(address=call_ray_start)
     gcs_aio_client = GcsAioClient(
         address=address_info["gcs_address"], nums_reconnect_retry=0
     )
-    job_manager = JobManager(gcs_aio_client)
+    job_manager = JobManager(gcs_aio_client, tmp_path)
 
     init_ray_no_address_script = """
 import ray
@@ -120,12 +122,12 @@ def shared_ray_instance():
 
 @pytest.mark.asyncio
 @pytest.fixture
-async def job_manager(shared_ray_instance):
+async def job_manager(shared_ray_instance, tmp_path):
     address_info = shared_ray_instance
     gcs_aio_client = GcsAioClient(
         address=address_info["gcs_address"], nums_reconnect_retry=0
     )
-    yield JobManager(gcs_aio_client)
+    yield JobManager(gcs_aio_client, tmp_path)
 
 
 def _driver_script_path(file_name: str) -> str:
diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py
index 3201c8742b7e..e867dc62a220 100644
--- a/python/ray/experimental/state/common.py
+++ b/python/ray/experimental/state/common.py
@@ -461,6 +461,7 @@ class ClusterEventState(StateSchema):
     source_type: str = state_column(filterable=True)
     message: str = state_column(filterable=False)
     event_id: int = state_column(filterable=True)
+    custom_fields: dict = state_column(filterable=False, detail=True)
 
 
 @dataclass(init=True)
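A minimal usage sketch of the JOBS cluster events this diff adds, not part of the patch itself. It assumes a local Ray cluster with the dashboard running, that `list_cluster_events`, `format_web_url`, and `wait_for_condition` are importable from `ray.experimental.state.api` and `ray._private.test_utils` as in the tests above, and `job_events_for_submission` is a hypothetical helper introduced here only for illustration.

```python
# Sketch only: observe the new JOBS cluster events end to end.
import ray
from ray.job_submission import JobSubmissionClient
from ray.experimental.state.api import list_cluster_events
from ray._private.test_utils import format_web_url, wait_for_condition

ray.init()
address = format_web_url(ray._private.worker._global_node.webui_url)
client = JobSubmissionClient(address)
submission_id = client.submit_job(entrypoint="ls")


def job_events_for_submission():
    # submit_job() now emits a "Started a ray job ..." INFO event and the job
    # monitor emits a "Completed a ray job ... with a status ..." event; both
    # carry the submission id in the new custom_fields column (detail=True).
    return [
        e
        for e in list_cluster_events(detail=True)
        if e["source_type"] == "JOBS"
        and (e["custom_fields"] or {}).get("submission_id") == submission_id
    ]


# Two JOBS events (started + completed) should eventually be visible.
wait_for_condition(lambda: len(job_events_for_submission()) == 2, timeout=30)
```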