Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
efa79d6
fix: raise err if job not terminate or has err
machichima Sep 30, 2025
aea0717
refactor: fmt
machichima Sep 30, 2025
c011722
fix: not raise err if job terminated
machichima Oct 1, 2025
d4b73e6
fix: get abnormal close from ws
machichima Oct 5, 2025
83ad810
refactor: remove print and use logger
machichima Oct 8, 2025
21e1ecc
fix: log instead of raise error when ws Error
machichima Oct 9, 2025
0dcb9aa
Merge branch 'master' of github.com:ray-project/ray into 57002-tail-l…
machichima Oct 9, 2025
f1bfdd5
refactor: format
machichima Oct 9, 2025
e62dac9
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 10, 2025
fc81dee
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 11, 2025
d1c0c29
refactor: fix error message
machichima Oct 13, 2025
899c05d
test: add test for abnormal close for tail job log
machichima Oct 13, 2025
a21145f
fix: remove unused tmp dir
machichima Oct 13, 2025
a2e76fd
fix: dashboard-port to port
machichima Oct 13, 2025
2df9d50
refactor: lint
machichima Oct 13, 2025
b51feb2
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 13, 2025
8035a6b
test: update test to use existing ray cluster
machichima Oct 14, 2025
681a536
refactor: lint
machichima Oct 14, 2025
5ed2be4
test: use ray_start_regular fixture
machichima Oct 15, 2025
1406cd3
test: kill dashboard instead
machichima Oct 15, 2025
c1e93cd
refactor: lint
machichima Oct 15, 2025
905d742
fix: move test to test_sdk to prevent ray cluster collision
machichima Oct 16, 2025
238e0fb
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 16, 2025
5186f4c
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions python/ray/dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,9 @@ async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
The iterator.

Raises:
RuntimeError: If the job does not exist or if the request to the
job server fails.
RuntimeError: If the job does not exist, if the request to the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it easy to write a test for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me have a try!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 899c05d

job server fails, or if the connection closes unexpectedly
before the job reaches a terminal state.
"""
async with aiohttp.ClientSession(
cookies=self._cookies, headers=self._headers
Expand All @@ -498,6 +499,17 @@ async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
if msg.type == aiohttp.WSMsgType.TEXT:
yield msg.data
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.debug(
f"WebSocket closed for job {job_id} with close code {ws.close_code}"
)
if ws.close_code == aiohttp.WSCloseCode.ABNORMAL_CLOSURE:
raise RuntimeError(
f"WebSocket connection closed unexpectedly with close code {ws.close_code}"
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: WebSocket Error Message Formatting Issue

The RuntimeError message for abnormal WebSocket closures is malformed, missing the job_id and containing grammatical errors.

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: WebSocket Closure Triggers Unintended Job Errors

The tail_job_logs function raises a RuntimeError on any abnormal WebSocket closure without checking the job's status. The original intent was to only raise if the job isn't in a terminal state. This can lead to false positive errors, reporting successful jobs as failed due to unrelated network issues.

Fix in Cursor Fix in Web

break
elif msg.type == aiohttp.WSMsgType.ERROR:
pass
# Old Ray versions may send ERROR on connection close
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when did this behavior change?

Copy link
Contributor Author

@machichima machichima Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I encounter the error in test python/ray/dashboard/modules/job/tests/test_backwards_compatibility.py::TestBackwardsCompatibility::test_cli. This gets into here. The test is running test_backwards_compatibility.sh which is using ray version 2.0.1

logger.debug(
f"WebSocket error for job {job_id}, treating as normal close. Err: {ws.exception()}"
)
break
48 changes: 46 additions & 2 deletions python/ray/dashboard/modules/job/tests/test_http_job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from ray.runtime_env.runtime_env import RuntimeEnv, RuntimeEnvConfig
from ray.tests.conftest import _ray_start

import psutil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should go before pytest (import ordering is builtins, thirdparty, then ray internal)

I think we have a linter that usually enforces this 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By running pre-commit run --all, the psutil will be put in the end of imports


# This test requires you have AWS credentials set up (any AWS credentials will
# do, this test only accesses a public bucket).

Expand Down Expand Up @@ -268,8 +270,7 @@ def f():
yield {
"runtime_env": {"py_modules": [str(Path(tmp_dir) / "test_module")]},
"entrypoint": (
"python -c 'import test_module;"
"print(test_module.run_test())'"
"python -c 'import test_module;print(test_module.run_test())'"
),
"expected_logs": "Hello from test_module!\n",
}
Expand Down Expand Up @@ -711,6 +712,49 @@ async def test_tail_job_logs(job_sdk_client):
wait_for_condition(_check_job_succeeded, client=client, job_id=job_id)


@pytest.mark.asyncio
async def test_tail_job_logs_websocket_abnormal_closure(ray_start_regular):
"""
Test that ABNORMAL_CLOSURE raises RuntimeError when tailing logs.

This test starts its own Ray cluster using default ports,
then forcefully stops Ray while tailing logs to simulate an abnormal WebSocket closure.
"""
dashboard_url = ray_start_regular.dashboard_url
client = JobSubmissionClient(format_web_url(dashboard_url))

# Submit a long-running job
driver_script = """
import time
for i in range(100):
print("Hello", i)
time.sleep(0.5)
"""
entrypoint = f"python -c '{driver_script}'"
job_id = client.submit_job(entrypoint=entrypoint)

# Start tailing logs and stop Ray while tailing
# Expect RuntimeError when WebSocket closes abnormally
with pytest.raises(
RuntimeError,
match="WebSocket connection closed unexpectedly with close code",
):
i = 0
async for lines in client.tail_job_logs(job_id):
print(lines, end="")
i += 1

# Kill the dashboard after receiving a few log lines
if i == 3:
from ray._private import ray_constants
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for lazy import here, move it to the top of the file


print("\nKilling the dashboard to close websocket abnormally...")
dash_info = ray._private.worker._global_node.all_processes[
ray_constants.PROCESS_TYPE_DASHBOARD
][0]
psutil.Process(dash_info.process.pid).kill()


def _hook(env):
with open(env["env_vars"]["TEMPPATH"], "w+") as f:
f.write(env["env_vars"]["TOKEN"])
Expand Down