Skip to content
Merged
3 changes: 3 additions & 0 deletions python/ray/dashboard/modules/job/job_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def _exec_entrypoint(self, env: dict, logs_path: str) -> subprocess.Popen:
# Open in append mode to avoid overwriting runtime_env setup logs for the
# supervisor actor, which are also written to the same file.
with open(logs_path, "a") as logs_file:
logs_file.write(
f"Running entrypoint for job {self._job_id}: {self._entrypoint}\n"
)
child_process = subprocess.Popen(
self._entrypoint,
shell=True,
Expand Down
10 changes: 8 additions & 2 deletions python/ray/dashboard/modules/job/tests/test_cli_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ def test_basic_submit(self, ray_start_stop):
"""Should tail logs and wait for process to exit."""
cmd = "sleep 1 && echo hello && sleep 1 && echo hello"
stdout, _ = _run_cmd(f"ray job submit -- bash -c '{cmd}'")
assert stdout.count("hello") == 2

# 'hello' should appear four times: twice when we print the entrypoint, then
# two more times in the logs from the `echo`.
assert stdout.count("hello") == 4
assert "succeeded" in stdout

def test_submit_no_wait(self, ray_start_stop):
Expand All @@ -200,7 +203,10 @@ def test_submit_with_logs_instant_job(self, ray_start_stop):
"""Should exit immediately and print logs even if job returns instantly."""
cmd = "echo hello"
stdout, _ = _run_cmd(f"ray job submit -- bash -c '{cmd}'")
assert "hello" in stdout

# 'hello' should appear twice: once when we print the entrypoint, then
# again from the `echo`.
assert stdout.count("hello") == 2

def test_multiple_ray_init(self, ray_start_stop):
cmd = (
Expand Down
5 changes: 4 additions & 1 deletion python/ray/dashboard/modules/job/tests/test_job_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,10 @@ async def test_tail_job_logs_with_echo(job_sdk_client):
async for lines in agent_client.tail_job_logs(job_id):
print(lines, end="")
for line in lines.strip().split("\n"):
if "Runtime env is setting up." in line:
if (
"Runtime env is setting up." in line
or "Running entrypoint for job" in line
):
continue
assert line.split(" ") == ["Hello", str(i)]
i += 1
Expand Down
24 changes: 17 additions & 7 deletions python/ray/dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,9 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
gcs_client = ray._private.worker.global_worker.gcs_client
job_manager = JobManager(gcs_client, tmp_path)

entrypoint = "echo hello 1"
job_id = await job_manager.submit_job(
entrypoint="echo hello 1", submission_id="test_runtime_env_setup_logs"
entrypoint=entrypoint, submission_id="test_runtime_env_setup_logs"
)
await async_wait_for_condition(
check_job_succeeded, job_manager=job_manager, job_id=job_id
Expand All @@ -347,10 +348,10 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
ray._private.worker._global_node.get_logs_dir_path(),
f"job-driver-{job_id}.log",
)
start_message = "Runtime env is setting up."
with open(job_driver_log_path, "r") as f:
logs = f.read()
assert start_message in logs
assert "Runtime env is setting up." in logs
assert f"Running entrypoint for job {job_id}: {entrypoint}" in logs


@pytest.mark.asyncio
Expand Down Expand Up @@ -1040,7 +1041,9 @@ async def _tail_and_assert_logs(
i = 0
async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == expected_log or "Runtime env" in s
s == expected_log
or "Runtime env" in s
or "Running entrypoint for job" in s
for s in lines.strip().split("\n")
)
print(lines, end="")
Expand Down Expand Up @@ -1080,7 +1083,9 @@ async def test_successful_job(self, job_manager):

async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == "Waiting..." or "Runtime env" in s
s == "Waiting..."
or "Runtime env" in s
or "Running entrypoint for job" in s
for s in lines.strip().split("\n")
)
print(lines, end="")
Expand All @@ -1104,7 +1109,9 @@ async def test_failed_job(self, job_manager):

async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == "Waiting..." or "Runtime env" in s
s == "Waiting..."
or "Runtime env" in s
or "Running entrypoint for job" in s
for s in lines.strip().split("\n")
)
print(lines, end="")
Expand Down Expand Up @@ -1133,7 +1140,10 @@ async def test_stopped_job(self, job_manager):

async for lines in job_manager.tail_job_logs(job_id):
assert all(
s == "Waiting..." or s == "Terminated" or "Runtime env" in s
s == "Waiting..."
or s == "Terminated"
or "Runtime env" in s
or "Running entrypoint for job" in s
for s in lines.strip().split("\n")
)
print(lines, end="")
Expand Down