Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 15 additions & 70 deletions dashboard/modules/event/tests/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
from ray.dashboard.modules.event.event_utils import (
monitor_events,
)
from ray.experimental.state.api import list_cluster_events
from ray.job_submission import JobSubmissionClient

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -305,74 +303,21 @@ async def _check_events(expect_events, read_events, timeout=10):
# assert infeasible_event["source_type"] == "AUTOSCALER"


def test_jobs_cluster_events(shutdown_only):
ray.init()
address = ray._private.worker._global_node.webui_url
address = format_web_url(address)
client = JobSubmissionClient(address)
submission_id = client.submit_job(entrypoint="ls")

def verify():
events = list_cluster_events()
print(events)
assert len(list_cluster_events()) == 2
start_event = events[0]
completed_event = events[1]

assert start_event["source_type"] == "JOBS"
assert f"Started a ray job {submission_id}" in start_event["message"]
assert start_event["severity"] == "INFO"
assert completed_event["source_type"] == "JOBS"
assert (
f"Completed a ray job {submission_id} with a status SUCCEEDED."
== completed_event["message"]
)
assert completed_event["severity"] == "INFO"
return True

print("Test successful job run.")
wait_for_condition(verify)

# Test the failure case. In this part, job fails because the runtime env
# creation fails.
submission_id = client.submit_job(
entrypoint="ls",
runtime_env={"working_dir": "s3://does_not_exist.zip"},
)

def verify():
events = list_cluster_events(detail=True)
failed_events = []

for e in events:
if (
"submission_id" in e["custom_fields"]
and e["custom_fields"]["submission_id"] == submission_id
):
failed_events.append(e)

assert len(failed_events) == 2
failed_start = failed_events[0]
failed_completed = failed_events[1]

assert failed_start["source_type"] == "JOBS"
assert f"Started a ray job {submission_id}" in failed_start["message"]
assert failed_completed["source_type"] == "JOBS"
assert (
f"Completed a ray job {submission_id} with a status FAILED."
in failed_completed["message"]
)
print(failed_completed["message"])
# TODO(sang): Reenable it.
# # Make sure the error message is included.
# assert (
# "An error occurred (ExpiredToken) when calling the "
# "GetObject operation:" in failed_completed["message"]
# )
return True

print("Test failed (runtime_env failure) job run.")
wait_for_condition(verify)
# def test_jobs_cluster_events(shutdown_only):
# ray.init()
# address = ray._private.worker._global_node.webui_url
# address = format_web_url(address)
# client = JobSubmissionClient(address)
# client.submit_job(entrypoint="ls")

# def verify():
# assert len(list_cluster_events()) == 3
# for e in list_cluster_events():
# e["source_type"] = "JOBS"
# return True

# wait_for_condition(verify)
# print(list_cluster_events())


if __name__ == "__main__":
Expand Down
4 changes: 1 addition & 3 deletions dashboard/modules/job/job_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ async def tail_job_logs(self, req: Request) -> Response:

def get_job_manager(self):
if not self._job_manager:
self._job_manager = JobManager(
self._dashboard_agent.gcs_aio_client, self._dashboard_agent.log_dir
)
self._job_manager = JobManager(self._dashboard_agent.gcs_aio_client)
return self._job_manager

async def run(self, server):
Expand Down
4 changes: 1 addition & 3 deletions dashboard/modules/job/job_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,7 @@ async def tail_job_logs(self, req: Request) -> Response:

async def run(self, server):
if not self._job_manager:
self._job_manager = JobManager(
self._dashboard_head.gcs_aio_client, self._dashboard_head.log_dir
)
self._job_manager = JobManager(self._dashboard_head.gcs_aio_client)

@staticmethod
def is_minimal_module():
Expand Down
32 changes: 5 additions & 27 deletions dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from ray.exceptions import RuntimeEnvSetupError
from ray.job_submission import JobStatus
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray._private.event.event_logger import get_event_logger
from ray.core.generated.event_pb2 import Event

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -387,16 +385,12 @@ class JobManager:
LOG_TAIL_SLEEP_S = 1
JOB_MONITOR_LOOP_PERIOD_S = 1

