diff --git a/tests/entrypoints/test_omni_diffusion.py b/tests/entrypoints/test_omni_diffusion.py index 2fbe7a8b42b..6eb149ef1d3 100644 --- a/tests/entrypoints/test_omni_diffusion.py +++ b/tests/entrypoints/test_omni_diffusion.py @@ -345,7 +345,7 @@ def _setup_multiprocessing_mocks(monkeypatch: pytest.MonkeyPatch, mocker: Mocker fake_process_instance = mocker.MagicMock() fake_process_instance.start = mocker.MagicMock() fake_process_instance.join = mocker.MagicMock() - fake_process_instance.is_alive = mocker.MagicMock(return_value=False) + fake_process_instance.is_alive = mocker.MagicMock(return_value=True) fake_process_instance.terminate = mocker.MagicMock() fake_process_class.return_value = fake_process_instance diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 3fa722b032a..19091034a38 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -693,8 +693,14 @@ def try_collect(self) -> dict[str, Any] | None: assert self._out_q is not None try: return self._out_q.get_nowait() - except Exception: - return None + except queue.Empty: + pass + except Exception as e: + logger.error("Unexpected error when collecting OmniStage output queue:", exc_info=e) + self.stop_stage_worker() + raise + if self._proc is not None and not self._proc.is_alive(): + raise RuntimeError(f"OmniStage Worker process died unexpectedly with exit code {self._proc.exitcode}") def process_engine_inputs( self,