v1 path: Add/align Worker and Model Runner for AR and Diffusion (vLLM-compatible) #3
tzhouam wants to merge 1 commit into vllm-project:dev-ztc from
Conversation
Code Review
This pull request is a great step towards extending vLLM for multimodal and non-autoregressive models. The introduction of ARGPUModelRunner and DiffusionGPUModelRunner along with their respective workers aligns well with the existing vLLM architecture. The code is generally well-structured. My review focuses on a few critical bugs that seem to be copy-paste errors, some significant code duplication that impacts maintainability, and minor inconsistencies in documentation and module exports. Addressing these points will improve the robustness and long-term health of the codebase.
```python
def init_device(self):
    if self.device_config.device.type == "cuda":
        # torch.distributed.all_reduce does not free the input tensor until
        # the synchronization point. This causes the memory usage to grow
        # as the number of all_reduce calls increases. This env var disables
        # this behavior.
        # Related issue:
        # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
        os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

        # This env var set by Ray causes exceptions with graph building.
        os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
        self.device = torch.device(f"cuda:{self.local_rank}")
        current_platform.set_device(self.device)

        _check_if_gpu_supports_dtype(self.model_config.dtype)
        gc.collect()
        torch.cuda.empty_cache()

        # take current memory snapshot
        self.init_snapshot = MemorySnapshot()
        self.requested_memory = (self.init_snapshot.total_memory *
                                 self.cache_config.gpu_memory_utilization)
        if self.init_snapshot.free_memory < self.requested_memory:
            GiB = lambda b: round(b / GiB_bytes, 2)
            raise ValueError(
                f"Free memory on device "
                f"({GiB(self.init_snapshot.free_memory)}/"
                f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                f"is less than desired GPU memory utilization "
                f"({self.cache_config.gpu_memory_utilization}, "
                f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                f"utilization or reduce GPU memory used by other processes."
            )
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.vllm_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank,
                                        current_platform.dist_backend)
    # Set random seed.
    set_random_seed(self.model_config.seed)

    # Construct the model runner
    self.model_runner: ARGPUModelRunner = ARGPUModelRunner(
        self.vllm_config, self.device)

    if self.rank == 0:
        # If usage stat is enabled, collect relevant info.
        report_usage_stats(self.vllm_config)
```
This method duplicates a large amount of code from the parent GPUWorker class, with the only significant change being the instantiation of ARGPUModelRunner instead of GPUModelRunner. This creates a significant maintainability burden, as any changes in the base class's init_device method will need to be manually propagated here. This duplication poses a high risk and should be avoided. Please consider refactoring to reduce code duplication, for example by splitting the parent's init_device method or finding another way to inject the specific model runner class.
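One way to remove the duplication the reviewer describes is to make the runner class a single overridable attribute on the base worker, so subclasses no longer copy `init_device()`. A minimal sketch, assuming such a hook is acceptable upstream; the class names follow the PR, but `model_runner_cls` and the simplified constructors are hypothetical, not actual vLLM API:

```python
# Hypothetical refactor sketch: the base worker exposes the runner class as
# one overridable attribute, so AR/Diffusion workers need not copy
# init_device(). Names are illustrative stand-ins, not real vLLM types.
class GPUModelRunner:
    def __init__(self, vllm_config, device):
        self.vllm_config = vllm_config
        self.device = device

class ARGPUModelRunner(GPUModelRunner):
    pass

class DiffusionGPUModelRunner(GPUModelRunner):
    pass

class GPUWorker:
    # The single point of variation: which runner init_device() constructs.
    model_runner_cls = GPUModelRunner

    def __init__(self, vllm_config, device):
        self.vllm_config = vllm_config
        self.device = device

    def init_device(self):
        # ... device setup, memory checks, and distributed init would live
        # here exactly once, in the base class ...
        self.model_runner = self.model_runner_cls(self.vllm_config, self.device)

class ARGPUWorker(GPUWorker):
    model_runner_cls = ARGPUModelRunner

class DiffusionGPUWorker(GPUWorker):
    model_runner_cls = DiffusionGPUModelRunner
```

With this shape, changes to the base `init_device` propagate automatically to both specialized workers.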
```python
def init_device(self):
    if self.device_config.device.type == "cuda":
        # torch.distributed.all_reduce does not free the input tensor until
        # the synchronization point. This causes the memory usage to grow
        # as the number of all_reduce calls increases. This env var disables
        # this behavior.
        # Related issue:
        # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
        os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

        # This env var set by Ray causes exceptions with graph building.
        os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
        self.device = torch.device(f"cuda:{self.local_rank}")
        current_platform.set_device(self.device)

        _check_if_gpu_supports_dtype(self.model_config.dtype)
        gc.collect()
        torch.cuda.empty_cache()

        # take current memory snapshot
        self.init_snapshot = MemorySnapshot()
        self.requested_memory = (self.init_snapshot.total_memory *
                                 self.cache_config.gpu_memory_utilization)
        if self.init_snapshot.free_memory < self.requested_memory:
            GiB = lambda b: round(b / GiB_bytes, 2)
            raise ValueError(
                f"Free memory on device "
                f"({GiB(self.init_snapshot.free_memory)}/"
                f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                f"is less than desired GPU memory utilization "
                f"({self.cache_config.gpu_memory_utilization}, "
                f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                f"utilization or reduce GPU memory used by other processes."
            )
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.vllm_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank,
                                        current_platform.dist_backend)
    # Set random seed.
    set_random_seed(self.model_config.seed)

    # Construct the model runner
    self.model_runner: DiffusionGPUModelRunner = DiffusionGPUModelRunner(
        self.vllm_config, self.device)

    if self.rank == 0:
        # If usage stat is enabled, collect relevant info.
        report_usage_stats(self.vllm_config)
```
This method duplicates a large amount of code from the parent GPUWorker class, with the only significant change being the instantiation of DiffusionGPUModelRunner instead of GPUModelRunner. This creates a significant maintainability burden, as any changes in the base class's init_device method will need to be manually propagated here. This is the same issue as in ARGPUWorker and poses a high maintainability risk.
- Outputs: Reuse ModelRunnerOutput; diffusion results are carried as tensors (e.g., via pooler_output), leaving text-specific fields as in original vLLM.
- KV cache: Treated as not required; when not registered in vllm_config, KV-related initialization becomes a no-op automatically.
- Distributed: Existing TP/PP/DP initialization, process orchestration, profiling, and sleep/wake behaviors remain intact for the worker-based path.
- Acceleration: Torch compile/CUDA Graph warmup follows existing compile_or_warm_up_model hooks or the pipeline executor's warmup helper.

Assumptions and non-goals
- No prompt logprobs, grammar bitmask, or token sampler in diffusion.
- No new public RPCs are added; we rely on existing EngineCore/Executor/Worker/Runner calls.
- Scheduler retains the same interface (see the separate diffusion scheduler design doc for bucketing key definition and step/shape grouping).

Deliverables
- This design and the interface skeletons for: DiffusionModelRunner, (config-selected) Worker with minimal overrides, MultiprocExecutor reuse, and DiffusersPipelineExecutor (no worker).
- Clear data flow and compatibility notes to ensure coherence across EngineCore, executors, workers, and runners.

Reading guide
- Canonical v1 Call Path gives the end-to-end flow we preserve.
- Executor covers both the reused MultiprocExecutor and the no-worker DiffusersPipelineExecutor.
- Worker explains the minimal inheritance strategy from GPUWorker.
- Model Runner enumerates overridden vs. kept methods for diffusion.
## Canonical v1 Call Path (for context)

```python
# class: EngineCore
# EngineCore.step (simplified)
class EngineCore:
    def step(self):
        scheduler_output = scheduler.schedule()
        model_output = executor.execute_model(scheduler_output)
        engine_outputs = scheduler.update_from_output(
            scheduler_output, model_output
        )
        return engine_outputs
```

```python
# class: MultiprocExecutor (v1)
from concurrent.futures import Future
from typing import Union
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.executor.abstract import Executor


class MultiprocExecutor(Executor):
    # Single RPC hop
    def execute_model(
        self, scheduler_output
    ) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
        output = collective_rpc("execute_model", args=(scheduler_output, ))
        return output
```

```python
# class: GPUWorker (v1)
# Required v1 method
from vllm.worker.worker_base import WorkerBase
from vllm.v1.outputs import ModelRunnerOutput


class GPUWorker(WorkerBase):
    def execute_model(self, scheduler_output) -> ModelRunnerOutput:
        # (driver/TP metadata broadcast handled by LocalOrDistributedWorkerBase subclasses)
        return self.model_runner.execute_model(
            scheduler_output=scheduler_output,
            intermediate_tensors=None,
        )
```
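The schedule → execute → update loop above can be exercised end-to-end with toy stand-ins. This is an illustrative sketch of the control flow only; the `Tiny*` classes are hypothetical and not vLLM types:

```python
# Toy stand-ins for the EngineCore.step() loop: schedule -> execute -> update.
class TinyScheduler:
    def __init__(self):
        self.pending = ["req-0", "req-1"]

    def schedule(self):
        # A real scheduler would batch and budget; here we emit all pending ids.
        return {"req_ids": list(self.pending)}

    def update_from_output(self, scheduler_output, model_output):
        # Mark requests done and pair each id with its model output.
        self.pending.clear()
        return [(rid, model_output[rid]) for rid in scheduler_output["req_ids"]]


class TinyExecutor:
    def execute_model(self, scheduler_output):
        # A real executor would RPC to workers; here we fabricate outputs.
        return {rid: f"output-for-{rid}" for rid in scheduler_output["req_ids"]}


class TinyEngineCore:
    def __init__(self):
        self.scheduler = TinyScheduler()
        self.executor = TinyExecutor()

    def step(self):
        scheduler_output = self.scheduler.schedule()
        model_output = self.executor.execute_model(scheduler_output)
        return self.scheduler.update_from_output(scheduler_output, model_output)
```

Running `TinyEngineCore().step()` walks the same three calls the design preserves for both AR and diffusion paths.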
## Executor

### Function map (Executor)

#### 1) Inherited and overridden
```python
# None required for diffusion. Reuse MultiprocExecutor as-is.
# Worker class selection is driven by vllm_config; no _init_executor override.
```
<!--
#### Multi-worker (TP/PP/DP) behavior and aggregation (Executor)
- No KVConnector in diffusion by default → `kv_output_aggregator` is not used; the path short-circuits as in v1 when `kv_transfer_config` is None.
- Executor collects output only from the output rank, consistent with v1:
  - `output_rank = world_size - tensor_parallel_size` (i.e., TP0 of the last PP stage).
  - Other ranks participate in compute but do not emit final `ModelRunnerOutput`.
- Pipeline Parallel (PP):
  - Intermediate PP stages return `IntermediateTensors` only; last PP stage returns `ModelRunnerOutput`.
  - Executor receives only from `output_rank`.
- Tensor Parallel (TP):
  - TP ranks collaborate; TP0 produces/holds the final tensors for return.
  - Executor still receives only from `output_rank`.
- Data Parallel (DP):
  - Each DP group executes independently; each group's executor returns its own `output_rank` result.
- Async batches (batch queue / `max_concurrent_batches>1`):
  - Results are returned via `Future`, but still only from `output_rank`; no cross-worker aggregation is needed.
- Worker/Runner type is resolved from vllm_config (e.g., `worker_cls`, `model_cls`), not hardcoded in the executor. -->
### Diffusers Pipeline Executor (no worker)

A single-process executor that directly runs the Diffusers pipeline without spawning workers or using RPC. Interfaces remain identical to `Executor` so the EngineCore loop is unchanged.

#### Function map (Pipeline Executor)

##### 1) Inherited and overridden
```python
from concurrent.futures import Future
from typing import Optional, Union, Callable, Any
import torch
import torch.nn as nn
from vllm.v1.executor.abstract import Executor
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput
from vllm.tasks import SupportedTask
from diffusers import DiffusionPipeline


class DiffusersPipelineExecutor(Executor):
    supports_pp: bool = False  # Single-process, no TP/PP/DP

    def _init_executor(self) -> None:
        # Called by ExecutorBase.__init__
        self._failure_callback: Optional[Callable[[], None]] = None
        self._device = self._resolve_device()
        self._dtype = self._resolve_dtype()
        self._pipeline = self._build_pipeline(device=self._device, dtype=self._dtype)
        self._profiler = None
        self._is_failed = False
        self.is_sleeping = False
        self.sleeping_tags: set[str] = set()

    # major functions to build/run diffusers pipeline
    def _build_pipeline(self, device: torch.device, dtype: torch.dtype) -> DiffusionPipeline:
        model_name = "Qwen/Qwen-Image"

        self.pipe = DiffusionPipeline.from_pretrained(model_name, torch_dtype=dtype)
        self.pipe = pipe.to(device)

    def _run_pipeline(self, scheduler_output) -> ModelRunnerOutput:
        positive_magic = {
            "en": ", Ultra HD, 4K, cinematic composition.",  # for English prompts
            "zh": ", 超清，4K，电影级构图."  # for Chinese prompts
        }

        # Generate image
        prompt_embeds = self._get_and_process_prompt_embeds(scheduler_output, positive_magic)
        negtive_prompt_embeds = self.pipe.embed_prompt(" ")

        # Generate with different aspect ratios
        aspect_ratios = {
            "1:1": (1328, 1328),
            "16:9": (1664, 928),
            "9:16": (928, 1664),
            "4:3": (1472, 1140),
            "3:4": (1140, 1472),
            "3:2": (1584, 1056),
            "2:3": (1056, 1584),
        }

        width, height = aspect_ratios["16:9"]

        image = pipe(
            prompt_embeds=prompt_embeds,
            negtive_prompt_embeds=negtive_prompt_embeds,
```
I've found a few typos and errors in the code examples within this design document. Correcting them will improve clarity and prevent confusion for developers:
- Line 19: `orginal` should be `original`.
- Line 150: `self.pipe = pipe.to(device)` uses an undefined variable `pipe`. It should probably be `self.pipe = self.pipe.to(device)`.
- Line 160: `negtive_prompt_embeds` should be `negative_prompt_embeds`.
- Line 176: `image = pipe(...)` uses an undefined variable `pipe`. It should be `self.pipe`.
- Line 178: `negtive_prompt_embeds` should be `negative_prompt_embeds`.
```python
            pooler_output=[Tensor, ...],  # Return Hidden states
            kv_connector_output=None,
            num_nans_in_logits=None,
        )  # return multi modal tensors via pooler_output=[Tensor,...]


class ARModelRunner(GPUModelRunner):
    @torch.inference_mode()
    def execute_model(
        self,
        scheduler_output: "SchedulerOutput",
        intermediate_tensors: Optional[IntermediateTensors] = None,
    ) -> Union[ModelRunnerOutput, IntermediateTensors]:
        ...
        return ModelRunnerOutput(
            req_ids=[...],
            req_id_to_index={...},
            sampled_token_ids=[],
            spec_token_ids=None,
            logprobs=None,
            prompt_logprobs_dict={},
            pooler_output=[Tensor, ...],  # Return Hidden states
            kv_connector_output=None,
            num_nans_in_logits=None,
        )  # return hidden states via pooler_output=[Tensor,...]
```
The comments explaining the purpose of `pooler_output` in the `ARModelRunner` and `DiffusionModelRunner` examples seem to be mixed up or incorrectly placed.
- In `DiffusionModelRunner` on line 271, the comment says `# Return Hidden states`, but it should refer to returning diffusion tensors. The comment on line 274 is also misplaced.
- In `ARModelRunner` on line 295, the comment `)# return hidden states via pooler_output=[Tensor,...]` is misplaced.
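To make the intended placement concrete, here is a sketch with the comments corrected; `ModelRunnerOutput` is reduced to a minimal stand-in dataclass for illustration only, not the real vLLM type:

```python
from dataclasses import dataclass, field

# Minimal stand-in for vllm.v1.outputs.ModelRunnerOutput, showing which
# payload each runner is meant to carry in pooler_output.
@dataclass
class ModelRunnerOutput:
    req_ids: list
    pooler_output: list
    sampled_token_ids: list = field(default_factory=list)

# Diffusion path: pooler_output carries diffusion result tensors; no sampling.
diffusion_out = ModelRunnerOutput(
    req_ids=["r0"],
    pooler_output=["<diffusion tensor for r0>"],  # diffusion tensors, not hidden states
)

# AR path: pooler_output carries per-request hidden states alongside sampling.
ar_out = ModelRunnerOutput(
    req_ids=["r0"],
    pooler_output=["<hidden states for r0>"],  # hidden states
    sampled_token_ids=[[42]],
)
```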
```python
        """Runs the diffusion process and returns per-request tensors.

        Tries model interfaces in the following order for maximal compatibility:
        1) model.sample(condition=..., **kwargs)
        2) model.forward(condition=..., **kwargs)
        3) model.diffuse(condition=..., **kwargs)
        """
```
The docstring states that the method will try model.sample, model.forward, and model.diffuse in order. However, the implementation only checks for model.forward, with the other checks commented out. To avoid confusion, the docstring should be updated to reflect the current implementation:

```python
"""Runs the diffusion process and returns per-request tensors.

For Qwen 2.5 Omni's current implementation, this method expects the model
to have a `forward` method that can handle the diffusion process.
Future versions may support `sample` or `diffuse` methods as well.
"""
```
Pull Request Overview
Adds v1-aligned Model Runners and Workers for autoregressive (AR) and diffusion paths, reusing vLLM v1 scheduling/worker abstractions and returning non-text tensors via ModelRunnerOutput.pooler_output.
- Introduces ARGPUModelRunner to expose per-request hidden states alongside standard sampling/logprobs.
- Introduces DiffusionGPUModelRunner and DiffusionGPUWorker to run diffusion-style models and return tensors via pooler_output (no logits/sampling).
- Updates architecture docs to describe design, flow, and extension points for AR and diffusion.
Reviewed Changes
Copilot reviewed 7 out of 22 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| vllm_omni/worker/AR_gpu_model_runner.py | AR runner implementation: prepares inputs, computes logits, samples tokens, and returns hidden states via pooler_output. |
| vllm_omni/worker/AR_gpu_worker.py | Worker wiring to instantiate ARGPUModelRunner and initialize device/distributed state. |
| vllm_omni/worker/diffusion_model_runner.py | Diffusion runner implementation: prepares inputs, runs model forward, returns tensors via pooler_output. |
| vllm_omni/worker/diffusion_gpu_worker.py | Worker wiring to instantiate DiffusionGPUModelRunner and initialize device/distributed state. |
| vllm_omni/worker/__init__.py | Package exports; currently only exposes ARGPUModelRunner. |
| docs/architecture/omni_arch_summary.md | Architecture overview (Chinese) for AR and diffusion paths and layering. |
| docs/architecture/diffusion_executor_worker_runner.md | Design doc for executors/workers/runners including example stubs for diffusion pipeline. |
```python
        # Mid PP stages return intermediate tensors unmodified
        if not get_pp_group().is_last_rank:
            assert isinstance(text_hidden_states, IntermediateTensors)
            text_hidden_states.kv_connector_output = kv_connector_output
            return text_hidden_states

        # Broadcast PP output for external_launcher (torchrun)
        broadcast_pp_output = \
            self.parallel_config.distributed_executor_backend \
            == "external_launcher" and len(get_pp_group().ranks) > 0
        if not get_pp_group().is_last_rank:
            assert isinstance(text_hidden_states, IntermediateTensors)
            if not broadcast_pp_output:
                text_hidden_states.kv_connector_output = kv_connector_output
                return text_hidden_states
            get_pp_group().send_tensor_dict(text_hidden_states.tensors,
                                            all_gather_group=get_tp_group())
```
The early return at lines 152–156 makes the later non-last-rank broadcast path (lines 161–168) unreachable, breaking PP broadcasting for external_launcher and risking deadlock when the last rank tries to broadcast/receive. Merge the non-last-rank handling into a single block that conditionally sends tensors when broadcast_pp_output is True and only returns when appropriate. For example, remove the first early return and keep the broadcast-aware branch:
- Remove lines 152–156.
- Use the lines 161–168 logic to handle both broadcast and non-broadcast cases for non-last PP ranks.
Suggested change (replace both non-last-rank blocks with a single broadcast-aware one):

```python
        # Handle non-last PP ranks: return or broadcast intermediate tensors as needed
        broadcast_pp_output = (
            self.parallel_config.distributed_executor_backend == "external_launcher"
            and len(get_pp_group().ranks) > 0
        )
        if not get_pp_group().is_last_rank:
            assert isinstance(text_hidden_states, IntermediateTensors)
            if not broadcast_pp_output:
                text_hidden_states.kv_connector_output = kv_connector_output
                return text_hidden_states
            get_pp_group().send_tensor_dict(
                text_hidden_states.tensors,
                all_gather_group=get_tp_group()
            )
```
```python
            assert spec_decode_common_attn_metadata is not None
            spec_token_ids = self.propose_draft_token_ids(
                scheduler_output,
                valid_sampled_token_ids,
                sampling_metadata,
                text_hidden_states,
                sample_hidden_states,
                _aux_hidden_states if ' _aux_hidden_states' in locals() else None,
                spec_decode_metadata,
                spec_decode_common_attn_metadata,
            )
```
The `locals()` guard checks the wrong name (`' _aux_hidden_states'` has a leading space), so `_aux_hidden_states` is always treated as absent and None is passed even when available. Fix by removing the space and ensuring a defined default: either initialize `_aux_hidden_states = None` before assignment or check the flag directly, e.g. pass `_aux_hidden_states if self.use_aux_hidden_state_outputs else None`.
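A minimal sketch of the suggested fix, assuming a `use_aux_hidden_state_outputs` flag as named in the comment; the variable values are illustrative:

```python
# Buggy probe: the quoted key has a leading space, so it is never in locals().
assert ' _aux_hidden_states' not in locals()

# Fix: give the variable a defined default, then gate on the flag directly
# (stand-in for self.use_aux_hidden_state_outputs).
use_aux_hidden_state_outputs = True
_aux_hidden_states = None
if use_aux_hidden_state_outputs:
    _aux_hidden_states = ["<aux hidden states>"]

# The value actually passed to propose_draft_token_ids in the fixed version:
aux_arg = _aux_hidden_states if use_aux_hidden_state_outputs else None
```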
```python
        num_nans_in_logits = {}
        if envs.VLLM_COMPUTE_NANS_IN_LOGITS:
            num_nans_in_logits = self._get_nans_in_logits(logits)
```
`num_nans_in_logits` is computed at lines 214–216 but is later unconditionally reset to an empty dict at lines 288–291, discarding the measurement. Remove the second block (lines 288–291) and return the previously computed value.
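An illustrative sketch of the corrected flow, with `_get_nans_in_logits` reduced to a pure-Python stand-in (the real method operates on torch tensors, and `compute_nans` stands in for `envs.VLLM_COMPUTE_NANS_IN_LOGITS`):

```python
import math

def get_nans_in_logits(logits_rows):
    # Stand-in for self._get_nans_in_logits: NaN count per request row.
    return {i: sum(1 for x in row if math.isnan(x))
            for i, row in enumerate(logits_rows)}

compute_nans = True  # stand-in for envs.VLLM_COMPUTE_NANS_IN_LOGITS
num_nans_in_logits = {}
if compute_nans:
    num_nans_in_logits = get_nans_in_logits([[0.1, float("nan")], [1.0, 2.0]])
# Return this value as-is; the bug was a later block reassigning it to {}.
```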
```python
        num_nans_in_logits = {}
        if envs.VLLM_COMPUTE_NANS_IN_LOGITS:
            num_nans_in_logits = {}
```

Suggested change: delete these lines, which unconditionally reset the value computed earlier.
```python
    ) -> Union[torch.Tensor, list[torch.Tensor]]:
        """Runs the diffusion process and returns per-request tensors.

        Tries model interfaces in the following order for maximal compatibility:
        1) model.sample(condition=..., **kwargs)
        2) model.forward(condition=..., **kwargs)
        3) model.diffuse(condition=..., **kwargs)
        """
```
The docstring states a fallback order (sample → forward → diffuse), but the implementation only attempts forward and comments out the other branches. Either update the docstring to reflect the current behavior or implement the stated fallback by enabling the sample/diffuse branches in the intended order.
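If the stated fallback were implemented, it could look like the following sketch. `run_diffusion` and the toy model are illustrative, not the PR's actual code:

```python
def run_diffusion(model, condition, **kwargs):
    # Try the interfaces in the docstring's stated order:
    # sample -> forward -> diffuse, dispatching to the first one exposed.
    for name in ("sample", "forward", "diffuse"):
        fn = getattr(model, name, None)
        if callable(fn):
            return fn(condition=condition, **kwargs)
    raise TypeError("model exposes none of sample/forward/diffuse")


class ForwardOnlyModel:
    # Mimics the current Qwen 2.5 Omni path, which only exposes forward().
    def forward(self, condition, **kwargs):
        return f"diffused({condition})"


result = run_diffusion(ForwardOnlyModel(), condition="noise")
```

Until such a dispatch exists, updating the docstring to describe the forward-only behavior is the lower-risk fix.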
```python
            )
        else:
            raise RuntimeError(
                f"Not support device type: {self.device_config.device}")
```
The error message is ungrammatical. Rephrase to "Unsupported device type: {self.device_config.device}" for clarity.

Suggested change:
```python
            raise RuntimeError(
                f"Unsupported device type: {self.device_config.device}")
```
Core design features
- Inheritance strategy: DiffusionModelRunner extends GPUModelRunner; Worker remains GPUWorker (with optional minimal overrides only to initialize the diffusion runner); standard MultiprocExecutor is reused as-is. A no-worker DiffusersPipelineExecutor extends Executor for single-process use.
- Data flow parity: schedule() → executor.execute_model() → scheduler.update_from_output(); Worker delegates to ModelRunner; ModelRunner returns ModelRunnerOutput.
- Outputs: Reuse ModelRunnerOutput; diffusion results are carried as tensors (e.g., via pooler_output), leaving text-specific fields as orginal vllm.
Typo: "orginal" should be "original".

Suggested change:
- Outputs: Reuse ModelRunnerOutput; diffusion results are carried as tensors (e.g., via pooler_output), leaving text-specific fields as original vllm.
```python
        # Generate image
        prompt_embeds = self._get_and_process_prompt_embeds(scheduler_output, positive_magic)
        negtive_prompt_embeds = self.pipe.embed_prompt(" ")
```
Typos: "negtive" should be "negative" in both the variable name and usage.
```python
        image = pipe(
            prompt_embeds=prompt_embeds,
            negtive_prompt_embeds=negtive_prompt_embeds,
```
Typos: "negtive" should be "negative" in both the variable name and usage.
Support bagel ar in vllm-omni
add connector config
Review: 5 issues identified, all fixed:
- vllm-project#1 (hard) KV cache ghost allocation: Add init-time assertions that SKIP_SCAFFOLD requires enable_prefix_caching=false and max_num_seqs=1. These implicit preconditions are now explicit ValueError checks.
- vllm-project#2 (hard) _scaffold_dummy device mismatch: Recreate dummy tensor when input device changes (multi-GPU / PP scenario).
- vllm-project#3 (hard) is_active_decode false positive: Add _prefill_completed flag set at the end of _forward_prefill(). Scaffold skip now requires both _prefill_completed=True AND _prev_feat_embed exists, preventing stale state from a previous request from triggering skip.
- vllm-project#4 (soft) _FREE_SCAFFOLD + _SKIP_SCAFFOLD interaction: Skip the zero-out operation when SKIP_SCAFFOLD is set (no point zeroing weights that will never be read).
- vllm-project#5 (soft) Perf timer on skip path: Removed timer from the skip branch so scaffold_forward metric only reflects real scaffold runs.

P0 fixes:
- vllm-project#1: _free_scaffold_weights now shrinks storage to zero (actually releases VRAM). Only runs when SKIP_SCAFFOLD is also set. Called lazily after first prefill, not at load time.
- vllm-project#2: Sliding VAE default OFF (splice algorithm had alignment bug). _sliding_vae_decode now falls back to full decode until proper overlap-add is implemented.
- vllm-project#3: Complete per-request state reset in preprocess: now clears _curr_prefix_feat_cond, _last_audio_patch_gpu, _prev_audio, _prev_audio_len, _decode_step_count, _precomputed_stop_logits.
- vllm-project#4: compute_logits fallback forces stop (not continue) when _prefill_completed=True, preventing runaway generation.
- vllm-project#5: Scaffold VRAM: load_weights no longer frees immediately; _free_scaffold_weights called after first prefill completes, so scaffold is available for prefill then released.

P1 fixes:
- vllm-project#6: Log all active config flags at load time.
- vllm-project#7: Remove dead _STOP_CHECK_INTERVAL code.
- vllm-project#8: Remove broken audio_duration formula from postprocess.
- vllm-project#9/vllm-project#14: Move `from einops import rearrange` to module top level.
- vllm-project#11: Remove torch.no_grad() context from _forward_decode_graphable (incompatible with CUDA Graph capture).
…up claims Fix three things the earlier phase-D draft got wrong by re-reading the DreamZero paper carefully: 1. Decimal separator: the naive baseline is 5.7s per chunk (not 7s), and the bimanual action horizon is 1.6s per chunk (not 6s). pypdf had silently dropped the leading digit at line breaks. 2. Step-reduction progression: DreamZero does not "stay at 16 steps". DiT Caching (velocity reuse within a chunk, based on cosine similarity of flow-matching velocities) reduces effective steps from 16 to 4. DreamZero-Flash, a training-time noise-schedule change, further reduces to 1 step. The paper's Table 3 shows the task-progress cost: naive 1-step loses 31 points on table-bussing, Flash 1-step loses only 9. 3. The 38x headline is GB200-only. Table 1 shows the cumulative speedup caps at 9.6x on H100; NVFP4 and DreamZero-Flash rows are dashed for H100. CFG parallelism is also multi-GPU from row 2, so the post-baseline config in the paper is never single-GPU. Also separate DreamZero's "DiT Caching" (intra-chunk velocity reuse) from RFC vllm-project#1987 / StreamDiffusionV2's "rolling KV cache across chunks" -- these are different optimizations and should not be conflated. Drop all cross-baseline speedup attribution from phase_d_cross_check.md and the journal's Phase D/E sections. Earlier drafts claimed that e.g. "36x of DreamZero's 38x is accounted for by choosing streaming-point hyperparameters", which compares across different baselines, hardware, training regimes, and chunk shapes. That comparison is invalid and is removed. Our own measurement (53.30s -> 2.585s = 20.6x on Wan-1.3B at our offline vs our streaming point on 1x A100) is retained as a fact about our own two configurations, with no claim about how it relates to any published speedup. 
Contribution-target vllm-project#3 (rolling KV cache + blockwise-causal attention) is downgraded from "~1.5-2x at our streaming operating point" to "speed-up not measured on our hardware; research commitment, not quantified target." The measured ~260 ms framework overhead is kept as the concrete motivating number for target #2 (per-call overhead reduction, RFC vllm-project#2073). A correction notice is added at the top of the earlier SOTA-scan DreamZero subsection, pointing readers to the Phase D corrections.
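The one speedup figure retained above is plain arithmetic on our own two measurements, and can be checked directly (numbers from the paragraph above; no claim beyond our own two configurations):

```python
# Our own offline vs. streaming configurations, Wan-1.3B on 1x A100.
offline_s = 53.30    # seconds, offline operating point
streaming_s = 2.585  # seconds, streaming operating point

speedup = offline_s / streaming_s
print(round(speedup, 1))  # → 20.6
```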
Summary
This PR adds two model runners and their corresponding workers to vllm-omni, aligning with vLLM v1's Worker/Runner abstractions:

- `ARGPUModelRunner`: populates `ModelRunnerOutput.pooler_output` while still producing sampled tokens/logprobs.
- `DiffusionGPUModelRunner`: returns results via `ModelRunnerOutput.pooler_output` without logits/sampling.

Both are drop-in replacements within the existing `EngineCore / Executor / Worker` loop.

Motivation

vllm-omni aims to support multimodal and non-autoregressive tasks under vLLM v1. To minimize changes to control-plane logic (`EngineCore / Executor / Worker`), vllm-omni concentrates customization in the Model Runner layer while reusing vLLM's scheduling, batching, and distributed infrastructure.

Key Changes
1. `ARGPUModelRunner` (`vllm_omni/worker/AR_gpu_model_runner.py`)

- Builds on `vllm.v1.worker.gpu_model_runner.GPUModelRunner`
- Populates `ModelRunnerOutput.pooler_output` alongside the usual sampled tokens/logprobs
2. `DiffusionGPUModelRunner` (`vllm_omni/worker/diffusion_model_runner.py`)

- Returns results via `ModelRunnerOutput.pooler_output`
- Works with `IntermediateTensors`
- `_run_diffusion` currently prefers `model.forward(...)` (Qwen 2.5 Omni path); future-compatible with `sample`/`diffuse` if exposed

3. New Workers
- `vllm_omni/worker/AR_gpu_worker.py` → `self.model_runner = ARGPUModelRunner(...)`
- `vllm_omni/worker/diffusion_gpu_worker.py` → `self.model_runner = DiffusionGPUModelRunner(...)`

4. Documentation
- Updated `docs/architecture/vllm_omni_design.md` to reflect the v1 path and components for AR and Diffusion.

Design & Compatibility
vLLM v1 Alignment
- Preserves the `EngineCore → Executor → Worker` RPC flow
- Uses `ModelRunnerOutput`; emphasizes `pooler_output` for non-text tensors

Diffusion Integration (Minimal Changes)
- Reuses `SchedulerOutput` and batching
- Returns results via `pooler_output` (no Engine/Executor modifications required)

Multimodal + PP / TP / DP
Robustness
- `model.forward(...)` now

Usage
Autoregressive (AR)
- Non-text tensors are returned in `pooler_output`
Diffusion

- Use `DiffusionGPUWorker`; results are returned via `pooler_output`

Backward Compatibility
- `pooler_output` is optional / opt-in

Files Changed
- `vllm_omni/worker/AR_gpu_model_runner.py`
- `vllm_omni/worker/AR_gpu_worker.py`
- `vllm_omni/worker/diffusion_model_runner.py`
- `vllm_omni/worker/diffusion_gpu_worker.py`
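Taken together, the runner/worker split described under Key Changes can be sketched with stand-in classes. Everything below is illustrative: the stub `GPUModelRunner` and `ModelRunnerOutput` only mimic the shape of vLLM v1's real classes, and helpers such as `_collect_multimodal_tensors` and `extract_tensors` are hypothetical names, not part of this PR:

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class ModelRunnerOutput:
    """Stub for vLLM v1's ModelRunnerOutput (the real class has more fields)."""
    sampled_token_ids: List[List[int]] = field(default_factory=list)
    logprobs: Optional[Any] = None
    pooler_output: Optional[List[Any]] = None


class GPUModelRunner:
    """Stub for vllm.v1.worker.gpu_model_runner.GPUModelRunner."""

    def execute_model(self, scheduler_output: Any) -> ModelRunnerOutput:
        # The real runner builds the batch, runs the model, and samples.
        return ModelRunnerOutput(sampled_token_ids=[[101, 102]])


class ARGPUModelRunner(GPUModelRunner):
    """AR path: keep sampled tokens/logprobs, add non-text tensors."""

    def execute_model(self, scheduler_output: Any) -> ModelRunnerOutput:
        out = super().execute_model(scheduler_output)
        out.pooler_output = self._collect_multimodal_tensors(scheduler_output)
        return out

    def _collect_multimodal_tensors(self, scheduler_output: Any) -> List[Any]:
        # Hypothetical helper: where non-text tensors come from is model-specific.
        return ["audio_or_image_tensor"]


class DiffusionGPUModelRunner:
    """Diffusion path: no logits/sampling; results ride in pooler_output."""

    def execute_model(self, model: Any, scheduler_output: Any) -> ModelRunnerOutput:
        result = self._run_diffusion(model, scheduler_output)
        return ModelRunnerOutput(pooler_output=[result])

    def _run_diffusion(self, model: Any, scheduler_output: Any) -> Any:
        # Prefer model.forward(...) (the Qwen 2.5 Omni path); stay compatible
        # with a dedicated sample/diffuse entry point if a model exposes one.
        for name in ("forward", "sample", "diffuse"):
            fn = getattr(model, name, None)
            if callable(fn):
                return fn(scheduler_output)
        raise RuntimeError("model exposes no forward/sample/diffuse entry point")


class DiffusionGPUWorker:
    """Worker override: only the runner choice changes; control plane stays."""

    def __init__(self) -> None:
        self.model_runner = DiffusionGPUModelRunner()


def extract_tensors(out: ModelRunnerOutput) -> List[Any]:
    """pooler_output stays optional/opt-in: None means a text-only result."""
    if out.pooler_output is None:
        return []
    return [t for t in out.pooler_output if t is not None]
```

The consumer-side `extract_tensors` shows why existing text-only flows are unaffected: a runner that never sets `pooler_output` yields `None`, and clients that ignore the field see exactly the output they saw before.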