def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
def __init__(self, gcs_aio_client: GcsAioClient):
self._gcs_aio_client = gcs_aio_client
self._job_info_client = JobInfoStorageClient(gcs_aio_client)
self._gcs_address = gcs_aio_client._channel._gcs_address
self._log_client = JobLogStorageClient()
self._supervisor_actor_cls = ray.remote(JobSupervisor)
try:
self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
except Exception:
self.event_logger = None

create_task(self._recover_running_jobs())

Expand Down Expand Up @@ -448,39 +442,27 @@ async def _monitor_job(
except Exception as e:
is_alive = False
job_status = await self._job_info_client.get_status(job_id)
job_error_message = None
if job_status.is_terminal():
# If the job is already in a terminal state, then the actor
# exiting is expected.
pass
elif isinstance(e, RuntimeEnvSetupError):
logger.info(f"Failed to set up runtime_env for job {job_id}.")
job_error_message = f"runtime_env setup failed: {e}"
job_status = JobStatus.FAILED
await self._job_info_client.put_status(
job_id,
job_status,
message=job_error_message,
JobStatus.FAILED,
message=f"runtime_env setup failed: {e}",
)
else:
logger.warning(
f"Job supervisor for job {job_id} failed unexpectedly: {e}."
)
job_error_message = f"Unexpected error occurred: {e}"
job_status = JobStatus.FAILED
await self._job_info_client.put_status(
job_id,
job_status,
message=job_error_message,
JobStatus.FAILED,
message=f"Unexpected error occurred: {e}",
)

# Log events
event_log = f"Completed a ray job {job_id} with a status {job_status}."
if job_error_message:
event_log += f" {job_error_message}"
if self.event_logger:
self.event_logger.info(event_log, submission_id=job_id)

# Kill the actor defensively to avoid leaking actors in unexpected error cases.
if job_supervisor is not None:
ray.kill(job_supervisor, no_restart=True)
Expand Down Expand Up @@ -602,10 +584,6 @@ async def submit_job(
node_id=ray.get_runtime_context().node_id,
soft=False,
)
if self.event_logger:
self.event_logger.info(
f"Started a ray job {submission_id}.", submission_id=submission_id
)
supervisor = self._supervisor_actor_cls.options(
lifetime="detached",
name=JOB_ACTOR_NAME_TEMPLATE.format(job_id=submission_id),
Expand Down
8 changes: 4 additions & 4 deletions dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
["""ray start --head --resources={"TestResourceKey":123}"""],
indirect=True,
)
async def test_submit_no_ray_address(call_ray_start, tmp_path): # noqa: F811
async def test_submit_no_ray_address(call_ray_start): # noqa: F811
"""Test that a job script with an unspecified Ray address works."""

address_info = ray.init(address=call_ray_start)
gcs_aio_client = GcsAioClient(
address=address_info["gcs_address"], nums_reconnect_retry=0
)
job_manager = JobManager(gcs_aio_client, tmp_path)
job_manager = JobManager(gcs_aio_client)

init_ray_no_address_script = """
import ray
Expand Down Expand Up @@ -78,12 +78,12 @@ def shared_ray_instance():

@pytest.mark.asyncio
@pytest.fixture
async def job_manager(shared_ray_instance, tmp_path):
async def job_manager(shared_ray_instance):
address_info = shared_ray_instance
gcs_aio_client = GcsAioClient(
address=address_info["gcs_address"], nums_reconnect_retry=0
)
yield JobManager(gcs_aio_client, tmp_path)
yield JobManager(gcs_aio_client)


def _driver_script_path(file_name: str) -> str:
Expand Down
1 change: 0 additions & 1 deletion python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ class ClusterEventState(StateSchema):
source_type: str = state_column(filterable=True)
message: str = state_column(filterable=False)
event_id: int = state_column(filterable=True)
custom_fields: dict = state_column(filterable=False, detail=True)


@dataclass(init=True)
Expand Down