From 9e7b9e884716e222a5044d27ba6f3cd7a844e5b9 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Fri, 27 Feb 2026 13:44:33 -0800 Subject: [PATCH 1/4] Patch try collect, check proc alive Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 3fa722b032a..2f0299cb2d7 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -691,10 +691,16 @@ def try_collect(self) -> dict[str, Any] | None: request_id, engine_outputs (or engine_outputs_shm), and metrics. """ assert self._out_q is not None + if self._proc is not None and not self._proc.is_alive(): + raise RuntimeError("OmniStage Worker process died unexpectedly") try: return self._out_q.get_nowait() - except Exception: + except queue.Empty: return None + except Exception as e: + logger.error("Unexpected error when collecting OmniStage output queue:", exc_info=e) + self.stop_stage_worker() + raise def process_engine_inputs( self, From 70b28b08698fa86b3bda9a09c78f296edec47982 Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Sat, 28 Feb 2026 00:03:46 -0800 Subject: [PATCH 2/4] Flip order to avoid race conditions Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 2f0299cb2d7..c445d8a2f24 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -691,16 +691,16 @@ def try_collect(self) -> dict[str, Any] | None: request_id, engine_outputs (or engine_outputs_shm), and metrics. """ assert self._out_q is not None - if self._proc is not None and not self._proc.is_alive(): - raise RuntimeError("OmniStage Worker process died unexpectedly") try: return self._out_q.get_nowait() except queue.Empty: - return None + pass except Exception as e: logger.error("Unexpected error when collecting OmniStage output queue:", exc_info=e) self.stop_stage_worker() raise + if self._proc is not None and not self._proc.is_alive(): + raise RuntimeError("OmniStage Worker process died unexpectedly") def process_engine_inputs( self, From 8e79a78ae01e96261294de882286ba2c586c28de Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Sat, 28 Feb 2026 00:04:57 -0800 Subject: [PATCH 3/4] Add exit code in error Signed-off-by: Daniel Huang --- vllm_omni/entrypoints/omni_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index c445d8a2f24..19091034a38 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -700,7 +700,7 @@ def try_collect(self) -> dict[str, Any] | None: self.stop_stage_worker() raise if self._proc is not None and not self._proc.is_alive(): - raise RuntimeError("OmniStage Worker process died unexpectedly") + raise RuntimeError(f"OmniStage Worker process died unexpectedly with exit code {self._proc.exitcode}") def process_engine_inputs( self, From 4e0fa0cdaba6633d2354f298e6c302f1e6ea082e Mon Sep 17 00:00:00 2001 From: Daniel Huang Date: Mon, 9 Mar 2026 16:05:56 -0700 Subject: [PATCH 4/4] Use proper is_alive value Signed-off-by: Daniel Huang --- tests/entrypoints/test_omni_diffusion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/entrypoints/test_omni_diffusion.py b/tests/entrypoints/test_omni_diffusion.py index 2fbe7a8b42b..6eb149ef1d3 100644 --- a/tests/entrypoints/test_omni_diffusion.py +++ b/tests/entrypoints/test_omni_diffusion.py @@ -345,7 +345,7 @@ def _setup_multiprocessing_mocks(monkeypatch: pytest.MonkeyPatch, mocker: Mocker fake_process_instance = mocker.MagicMock() fake_process_instance.start = mocker.MagicMock() fake_process_instance.join = mocker.MagicMock() - fake_process_instance.is_alive = mocker.MagicMock(return_value=False) + fake_process_instance.is_alive = mocker.MagicMock(return_value=True) fake_process_instance.terminate = mocker.MagicMock() fake_process_class.return_value = fake_process_instance