Skip to content

Commit 0540b1f

Browse files
authored
Revert "Revert "[Cluster Events] Add basic job events. (ray-project#29164)" (#29… (ray-project#29196)
This reverts the revert PR and fixes the test failures. It also fixes a bug in the `_monitor_job` API: the monitor loop could previously be started twice for the same job, which broke event reporting — at the end of a monitored job we record a "job completed" event, so the same completion event could be reported twice without this fix.
1 parent 9a020a2 commit 0540b1f

File tree

5 files changed

+123
-26
lines changed

5 files changed

+123
-26
lines changed

dashboard/modules/event/tests/test_event.py

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from ray.dashboard.modules.event.event_utils import (
3232
monitor_events,
3333
)
34+
from ray.job_submission import JobSubmissionClient
35+
from pprint import pprint
3436

3537
logger = logging.getLogger(__name__)
3638

@@ -381,21 +383,72 @@ def verify():
381383
cluster.shutdown()
382384

383385

384-
def test_jobs_cluster_events(shutdown_only):
    """End-to-end check that job submission emits cluster events.

    Submits one successful job and one job whose runtime_env setup fails,
    then verifies the started/completed events (source_type, message,
    severity) recorded for each submission.
    """
    ray.init()
    address = ray._private.worker._global_node.webui_url
    address = format_web_url(address)
    client = JobSubmissionClient(address)
    submission_id = client.submit_job(entrypoint="ls")

    def verify():
        events = list_cluster_events()
        # Assert on the snapshot we already fetched; calling
        # list_cluster_events() again could race with newly-recorded
        # events and disagree with the list we index below.
        assert len(events) == 2
        start_event = events[0]
        completed_event = events[1]

        assert start_event["source_type"] == "JOBS"
        assert f"Started a ray job {submission_id}" in start_event["message"]
        assert start_event["severity"] == "INFO"
        assert completed_event["source_type"] == "JOBS"
        assert (
            f"Completed a ray job {submission_id} with a status SUCCEEDED."
            == completed_event["message"]
        )
        assert completed_event["severity"] == "INFO"
        return True

    print("Test successful job run.")
    wait_for_condition(verify)
    pprint(list_cluster_events())

    # Test the failure case. In this part, job fails because the runtime env
    # creation fails.
    submission_id = client.submit_job(
        entrypoint="ls",
        runtime_env={"pip": ["nonexistent_dep"]},
    )

    def verify():
        events = list_cluster_events(detail=True)
        failed_events = []

        # Only the events tagged with this submission's id are relevant;
        # earlier events from the successful job are still in the list.
        for e in events:
            if (
                "submission_id" in e["custom_fields"]
                and e["custom_fields"]["submission_id"] == submission_id
            ):
                failed_events.append(e)

        assert len(failed_events) == 2
        failed_start = failed_events[0]
        failed_completed = failed_events[1]

        assert failed_start["source_type"] == "JOBS"
        assert f"Started a ray job {submission_id}" in failed_start["message"]
        assert failed_completed["source_type"] == "JOBS"
        assert failed_completed["severity"] == "ERROR"
        assert (
            f"Completed a ray job {submission_id} with a status FAILED."
            in failed_completed["message"]
        )

        # Make sure the error message is included.
        assert "ERROR: No matching distribution found" in failed_completed["message"]
        return True

    print("Test failed (runtime_env failure) job run.")
    wait_for_condition(verify, timeout=30)
    pprint(list_cluster_events())
399452

400453

401454
if __name__ == "__main__":

dashboard/modules/job/job_agent.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ async def tail_job_logs(self, req: Request) -> Response:
158158

159159
def get_job_manager(self):
    """Lazily construct and cache the JobManager for this agent."""
    if self._job_manager:
        return self._job_manager
    agent = self._dashboard_agent
    self._job_manager = JobManager(agent.gcs_aio_client, agent.log_dir)
    return self._job_manager
163165

164166
async def run(self, server):

dashboard/modules/job/job_manager.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
from ray.dashboard.modules.job.utils import file_tail_iterator
3434
from ray.exceptions import RuntimeEnvSetupError
3535
from ray.job_submission import JobStatus
36-
36+
from ray._private.event.event_logger import get_event_logger
37+
from ray.core.generated.event_pb2 import Event
3738

3839
logger = logging.getLogger(__name__)
3940

@@ -410,12 +411,17 @@ class JobManager:
410411
LOG_TAIL_SLEEP_S = 1
411412
JOB_MONITOR_LOOP_PERIOD_S = 1
412413

413-
def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
    """Create a JobManager.

    Args:
        gcs_aio_client: Async GCS client used to read/write job info.
        logs_dir: Directory where the cluster-event logger writes.
    """
    self._gcs_aio_client = gcs_aio_client
    self._job_info_client = JobInfoStorageClient(gcs_aio_client)
    self._gcs_address = gcs_aio_client._channel._gcs_address
    self._log_client = JobLogStorageClient()
    self._supervisor_actor_cls = ray.remote(JobSupervisor)
    # Job ids currently being monitored; guards against starting a second
    # monitor loop for the same job, which would emit duplicate
    # job-completed events.
    self.monitored_jobs = set()
    try:
        self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
    except Exception:
        # Event logging is best-effort: job submission must keep working
        # even when the event logger cannot be set up, but surface the
        # failure instead of swallowing it silently.
        logger.exception("Failed to initialize the event logger.")
        self.event_logger = None

    create_task(self._recover_running_jobs())
421427

@@ -447,6 +453,19 @@ async def _monitor_job(
447453
This is necessary because we need to handle the case where the
448454
JobSupervisor dies unexpectedly.
449455
"""
456+
if job_id in self.monitored_jobs:
457+
logger.debug(f"Job {job_id} is already being monitored.")
458+
return
459+
460+
self.monitored_jobs.add(job_id)
461+
try:
462+
await self._monitor_job_internal(job_id, job_supervisor)
463+
finally:
464+
self.monitored_jobs.remove(job_id)
465+
466+
async def _monitor_job_internal(
467+
self, job_id: str, job_supervisor: Optional[ActorHandle] = None
468+
):
450469
is_alive = True
451470
if job_supervisor is None:
452471
job_supervisor = self._get_actor_for_job(job_id)
@@ -467,27 +486,43 @@ async def _monitor_job(
467486
except Exception as e:
468487
is_alive = False
469488
job_status = await self._job_info_client.get_status(job_id)
489+
job_error_message = None
470490
if job_status.is_terminal():
471491
# If the job is already in a terminal state, then the actor
472492
# exiting is expected.
473493
pass
474494
elif isinstance(e, RuntimeEnvSetupError):
475495
logger.info(f"Failed to set up runtime_env for job {job_id}.")
496+
job_error_message = f"runtime_env setup failed: {e}"
497+
job_status = JobStatus.FAILED
476498
await self._job_info_client.put_status(
477499
job_id,
478-
JobStatus.FAILED,
479-
message=f"runtime_env setup failed: {e}",
500+
job_status,
501+
message=job_error_message,
480502
)
481503
else:
482504
logger.warning(
483505
f"Job supervisor for job {job_id} failed unexpectedly: {e}."
484506
)
507+
job_error_message = f"Unexpected error occurred: {e}"
508+
job_status = JobStatus.FAILED
485509
await self._job_info_client.put_status(
486510
job_id,
487-
JobStatus.FAILED,
488-
message=f"Unexpected error occurred: {e}",
511+
job_status,
512+
message=job_error_message,
489513
)
490514

515+
# Log events
516+
if self.event_logger:
517+
event_log = (
518+
f"Completed a ray job {job_id} with a status {job_status}."
519+
)
520+
if job_error_message:
521+
event_log += f" {job_error_message}"
522+
self.event_logger.error(event_log, submission_id=job_id)
523+
else:
524+
self.event_logger.info(event_log, submission_id=job_id)
525+
491526
# Kill the actor defensively to avoid leaking actors in unexpected error cases.
492527
if job_supervisor is not None:
493528
ray.kill(job_supervisor, no_restart=True)
@@ -631,6 +666,10 @@ async def submit_job(
631666
# up.
632667
try:
633668
scheduling_strategy = await self._get_scheduling_strategy()
669+
if self.event_logger:
670+
self.event_logger.info(
671+
f"Started a ray job {submission_id}.", submission_id=submission_id
672+
)
634673
supervisor = self._supervisor_actor_cls.options(
635674
lifetime="detached",
636675
name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),

dashboard/modules/job/tests/test_job_manager.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@
3636
["""ray start --head"""],
3737
indirect=True,
3838
)
39-
async def test_get_scheduling_strategy(call_ray_start, monkeypatch): # noqa: F811
39+
async def test_get_scheduling_strategy(
40+
call_ray_start, monkeypatch, tmp_path # noqa: F811
41+
):
4042
monkeypatch.setenv(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0")
4143
address_info = ray.init(address=call_ray_start)
4244
gcs_aio_client = GcsAioClient(
4345
address=address_info["gcs_address"], nums_reconnect_retry=0
4446
)
4547

46-
job_manager = JobManager(gcs_aio_client)
48+
job_manager = JobManager(gcs_aio_client, tmp_path)
4749

4850
# If no head node id is found, we should use "DEFAULT".
4951
await gcs_aio_client.internal_kv_del(
@@ -73,14 +75,14 @@ async def test_get_scheduling_strategy(call_ray_start, monkeypatch): # noqa: F8
7375
["""ray start --head --resources={"TestResourceKey":123}"""],
7476
indirect=True,
7577
)
76-
async def test_submit_no_ray_address(call_ray_start): # noqa: F811
78+
async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811
7779
"""Test that a job script with an unspecified Ray address works."""
7880

7981
address_info = ray.init(address=call_ray_start)
8082
gcs_aio_client = GcsAioClient(
8183
address=address_info["gcs_address"], nums_reconnect_retry=0
8284
)
83-
job_manager = JobManager(gcs_aio_client)
85+
job_manager = JobManager(gcs_aio_client, tmp_path)
8486

8587
init_ray_no_address_script = """
8688
import ray
@@ -120,12 +122,12 @@ def shared_ray_instance():
120122

121123
@pytest.mark.asyncio
@pytest.fixture
async def job_manager(shared_ray_instance, tmp_path):
    """Yield a JobManager wired to the shared Ray instance's GCS."""
    gcs_client = GcsAioClient(
        address=shared_ray_instance["gcs_address"], nums_reconnect_retry=0
    )
    yield JobManager(gcs_client, tmp_path)
129131

130132

131133
def _driver_script_path(file_name: str) -> str:

python/ray/experimental/state/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ class ClusterEventState(StateSchema):
461461
source_type: str = state_column(filterable=True)
462462
message: str = state_column(filterable=False)
463463
event_id: int = state_column(filterable=True)
464+
custom_fields: dict = state_column(filterable=False, detail=True)
464465

465466

466467
@dataclass(init=True)

0 commit comments

Comments
 (0)