Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
efa79d6
fix: raise err if job not terminate or has err
machichima Sep 30, 2025
aea0717
refactor: fmt
machichima Sep 30, 2025
c011722
fix: not raise err if job terminated
machichima Oct 1, 2025
d4b73e6
fix: get abnormal close from ws
machichima Oct 5, 2025
83ad810
refactor: remove print and use logger
machichima Oct 8, 2025
21e1ecc
fix: log instead of raise error when ws Error
machichima Oct 9, 2025
0dcb9aa
Merge branch 'master' of github.com:ray-project/ray into 57002-tail-l…
machichima Oct 9, 2025
f1bfdd5
refactor: format
machichima Oct 9, 2025
e62dac9
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 10, 2025
fc81dee
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 11, 2025
d1c0c29
refactor: fix error message
machichima Oct 13, 2025
899c05d
test: add test for abnormal close for tail job log
machichima Oct 13, 2025
a21145f
fix: remove unused tmp dir
machichima Oct 13, 2025
a2e76fd
fix: dashboard-port to port
machichima Oct 13, 2025
2df9d50
refactor: lint
machichima Oct 13, 2025
b51feb2
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 13, 2025
8035a6b
test: update test to use existing ray cluster
machichima Oct 14, 2025
681a536
refactor: lint
machichima Oct 14, 2025
5ed2be4
test: use ray_start_regular fixture
machichima Oct 15, 2025
1406cd3
test: kill dashboard instead
machichima Oct 15, 2025
c1e93cd
refactor: lint
machichima Oct 15, 2025
905d742
fix: move test to test_sdk to prevent ray cluster collision
machichima Oct 16, 2025
238e0fb
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 16, 2025
5186f4c
Merge branch 'master' into 57002-tail-log-error-handle
machichima Oct 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions python/ray/dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,9 @@ async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
The iterator.

Raises:
RuntimeError: If the job does not exist or if the request to the
job server fails.
RuntimeError: If the job does not exist, if the request to the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it easy to write a test for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me have a try!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 899c05d

job server fails, or if the connection closes unexpectedly
before the job reaches a terminal state.
"""
async with aiohttp.ClientSession(
cookies=self._cookies, headers=self._headers
Expand All @@ -498,6 +499,17 @@ async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
if msg.type == aiohttp.WSMsgType.TEXT:
yield msg.data
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.debug(
f"WebSocket closed for job {job_id} with close code {ws.close_code}"
)
if ws.close_code == aiohttp.WSCloseCode.ABNORMAL_CLOSURE:
raise RuntimeError(
f"WebSocket connection closed unexpectedly with close code {ws.close_code}"
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: WebSocket Error Message Formatting Issue

The RuntimeError message for abnormal WebSocket closures is malformed, missing the job_id and containing grammatical errors.

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: WebSocket Closure Triggers Unintended Job Errors

The tail_job_logs function raises a RuntimeError on any abnormal WebSocket closure without checking the job's status. The original intent was to only raise if the job isn't in a terminal state. This can lead to false positive errors, reporting successful jobs as failed due to unrelated network issues.

Fix in Cursor Fix in Web

break
elif msg.type == aiohttp.WSMsgType.ERROR:
pass
# Old Ray versions may send ERROR on connection close
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when did this behavior change?

Copy link
Contributor Author

@machichima machichima Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I encounter the error in test python/ray/dashboard/modules/job/tests/test_backwards_compatibility.py::TestBackwardsCompatibility::test_cli. This gets into here. The test is running test_backwards_compatibility.sh which is using ray version 2.0.1

logger.debug(
f"WebSocket error for job {job_id}, treating as normal close. Err: {ws.exception()}"
)
break
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,7 @@ def f():
yield {
"runtime_env": {"py_modules": [str(Path(tmp_dir) / "test_module")]},
"entrypoint": (
"python -c 'import test_module;"
"print(test_module.run_test())'"
"python -c 'import test_module;print(test_module.run_test())'"
),
"expected_logs": "Hello from test_module!\n",
}
Expand Down
54 changes: 50 additions & 4 deletions python/ray/dashboard/modules/job/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import ray.experimental.internal_kv as kv
from ray._common.test_utils import wait_for_condition
from ray._private import worker
from ray._private.ray_constants import (
KV_NAMESPACE_DASHBOARD,
PROCESS_TYPE_DASHBOARD,
)
from ray._private.test_utils import (
format_web_url,
Expand All @@ -35,6 +37,8 @@
from ray.tests.conftest import _ray_start
from ray.util.state import list_nodes

import psutil


def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
Expand Down Expand Up @@ -80,10 +84,13 @@ def test_parse_cluster_info(

address, module_string, inner_address = address_param

with patch.multiple(
"ray.dashboard.modules.dashboard_sdk",
get_job_submission_client_cluster_info=mock_get_job_submission_client_cluster,
), patch.multiple("importlib", import_module=mock_import_module):
with (
patch.multiple(
"ray.dashboard.modules.dashboard_sdk",
get_job_submission_client_cluster_info=mock_get_job_submission_client_cluster,
),
patch.multiple("importlib", import_module=mock_import_module),
):
if module_string == "ray":
with pytest.raises(ValueError, match="ray://"):
parse_cluster_info(
Expand Down Expand Up @@ -289,5 +296,44 @@ def test_job_submission_with_runtime_env_as_object(
assert "gcs://" in parsed_runtime_env["py_modules"][0]


@pytest.mark.asyncio
async def test_tail_job_logs_websocket_abnormal_closure(ray_start_regular):
"""
Test that ABNORMAL_CLOSURE raises RuntimeError when tailing logs.

This test uses its own Ray cluster and kills the dashboard while tailing logs
to simulate an abnormal WebSocket closure.
"""
dashboard_url = ray_start_regular.dashboard_url
client = JobSubmissionClient(format_web_url(dashboard_url))

# Submit a long-running job
driver_script = """
import time
for i in range(100):
print("Hello", i)
time.sleep(0.5)
"""
entrypoint = f"python -c '{driver_script}'"
job_id = client.submit_job(entrypoint=entrypoint)

# Start tailing logs and stop Ray while tailing
# Expect RuntimeError when WebSocket closes abnormally
with pytest.raises(
RuntimeError,
match="WebSocket connection closed unexpectedly with close code",
):
i = 0
async for lines in client.tail_job_logs(job_id):
print(lines, end="")
i += 1

# Kill the dashboard after receiving a few log lines
if i == 3:
print("\nKilling the dashboard to close websocket abnormally...")
dash_info = worker._global_node.all_processes[PROCESS_TYPE_DASHBOARD][0]
psutil.Process(dash_info.process.pid).kill()


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))