Skip to content

Commit c8b60a6

Browse files
chrisfellowes-anyscaleedoakes
authored andcommitted
[core] add entrypoint log for jobs (ray-project#58300)
this helps prevent an edge case when using file based log exporters like vector that use fingerprinting [ref](https://vector.dev/docs/reference/configuration/sources/file/#fingerprint) to identify unique files. example edge case that this fixes: two jobs are submitted to a cluster and begin executing at the same time, they both contain an invalid entrypoint that references a nonexistant file before fix: - both jobs have the identical "Runtime env is setting up" log with identical timestamps - both jobs have identical entrypoint failure logs as a result, the log files for these jobs are identical, so vector will only export one. after fix: - both jobs have the identical "Runtime env is setting up" log with identical timestamps - each job has a **unique** entrypoint log containing its job_id - both jobs have identical entrypoint failure logs vector can differentiate between these two files, so both will be exported --------- Signed-off-by: Chris Fellowes <[email protected]> Signed-off-by: chrisfellowes <[email protected]> Signed-off-by: Edward Oakes <[email protected]> Co-authored-by: Edward Oakes <[email protected]>
1 parent 1310a3c commit c8b60a6

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

python/ray/dashboard/modules/job/job_supervisor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ def _exec_entrypoint(self, env: dict, logs_path: str) -> subprocess.Popen:
173173
# Open in append mode to avoid overwriting runtime_env setup logs for the
174174
# supervisor actor, which are also written to the same file.
175175
with open(logs_path, "a") as logs_file:
176+
logs_file.write(
177+
f"Running entrypoint for job {self._job_id}: {self._entrypoint}\n"
178+
)
176179
child_process = subprocess.Popen(
177180
self._entrypoint,
178181
shell=True,

python/ray/dashboard/modules/job/tests/test_cli_integration.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ def test_basic_submit(self, ray_start_stop):
186186
"""Should tail logs and wait for process to exit."""
187187
cmd = "sleep 1 && echo hello && sleep 1 && echo hello"
188188
stdout, _ = _run_cmd(f"ray job submit -- bash -c '{cmd}'")
189-
assert stdout.count("hello") == 2
189+
190+
# 'hello' should appear four times: twice when we print the entrypoint, then
191+
# two more times in the logs from the `echo`.
192+
assert stdout.count("hello") == 4
190193
assert "succeeded" in stdout
191194

192195
def test_submit_no_wait(self, ray_start_stop):
@@ -200,7 +203,10 @@ def test_submit_with_logs_instant_job(self, ray_start_stop):
200203
"""Should exit immediately and print logs even if job returns instantly."""
201204
cmd = "echo hello"
202205
stdout, _ = _run_cmd(f"ray job submit -- bash -c '{cmd}'")
203-
assert "hello" in stdout
206+
207+
# 'hello' should appear twice: once when we print the entrypoint, then
208+
# again from the `echo`.
209+
assert stdout.count("hello") == 2
204210

205211
def test_multiple_ray_init(self, ray_start_stop):
206212
cmd = (

python/ray/dashboard/modules/job/tests/test_job_agent.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,10 @@ async def test_tail_job_logs_with_echo(job_sdk_client):
428428
async for lines in agent_client.tail_job_logs(job_id):
429429
print(lines, end="")
430430
for line in lines.strip().split("\n"):
431-
if "Runtime env is setting up." in line:
431+
if (
432+
"Runtime env is setting up." in line
433+
or "Running entrypoint for job" in line
434+
):
432435
continue
433436
assert line.split(" ") == ["Hello", str(i)]
434437
i += 1

python/ray/dashboard/modules/job/tests/test_job_manager.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,9 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
335335
gcs_client = ray._private.worker.global_worker.gcs_client
336336
job_manager = JobManager(gcs_client, tmp_path)
337337

338+
entrypoint = "echo hello 1"
338339
job_id = await job_manager.submit_job(
339-
entrypoint="echo hello 1", submission_id="test_runtime_env_setup_logs"
340+
entrypoint=entrypoint, submission_id="test_runtime_env_setup_logs"
340341
)
341342
await async_wait_for_condition(
342343
check_job_succeeded, job_manager=job_manager, job_id=job_id
@@ -347,10 +348,10 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
347348
ray._private.worker._global_node.get_logs_dir_path(),
348349
f"job-driver-{job_id}.log",
349350
)
350-
start_message = "Runtime env is setting up."
351351
with open(job_driver_log_path, "r") as f:
352352
logs = f.read()
353-
assert start_message in logs
353+
assert "Runtime env is setting up." in logs
354+
assert f"Running entrypoint for job {job_id}: {entrypoint}" in logs
354355

355356

356357
@pytest.mark.asyncio
@@ -1040,7 +1041,9 @@ async def _tail_and_assert_logs(
10401041
i = 0
10411042
async for lines in job_manager.tail_job_logs(job_id):
10421043
assert all(
1043-
s == expected_log or "Runtime env" in s
1044+
s == expected_log
1045+
or "Runtime env" in s
1046+
or "Running entrypoint for job" in s
10441047
for s in lines.strip().split("\n")
10451048
)
10461049
print(lines, end="")
@@ -1080,7 +1083,9 @@ async def test_successful_job(self, job_manager):
10801083

10811084
async for lines in job_manager.tail_job_logs(job_id):
10821085
assert all(
1083-
s == "Waiting..." or "Runtime env" in s
1086+
s == "Waiting..."
1087+
or "Runtime env" in s
1088+
or "Running entrypoint for job" in s
10841089
for s in lines.strip().split("\n")
10851090
)
10861091
print(lines, end="")
@@ -1104,7 +1109,9 @@ async def test_failed_job(self, job_manager):
11041109

11051110
async for lines in job_manager.tail_job_logs(job_id):
11061111
assert all(
1107-
s == "Waiting..." or "Runtime env" in s
1112+
s == "Waiting..."
1113+
or "Runtime env" in s
1114+
or "Running entrypoint for job" in s
11081115
for s in lines.strip().split("\n")
11091116
)
11101117
print(lines, end="")
@@ -1133,7 +1140,10 @@ async def test_stopped_job(self, job_manager):
11331140

11341141
async for lines in job_manager.tail_job_logs(job_id):
11351142
assert all(
1136-
s == "Waiting..." or s == "Terminated" or "Runtime env" in s
1143+
s == "Waiting..."
1144+
or s == "Terminated"
1145+
or "Runtime env" in s
1146+
or "Running entrypoint for job" in s
11371147
for s in lines.strip().split("\n")
11381148
)
11391149
print(lines, end="")

0 commit comments

Comments
 (0)