Commit 00a9b03

Revert "Revert "[Cluster Events] Add basic job events. (ray-project#29164)" (ray-project#29195)"
This reverts commit 75a0c49.
1 parent 75a0c49 commit 00a9b03
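
For context, the reinstated change (ray-project#29164) makes the dashboard's job manager emit cluster events when a job starts and finishes. A minimal sketch, not part of this diff, of how those events can be observed; the dashboard address below is an assumed local default:

import ray
from ray.experimental.state.api import list_cluster_events
from ray.job_submission import JobSubmissionClient

ray.init()
# Assumption: the dashboard is reachable at the usual local address; substitute your cluster's URL.
client = JobSubmissionClient("http://127.0.0.1:8265")
submission_id = client.submit_job(entrypoint="echo hello")

# Once the job finishes, two JOBS-source events are expected:
#   "Started a ray job <submission_id>."
#   "Completed a ray job <submission_id> with a status SUCCEEDED."
for event in list_cluster_events():
    print(event["source_type"], event["severity"], event["message"])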

File tree: 6 files changed, +108 −26 lines changed


dashboard/modules/event/tests/test_event.py

Lines changed: 70 additions & 15 deletions
@@ -27,6 +27,8 @@
 from ray.dashboard.modules.event.event_utils import (
     monitor_events,
 )
+from ray.experimental.state.api import list_cluster_events
+from ray.job_submission import JobSubmissionClient
 
 logger = logging.getLogger(__name__)
 
@@ -303,21 +305,74 @@ async def _check_events(expect_events, read_events, timeout=10):
     # assert infeasible_event["source_type"] == "AUTOSCALER"
 
 
-# def test_jobs_cluster_events(shutdown_only):
-#     ray.init()
-#     address = ray._private.worker._global_node.webui_url
-#     address = format_web_url(address)
-#     client = JobSubmissionClient(address)
-#     client.submit_job(entrypoint="ls")
-
-#     def verify():
-#         assert len(list_cluster_events()) == 3
-#         for e in list_cluster_events():
-#             e["source_type"] = "JOBS"
-#         return True
-
-#     wait_for_condition(verify)
-#     print(list_cluster_events())
+def test_jobs_cluster_events(shutdown_only):
+    ray.init()
+    address = ray._private.worker._global_node.webui_url
+    address = format_web_url(address)
+    client = JobSubmissionClient(address)
+    submission_id = client.submit_job(entrypoint="ls")
+
+    def verify():
+        events = list_cluster_events()
+        print(events)
+        assert len(list_cluster_events()) == 2
+        start_event = events[0]
+        completed_event = events[1]
+
+        assert start_event["source_type"] == "JOBS"
+        assert f"Started a ray job {submission_id}" in start_event["message"]
+        assert start_event["severity"] == "INFO"
+        assert completed_event["source_type"] == "JOBS"
+        assert (
+            f"Completed a ray job {submission_id} with a status SUCCEEDED."
+            == completed_event["message"]
+        )
+        assert completed_event["severity"] == "INFO"
+        return True
+
+    print("Test successful job run.")
+    wait_for_condition(verify)
+
+    # Test the failure case. In this part, job fails because the runtime env
+    # creation fails.
+    submission_id = client.submit_job(
+        entrypoint="ls",
+        runtime_env={"working_dir": "s3://does_not_exist.zip"},
+    )
+
+    def verify():
+        events = list_cluster_events(detail=True)
+        failed_events = []
+
+        for e in events:
+            if (
+                "submission_id" in e["custom_fields"]
+                and e["custom_fields"]["submission_id"] == submission_id
+            ):
+                failed_events.append(e)
+
+        assert len(failed_events) == 2
+        failed_start = failed_events[0]
+        failed_completed = failed_events[1]
+
+        assert failed_start["source_type"] == "JOBS"
+        assert f"Started a ray job {submission_id}" in failed_start["message"]
+        assert failed_completed["source_type"] == "JOBS"
+        assert (
+            f"Completed a ray job {submission_id} with a status FAILED."
+            in failed_completed["message"]
+        )
+        print(failed_completed["message"])
+        # TODO(sang): Reenable it.
+        # # Make sure the error message is included.
+        # assert (
+        #     "An error occurred (ExpiredToken) when calling the "
+        #     "GetObject operation:" in failed_completed["message"]
+        # )
+        return True
+
+    print("Test failed (runtime_env failure) job run.")
+    wait_for_condition(verify)
 
 
 if __name__ == "__main__":
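
As a rough guide to the record shape the test reads back, each cluster event carries at least the fields asserted above. The values below are made-up placeholders; only the keys come from the assertions, and custom_fields is only returned when detail=True is requested:

example_event = {
    "source_type": "JOBS",
    "severity": "INFO",
    "message": "Started a ray job raysubmit_XXXXXXXX.",
    # Present only in list_cluster_events(detail=True) output:
    "custom_fields": {"submission_id": "raysubmit_XXXXXXXX"},
}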

dashboard/modules/job/job_agent.py

Lines changed: 3 additions & 1 deletion
@@ -156,7 +156,9 @@ async def tail_job_logs(self, req: Request) -> Response:
 
     def get_job_manager(self):
         if not self._job_manager:
-            self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client)
+            self._job_manager = JobManager(
+                self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir
+            )
         return self._job_manager
 
     async def run(self, server):

dashboard/modules/job/job_head.py

Lines changed: 3 additions & 1 deletion
@@ -357,7 +357,9 @@ async def tail_job_logs(self, req: Request) -> Response:
 
     async def run(self, server):
         if not self._job_manager:
-            self._job_manager = JobManager(self._dashboard_head.gcs_aio_client)
+            self._job_manager = JobManager(
+                self._dashboard_head.gcs_aio_client, self._dashboard_head.log_dir
+            )
 
     @staticmethod
     def is_minimal_module():
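
Both the agent and the head now hand their log directory to JobManager, matching the constructor change in job_manager.py below. A minimal sketch of building a JobManager this way; the import paths and the local GCS address are assumptions, and any writable logs directory works (the tests further down simply pass pytest's tmp_path):

from ray._private.gcs_utils import GcsAioClient  # assumed import path
from ray.dashboard.modules.job.job_manager import JobManager

# Assumed local GCS address; in the dashboard this comes from the agent/head process.
gcs_aio_client = GcsAioClient(address="127.0.0.1:6379", nums_reconnect_retry=0)
job_manager = JobManager(gcs_aio_client, "/tmp/ray/session_latest/logs")  # any writable logs dir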

dashboard/modules/job/job_manager.py

Lines changed: 27 additions & 5 deletions
@@ -30,6 +30,8 @@
 from ray.exceptions import RuntimeEnvSetupError
 from ray.job_submission import JobStatus
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
+from ray._private.event.event_logger import get_event_logger
+from ray.core.generated.event_pb2 import Event
 
 logger = logging.getLogger(__name__)
 
@@ -385,12 +387,16 @@ class JobManager:
     LOG_TAIL_SLEEP_S = 1
     JOB_MONITOR_LOOP_PERIOD_S = 1
 
-    def __init__(self, gcs_aio_client: GcsAioClient):
+    def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
         self._gcs_aio_client = gcs_aio_client
         self._job_info_client = JobInfoStorageClient(gcs_aio_client)
         self._gcs_address = gcs_aio_client._channel._gcs_address
         self._log_client = JobLogStorageClient()
         self._supervisor_actor_cls = ray.remote(JobSupervisor)
+        try:
+            self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
+        except Exception:
+            self.event_logger = None
 
         create_task(self._recover_running_jobs())
 
@@ -442,27 +448,39 @@ async def _monitor_job(
             except Exception as e:
                 is_alive = False
                 job_status = await self._job_info_client.get_status(job_id)
+                job_error_message = None
                 if job_status.is_terminal():
                     # If the job is already in a terminal state, then the actor
                     # exiting is expected.
                     pass
                 elif isinstance(e, RuntimeEnvSetupError):
                     logger.info(f"Failed to set up runtime_env for job {job_id}.")
+                    job_error_message = f"runtime_env setup failed: {e}"
+                    job_status = JobStatus.FAILED
                     await self._job_info_client.put_status(
                         job_id,
-                        JobStatus.FAILED,
-                        message=f"runtime_env setup failed: {e}",
+                        job_status,
+                        message=job_error_message,
                     )
                 else:
                     logger.warning(
                         f"Job supervisor for job {job_id} failed unexpectedly: {e}."
                     )
+                    job_error_message = f"Unexpected error occurred: {e}"
+                    job_status = JobStatus.FAILED
                     await self._job_info_client.put_status(
                         job_id,
-                        JobStatus.FAILED,
-                        message=f"Unexpected error occurred: {e}",
+                        job_status,
+                        message=job_error_message,
                     )
 
+                # Log events
+                event_log = f"Completed a ray job {job_id} with a status {job_status}."
+                if job_error_message:
+                    event_log += f" {job_error_message}"
+                if self.event_logger:
+                    self.event_logger.info(event_log, submission_id=job_id)
+
                 # Kill the actor defensively to avoid leaking actors in unexpected error cases.
                 if job_supervisor is not None:
                     ray.kill(job_supervisor, no_restart=True)
@@ -584,6 +602,10 @@ async def submit_job(
                 node_id=ray.get_runtime_context().node_id,
                 soft=False,
             )
+            if self.event_logger:
+                self.event_logger.info(
+                    f"Started a ray job {submission_id}.", submission_id=submission_id
+                )
             supervisor = self._supervisor_actor_cls.options(
                 lifetime="detached",
                 name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
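
To make the monitor-loop change concrete, this is the completion message it assembles, mirroring the logic in the hunk above; the id and error text are made-up examples, and the plain strings stand in for the JobStatus enum, which renders as "SUCCEEDED"/"FAILED" in these messages:

# Stand-in values for illustration only.
job_id = "raysubmit_abc123"
job_status = "FAILED"
job_error_message = "runtime_env setup failed: ..."

event_log = f"Completed a ray job {job_id} with a status {job_status}."
if job_error_message:
    event_log += f" {job_error_message}"
print(event_log)
# Completed a ray job raysubmit_abc123 with a status FAILED. runtime_env setup failed: ...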

dashboard/modules/job/tests/test_job_manager.py

Lines changed: 4 additions & 4 deletions
@@ -31,14 +31,14 @@
     ["""ray start --head --resources={"TestResourceKey":123}"""],
     indirect=True,
 )
-async def test_submit_no_ray_address(call_ray_start):  # noqa: F811
+async def test_submit_no_ray_address(call_ray_start, tmp_path):  # noqa: F811
     """Test that a job script with an unspecified Ray address works."""
 
     address_info = ray.init(address=call_ray_start)
     gcs_aio_client = GcsAioClient(
         address=address_info["gcs_address"], nums_reconnect_retry=0
     )
-    job_manager = JobManager(gcs_aio_client)
+    job_manager = JobManager(gcs_aio_client, tmp_path)
 
     init_ray_no_address_script = """
 import ray
@@ -78,12 +78,12 @@ def shared_ray_instance():
 
 @pytest.mark.asyncio
 @pytest.fixture
-async def job_manager(shared_ray_instance):
+async def job_manager(shared_ray_instance, tmp_path):
     address_info = shared_ray_instance
     gcs_aio_client = GcsAioClient(
         address=address_info["gcs_address"], nums_reconnect_retry=0
     )
-    yield JobManager(gcs_aio_client)
+    yield JobManager(gcs_aio_client, tmp_path)
 
 
 def _driver_script_path(file_name: str) -> str:

python/ray/experimental/state/common.py

Lines changed: 1 addition & 0 deletions
@@ -448,6 +448,7 @@ class ClusterEventState(StateSchema):
     source_type: str = state_column(filterable=True)
     message: str = state_column(filterable=False)
     event_id: int = state_column(filterable=True)
+    custom_fields: dict = state_column(filterable=False, detail=True)
 
 
 @dataclass(init=True)
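
Because custom_fields is declared as a detail column, it is omitted from the default listing and only comes back when detail is requested. A small sketch, assuming a running cluster that has already emitted job events, of filtering detailed events by submission id the way the test above does:

from ray.experimental.state.api import list_cluster_events

submission_id = "raysubmit_abc123"  # example id from an earlier submit_job() call
for event in list_cluster_events(detail=True):
    fields = event.get("custom_fields") or {}
    if fields.get("submission_id") == submission_id:
        print(event["severity"], event["message"])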
