[FIX] raise error if job does not terminate in tail_job_logs() #57037
Conversation
Signed-off-by: machichima <[email protected]>
Code Review
This pull request fixes an issue where tail_job_logs() would not raise an error if the connection to the Ray head node was lost while a job was still running. The change introduces a check for the job's status when the log-tailing websocket is closed.
My review focuses on improving the performance and correctness of this new logic. The current implementation introduces a blocking, synchronous call inside an async function, and it inefficiently queries the job status on every log message. I've provided a suggestion to make the call non-blocking and to only query the status when necessary, which improves both performance and robustness.
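A hedged sketch of the optimization the bot describes, decoupled from the SDK class for illustration. The function name and the injected get_job_info callable are hypothetical; JobDetails and the is_terminal() helper come from ray.job_submission. This is not the code in the PR:

```python
import asyncio
from typing import Callable

import aiohttp

from ray.job_submission import JobDetails


async def tail_logs_once_check(
    ws: aiohttp.ClientWebSocketResponse,
    job_id: str,
    get_job_info: Callable[[str], JobDetails],
):
    """Yield log lines; check the job status only once, when the socket ends."""
    while True:
        msg = await ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            yield msg.data
        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
            # Run the synchronous SDK call off the event loop so it does not block it.
            job_info = await asyncio.to_thread(get_job_info, job_id)
            if not job_info.status.is_terminal():
                raise RuntimeError(
                    f"Log tailing for job {job_id} stopped before the job "
                    f"reached a terminal state (status: {job_info.status})."
                )
            break
```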
Signed-off-by: machichima <[email protected]>
Force-pushed 31cde67 to aea0717
# Query job status after receiving each message to track state
try:
    job_info = self.get_job_info(job_id)
    job_status = job_info.status
except Exception as e:
    raise RuntimeError(f"Failed to get job status for {job_id}.") from e
Should we query the job info and job status outside of the loop? In that case we would only have to query once.
We need the up-to-date status from right before the connection closed, which is why we do it inside the while loop.
The job info query only runs each time we receive a new message, which is not that frequent.
is it possible to check the msg to detect loss of connection?
I tried checking msg and ws, but I cannot really tell the difference between a normal close and an abnormal one.
This is the output for a normal close:
❯ python reproduce_tail_logs_issue.py
Job submitted with ID: raysubmit_SVbekdrqykf3mz7c
Starting to tail job logs...
2025-10-02 20:49:50,188 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=2025-10-02 20:49:47,066 INFO job_manager.py:568 -- Runtime env is setting up.
Job started
1
2
, extra=
2025-10-02 20:49:50,188 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 2025-10-02 20:49:47,066 INFO job_manager.py:568 -- Runtime env is setting up.
Job started
1
2
job status: RUNNING
2025-10-02 20:49:51,190 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=3
, extra=
2025-10-02 20:49:51,191 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 3
job status: RUNNING
2025-10-02 20:49:52,190 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=4
, extra=
2025-10-02 20:49:52,190 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 4
job status: RUNNING
2025-10-02 20:49:53,192 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=5
, extra=
2025-10-02 20:49:53,192 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 5
job status: SUCCEEDED
2025-10-02 20:49:54,193 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=Job completed
, extra=
2025-10-02 20:49:54,193 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: Job completed
job status: SUCCEEDED
2025-10-02 20:49:57,197 INFO sdk.py:502 -- [DEBUG] msg attributes: type=8, data=1000, extra=
2025-10-02 20:49:57,197 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=True, close_code=1000, protocol=None
2025-10-02 20:49:57,197 INFO sdk.py:502 -- [DEBUG] msg attributes: type=257, data=None, extra=None
2025-10-02 20:49:57,197 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=True, close_code=1000, protocol=None
tail_job_logs() returned normally (no exception)
This is the output when the Ray head is terminated before the job finishes:
❯ python reproduce_tail_logs_issue.py
Job submitted with ID: raysubmit_DFgSjCpApJcswQFT
Starting to tail job logs...
2025-10-02 20:49:28,838 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=2025-10-02 20:49:25,729 INFO job_manager.py:568 -- Runtime env is setting up.
Job started
1
2
, extra=
2025-10-02 20:49:28,838 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 2025-10-02 20:49:25,729 INFO job_manager.py:568 -- Runtime env is setting up.
Job started
1
2
job status: RUNNING
2025-10-02 20:49:29,847 INFO sdk.py:502 -- [DEBUG] msg attributes: type=1, data=3
, extra=
2025-10-02 20:49:29,847 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=False, close_code=None, protocol=None
LOG: 3
job status: RUNNING
2025-10-02 20:49:30,076 INFO sdk.py:502 -- [DEBUG] msg attributes: type=8, data=1000, extra=
2025-10-02 20:49:30,076 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=True, close_code=1000, protocol=None
2025-10-02 20:49:30,076 INFO sdk.py:502 -- [DEBUG] msg attributes: type=257, data=None, extra=None
2025-10-02 20:49:30,076 INFO sdk.py:505 -- [DEBUG] ws attributes: closed=True, close_code=1000, protocol=None
tail_job_logs() returned normally (no exception)

while True:
    msg = await ws.receive()
Won't this ws.receive raise an exception when the connection is broken? Isn't that enough? I actually think there is no need to query the job status.
It actually does not raise an error when the connection is lost. I think in older versions it did (that's what we relied on in python/ray/dashboard/modules/job/tests/backwards_compatibility_scripts/test_backwards_compatibility.sh), but in newer versions it just closes without raising.
Even the close_code is no different from a normal close.
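For context, a stripped-down receive loop (a sketch, not the SDK code) illustrates what the debug output above shows: in both runs the socket simply delivers CLOSE/CLOSED messages with close_code 1000, so there is no exception or close code that distinguishes a lost head from a normal finish:

```python
import aiohttp


async def drain(ws: aiohttp.ClientWebSocketResponse) -> None:
    while True:
        msg = await ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            print("LOG:", msg.data)
        elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED):
            # Per the logs above, close_code is 1000 whether the job finished
            # or the head was killed mid-run, so the code alone can't tell.
            print("Close code:", ws.close_code)
            break
```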
Signed-off-by: machichima <[email protected]>
| print(f"Close code: {ws.close_code}") | ||
| if ws.close_code == aiohttp.WSCloseCode.ABNORMAL_CLOSURE: | ||
| raise RuntimeError( | ||
| f"WebSocket connection closed unexpectedly while job with close code {ws.close_code}" | ||
| ) | ||
| break | ||
| elif msg.type == aiohttp.WSMsgType.ERROR: | ||
| pass | ||
| # Old Ray versions may send ERROR on connection close | ||
| raise RuntimeError( | ||
| f"WebSocket error while tailing logs for job {job_id}. Err: {ws.exception()}" | ||
| ) | ||
| break |
I LOVE THIS!
if msg.type == aiohttp.WSMsgType.TEXT:
    yield msg.data
elif msg.type == aiohttp.WSMsgType.CLOSED:
    print(f"Close code: {ws.close_code}")
Delete this print or use the logger.
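For example (sdk.py already logs through a module-level logger, as the `INFO sdk.py:502` lines above show):

```python
logger.debug("Websocket close code: %s", ws.close_code)
```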
# Old Ray versions may send ERROR on connection close
raise RuntimeError(
    f"WebSocket error while tailing logs for job {job_id}. Err: {ws.exception()}"
)
Why do we need to handle old Ray versions here? I thought we only support the job client and job server being on the same version?
The client only uses the HTTP/websocket protocol, so the compatibility requirements are looser than that. We don't give an exact guarantee, though.
Signed-off-by: machichima <[email protected]>
Raises:
    RuntimeError: If the job does not exist or if the request to the
        job server fails.
    RuntimeError: If the job does not exist, if the request to the
Is it easy to write a test for it?
Let me have a try!
Added in 899c05d
Signed-off-by: machichima <[email protected]>
…og-error-handle Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
finally:
    # Ensure Ray is stopped even if test fails
    subprocess.check_output(["ray", "stop", "--force"])
Please use existing fixtures for this; I believe the equivalent to what you've written is ray_start_regular.
For example, in ray/python/ray/dashboard/tests/test_dashboard.py (line 1110 at f6f14aa):

    def test_dashboard_requests_fail_on_missing_deps(ray_start_regular):

You might need to import it on this line:

    from ray.tests.conftest import _ray_start
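A rough structural sketch of such a fixture-based test; the entrypoint, dashboard address, and the way the connection loss is triggered are placeholders, not the test added in this PR:

```python
import asyncio

import pytest

from ray.job_submission import JobSubmissionClient


def test_tail_job_logs_raises_on_lost_connection(ray_start_regular):
    # ray_start_regular (from ray.tests.conftest) starts and tears down Ray,
    # replacing the manual `ray stop --force` cleanup in the finally block.
    client = JobSubmissionClient("http://127.0.0.1:8265")
    job_id = client.submit_job(
        entrypoint="python -c 'import time; time.sleep(60)'"
    )

    async def tail():
        async for _ in client.tail_job_logs(job_id):
            # A real test would kill the dashboard/head process here to break
            # the connection while the job is still running.
            pass

    # With the connection broken before the job terminates, tailing should
    # raise instead of returning silently.
    with pytest.raises(RuntimeError):
        asyncio.run(tail())
```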
Updated in 5ed2be4
Thanks!
    break
elif msg.type == aiohttp.WSMsgType.ERROR:
    # Old Ray versions may send ERROR on connection close
when did this behavior change?
I encountered the error in the test python/ray/dashboard/modules/job/tests/test_backwards_compatibility.py::TestBackwardsCompatibility::test_cli, which hits this branch. That test runs test_backwards_compatibility.sh, which uses Ray version 2.0.1.
Signed-off-by: machichima <[email protected]>
Force-pushed a4fb045 to 1406cd3
Signed-off-by: machichima <[email protected]>
edoakes left a comment:
Looks good, a few minor nits. Kicked off premerge CI tests here: https://buildkite.com/ray-project/premerge/builds/51666
# Kill the dashboard after receiving a few log lines
if i == 3:
    from ray._private import ray_constants
no need for lazy import here, move it to the top of the file
from ray.runtime_env.runtime_env import RuntimeEnv, RuntimeEnvConfig
from ray.tests.conftest import _ray_start

import psutil
This should go before pytest (import ordering is builtins, then third-party, then Ray internal).
I think we have a linter that usually enforces this 🤔
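For reference, the ordering the reviewer describes would look roughly like this (psutil and pytest are just the imports discussed in this thread):

```python
import subprocess  # builtins / stdlib first

import psutil  # third-party next
import pytest

from ray.tests.conftest import _ray_start  # Ray-internal last
```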
By running pre-commit run --all, psutil gets placed at the end of the imports.
Signed-off-by: machichima <[email protected]>
Moved the test to the file […]. When I use […], I get:

RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
Why are these changes needed?
During the execution of tail_job_logs() after job submission, if the connection to the Ray head breaks, tail_job_logs() does not raise any error. An error should be raised.
This change queries the job status when receiving each message, and raises an error if the connection closes while the job is not in a terminal state.
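A condensed sketch of that flow, pieced together from the diff snippets in the conversation above. The function name and wiring are illustrative, not the exact sdk.py code, and this is the per-message variant that the automated review above suggests optimizing:

```python
import aiohttp

from ray.job_submission import JobSubmissionClient


async def tail_with_status_tracking(
    client: JobSubmissionClient, ws: aiohttp.ClientWebSocketResponse, job_id: str
):
    """Yield log lines while tracking the latest job status on every message."""
    job_status = None
    while True:
        msg = await ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            try:
                # Refresh the status alongside each received log message.
                job_status = client.get_job_info(job_id).status
            except Exception as e:
                raise RuntimeError(f"Failed to get job status for {job_id}.") from e
            yield msg.data
        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
            # A close/error before the job is terminal means the head
            # connection was likely lost; raise instead of returning silently.
            if job_status is None or not job_status.is_terminal():
                raise RuntimeError(
                    f"Log tailing for job {job_id} ended unexpectedly "
                    f"(last known status: {job_status})."
                )
            break
```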
Related issue number
Closes: #57002
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a new method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.

Note
Enhances tail_job_logs to track job status and raise errors if the WebSocket closes or errors before the job reaches a terminal state.
- In python/ray/dashboard/modules/job/sdk.py, call get_job_info on each received message and store job_status.
- On CLOSED or ERROR, raise RuntimeError if job_status is not terminal; otherwise exit cleanly.
- Update the Raises docstring to cover unexpected connection closure before a terminal state.

Written by Cursor Bugbot for commit c011722.