From c5a513c5184d51df3bc42d83a5a75f567859419e Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Fri, 27 Feb 2026 13:53:25 -0800 Subject: [PATCH 1/8] Check ray task alive Signed-off-by: Daniel Huang --- vllm_omni/distributed/ray_utils/utils.py | 13 +++++++++++-- vllm_omni/entrypoints/omni_stage.py | 12 ++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/vllm_omni/distributed/ray_utils/utils.py b/vllm_omni/distributed/ray_utils/utils.py index 07513b6601e..ec65cc9fed5 100644 --- a/vllm_omni/distributed/ray_utils/utils.py +++ b/vllm_omni/distributed/ray_utils/utils.py @@ -176,6 +176,15 @@ def run(self, func, *args, **kwargs): runtime_env={"env_vars": {"PYTHONPATH": os.environ.get("PYTHONPATH", "")}, "CUDA_LAUNCH_BLOCKING": "1"}, ).remote() - worker_actor.run.remote(worker_entry_fn, *args, **kwargs) + task_ref = worker_actor.run.remote(worker_entry_fn, *args, **kwargs) - return worker_actor + return worker_actor, task_ref + + +def is_ray_task_alive(task_ref: Any, **kwargs): + """Checks ray task status. Returns FALSE if ray task has exited for any reason.""" + if not RAY_AVAILABLE: + raise ImportError("ray is required to query ray tasks") + + ready, _ = ray.wait([task_ref], **kwargs) + return not bool(ready) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 19091034a38..9cd71aefea3 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -36,7 +36,7 @@ from vllm_omni.distributed.omni_connectors import build_stage_connectors from vllm_omni.distributed.omni_connectors.adapter import try_recv_via_connector from vllm_omni.distributed.omni_connectors.connectors.base import OmniConnectorBase -from vllm_omni.distributed.ray_utils.utils import kill_ray_actor, start_ray_actor +from vllm_omni.distributed.ray_utils.utils import is_ray_task_alive, kill_ray_actor, start_ray_actor from vllm_omni.engine.arg_utils import AsyncOmniEngineArgs, OmniEngineArgs from vllm_omni.entrypoints.async_omni_diffusion import AsyncOmniDiffusion from vllm_omni.entrypoints.async_omni_llm import AsyncOmniLLM @@ -303,6 +303,8 @@ def __init__(self, stage_config: Any, stage_init_timeout: int = 300): self._in_q: mp.queues.Queue | ZmqQueue | str | None = None self._out_q: mp.queues.Queue | ZmqQueue | str | None = None self._proc: mp.Process | None = None + self._ray_actor: Any | None = None + self._ray_task_ref: Any | None = None self._shm_threshold_bytes: int = 65536 self._stage_init_timeout: int = stage_init_timeout # Callback used by the orchestrator's output handler to stash @@ -544,7 +546,7 @@ def init_stage_worker( os.environ["VLLM_LOGGING_PREFIX"] = new_env if worker_backend == "ray": if is_async: - self._ray_actor = start_ray_actor( + self._ray_actor, self._ray_task_ref = start_ray_actor( _stage_worker_async_entry, ray_placement_group, self.stage_id, @@ -556,7 +558,7 @@ def init_stage_worker( stage_init_timeout=self._stage_init_timeout, ) else: - self._ray_actor = start_ray_actor( + self._ray_actor, self._ray_task_ref = start_ray_actor( _stage_worker, ray_placement_group, self.stage_id, @@ -629,7 +631,7 @@ def stop_stage_worker(self) -> None: pass self._out_q = None - if hasattr(self, "_ray_actor") and self._ray_actor: + if self._ray_actor is not None: kill_ray_actor(self._ray_actor) self._ray_actor = None elif self._proc is not None: @@ -691,6 +693,8 @@ def try_collect(self) -> dict[str, Any] | None: request_id, engine_outputs (or engine_outputs_shm), and metrics. """ assert self._out_q is not None + if self._ray_task_ref is not None and is_ray_task_alive(self._ray_task_ref, timeout=0): + raise RuntimeError("OmniStage Ray actor died unexpectedly") try: return self._out_q.get_nowait() except queue.Empty: From cbd17b628f69c66c7ce9781de09f21ce7149cd62 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Fri, 27 Feb 2026 14:03:27 -0800 Subject: [PATCH 2/8] Fix logic Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 9cd71aefea3..af05937949e 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -693,7 +693,7 @@ def try_collect(self) -> dict[str, Any] | None: request_id, engine_outputs (or engine_outputs_shm), and metrics. """ assert self._out_q is not None - if self._ray_task_ref is not None and is_ray_task_alive(self._ray_task_ref, timeout=0): + if self._ray_task_ref is not None and not is_ray_task_alive(self._ray_task_ref, timeout=0): raise RuntimeError("OmniStage Ray actor died unexpectedly") try: return self._out_q.get_nowait() From 98c78816dc51e71f5f45e8b802d82f2983966c7e Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Sat, 28 Feb 2026 00:07:20 -0800 Subject: [PATCH 3/8] Update vllm_omni/entrypoints/omni_stage.py Co-authored-by: SYLAR <125541396+lishunyang12@users.noreply.github.com> Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index af05937949e..4df92fb6c5b 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -634,7 +634,9 @@ def stop_stage_worker(self) -> None: if self._ray_actor is not None: kill_ray_actor(self._ray_actor) self._ray_actor = None - elif self._proc is not None: + kill_ray_actor(self._ray_actor) + self._ray_actor = None + self._ray_task_ref = None try: self._proc.join(timeout=5) except Exception as e: From 31618b2dcd3317af34f1ebd2c4850bc365f426f1 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Sat, 28 Feb 2026 00:10:09 -0800 Subject: [PATCH 4/8] Resolve suggestion duplication Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 4df92fb6c5b..d07917b2a8e 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -632,8 +632,6 @@ def stop_stage_worker(self) -> None: self._out_q = None if self._ray_actor is not None: - kill_ray_actor(self._ray_actor) - self._ray_actor = None kill_ray_actor(self._ray_actor) self._ray_actor = None self._ray_task_ref = None From 3c855a69cf5563ff229643efbda309a7c7591476 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Mon, 2 Mar 2026 14:56:32 -0800 Subject: [PATCH 5/8] Resolve potential race condition Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index d07917b2a8e..2fbeb89dc81 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -693,8 +693,6 @@ def try_collect(self) -> dict[str, Any] | None: request_id, engine_outputs (or engine_outputs_shm), and metrics. """ assert self._out_q is not None - if self._ray_task_ref is not None and not is_ray_task_alive(self._ray_task_ref, timeout=0): - raise RuntimeError("OmniStage Ray actor died unexpectedly") try: return self._out_q.get_nowait() except queue.Empty: @@ -705,6 +703,8 @@ def try_collect(self) -> dict[str, Any] | None: raise if self._proc is not None and not self._proc.is_alive(): raise RuntimeError(f"OmniStage Worker process died unexpectedly with exit code {self._proc.exitcode}") + if self._ray_task_ref is not None and not is_ray_task_alive(self._ray_task_ref, timeout=0): + raise RuntimeError("OmniStage Ray actor died unexpectedly") def process_engine_inputs( self, From 9f4997026810d2861d0a662dc8dc79fe0c712979 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Mon, 2 Mar 2026 17:12:34 -0800 Subject: [PATCH 6/8] Add ray error traceback Signed-off-by: Daniel Huang --- vllm_omni/distributed/ray_utils/utils.py | 11 +++++++++++ vllm_omni/entrypoints/omni_stage.py | 10 ++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/vllm_omni/distributed/ray_utils/utils.py b/vllm_omni/distributed/ray_utils/utils.py index ec65cc9fed5..56872ef4d8a 100644 --- a/vllm_omni/distributed/ray_utils/utils.py +++ b/vllm_omni/distributed/ray_utils/utils.py @@ -188,3 +188,14 @@ def is_ray_task_alive(task_ref: Any, **kwargs): ready, _ = ray.wait([task_ref], **kwargs) return not bool(ready) + + +def get_ray_task_error(task_ref: Any, **kwargs) -> ray.exceptions.RayTaskError | None: + """Gets ray task. Returns RayTaskError if ray instance exited with any error, else None.""" + if not RAY_AVAILABLE: + raise ImportError("ray is required to query ray tasks") + + try: + ray.get(task_ref, **kwargs) + except ray.exceptions.RayTaskError as e: + return e diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 2fbeb89dc81..28e1ec79237 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -36,7 +36,12 @@ from vllm_omni.distributed.omni_connectors import build_stage_connectors from vllm_omni.distributed.omni_connectors.adapter import try_recv_via_connector from vllm_omni.distributed.omni_connectors.connectors.base import OmniConnectorBase -from vllm_omni.distributed.ray_utils.utils import is_ray_task_alive, kill_ray_actor, start_ray_actor +from vllm_omni.distributed.ray_utils.utils import ( + get_ray_task_error, + is_ray_task_alive, + kill_ray_actor, + start_ray_actor, +) from vllm_omni.engine.arg_utils import AsyncOmniEngineArgs, OmniEngineArgs from vllm_omni.entrypoints.async_omni_diffusion import AsyncOmniDiffusion from vllm_omni.entrypoints.async_omni_llm import AsyncOmniLLM @@ -704,7 +709,8 @@ def try_collect(self) -> dict[str, Any] | None: if self._proc is not None and not self._proc.is_alive(): raise RuntimeError(f"OmniStage Worker process died unexpectedly with exit code {self._proc.exitcode}") if self._ray_task_ref is not None and not is_ray_task_alive(self._ray_task_ref, timeout=0): - raise RuntimeError("OmniStage Ray actor died unexpectedly") + e = get_ray_task_error(self._ray_task_ref, timeout=0) + raise RuntimeError("OmniStage Ray actor died unexpectedly") from e def process_engine_inputs( self, From 9beffb59d264eca693b92cc78a5c6f66923f571f Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Mon, 2 Mar 2026 23:02:56 -0800 Subject: [PATCH 7/8] Catch generic errors from ray task errors Signed-off-by: Daniel Huang --- vllm_omni/distributed/ray_utils/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_omni/distributed/ray_utils/utils.py b/vllm_omni/distributed/ray_utils/utils.py index 56872ef4d8a..519460256ee 100644 --- a/vllm_omni/distributed/ray_utils/utils.py +++ b/vllm_omni/distributed/ray_utils/utils.py @@ -190,12 +190,12 @@ def is_ray_task_alive(task_ref: Any, **kwargs): return not bool(ready) -def get_ray_task_error(task_ref: Any, **kwargs) -> ray.exceptions.RayTaskError | None: +def get_ray_task_error(task_ref: Any, **kwargs) -> Exception | None: """Gets ray task. Returns RayTaskError if ray instance exited with any error, else None.""" if not RAY_AVAILABLE: raise ImportError("ray is required to query ray tasks") try: ray.get(task_ref, **kwargs) - except ray.exceptions.RayTaskError as e: + except Exception as e: return e From b5b72ad8e41afa1a896322b98ec0db668f74e70e Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Wed, 4 Mar 2026 09:37:56 -0800 Subject: [PATCH 8/8] Restore elif branch Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 28e1ec79237..3f5a14e975d 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -640,6 +640,7 @@ def stop_stage_worker(self) -> None: kill_ray_actor(self._ray_actor) self._ray_actor = None self._ray_task_ref = None + elif self._proc is not None: try: self._proc.join(timeout=5) except Exception as e: