Add AR and Diffusion GPU Model Runners to vllm-omni (vLLM v1-Compatible) #4

tzhouam wants to merge 2 commits into vllm-project:dev-ztc from
Conversation
**Summary of Changes**

Hello @tzhouam, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly expands vllm-omni's capabilities by introducing dedicated GPU model runners and workers for both autoregressive (AR) models that output hidden states and non-autoregressive diffusion models. The core objective is to enable multimodal and non-autoregressive tasks within the vLLM v1 framework by concentrating customization at the Model Runner layer, thereby minimizing changes to the existing control-plane logic for scheduling, batching, and distributed infrastructure. This ensures vLLM's robust performance and scalability can be leveraged for a broader range of AI workloads.
Pull Request Overview
This PR introduces Autoregressive (AR) and Diffusion GPU model runners to vllm-omni that align with vLLM v1's Worker/Runner architecture. The implementation enables multimodal and non-autoregressive tasks while reusing vLLM's existing scheduling, batching, and distributed infrastructure.
- Adds ARGPUModelRunner for autoregressive models that returns hidden states via pooler_output while maintaining token sampling
- Adds DiffusionGPUModelRunner for non-autoregressive diffusion models that skip sampling and return diffusion tensors
- Introduces corresponding GPU workers that instantiate the appropriate model runners
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| vllm_omni/worker/AR_gpu_model_runner.py | AR model runner extending GPUModelRunner with hidden state output support |
| vllm_omni/worker/AR_gpu_worker.py | GPU worker wrapper that instantiates ARGPUModelRunner |
| vllm_omni/worker/diffusion_model_runner.py | Diffusion model runner for non-autoregressive models |
| vllm_omni/worker/diffusion_gpu_worker.py | GPU worker wrapper that instantiates DiffusionGPUModelRunner |
| vllm_omni/worker/__init__.py | Module initialization exposing the new classes |
| docs/architecture/diffusion_executor_worker_runner.md | Architecture documentation for the diffusion components |
```python
# For Qwen 2.5 Omni's current implementation, we only support the forward method
if hasattr(self.model, "forward"):
    return self.model.forward(**kwargs)

# if hasattr(self.model, "sample"):
#     return self.model.sample(**kwargs)
# if hasattr(self.model, "forward"):
#     return self.model.forward(**kwargs)
# if hasattr(self.model, "diffuse"):
#     return self.model.diffuse(**kwargs)

raise RuntimeError(
    "The loaded model does not expose diffusion interfaces 'sample', "
    "'forward', or 'diffuse'. Please implement one of them or adapt the runner.")
```
The commented code contains duplicated logic and inconsistent ordering compared to the docstring. The docstring mentions trying methods in order: sample, forward, diffuse - but the commented code checks forward twice. Remove the commented code or fix the ordering to match the documented behavior.
```python
class ARGPUWorker(GPUWorker):
    def init_device(self):
```
The init_device method is duplicated between ARGPUWorker and DiffusionGPUWorker with identical implementations except for the model runner type. Consider extracting a base class or using a factory pattern to reduce code duplication.
| "The loaded model does not expose diffusion interfaces 'sample', " | ||
| "'forward', or 'diffuse'. Please implement one of them or adapt the runner.") |
The error message mentions three interfaces ('sample', 'forward', 'diffuse') but the current code only checks for 'forward'. The message is misleading since it suggests all three methods are checked when only one is actually verified.
| "The loaded model does not expose diffusion interfaces 'sample', " | |
| "'forward', or 'diffuse'. Please implement one of them or adapt the runner.") | |
| "The loaded model does not expose the required diffusion interface 'forward'. " | |
| "Please implement it or adapt the runner.") |
Code Review
This pull request introduces ARGPUModelRunner and DiffusionGPUModelRunner to support autoregressive models with hidden state outputs and non-autoregressive diffusion models within the vllm-omni framework. The changes are well-aligned with the vLLM v1 architecture. My review identifies a critical issue in the pipeline parallelism logic, suggests refactoring to reduce code duplication in the new worker classes, and provides several minor fixes in the documentation and model runner implementations to improve robustness and maintainability.
```python
if not get_pp_group().is_last_rank:
    assert isinstance(text_hidden_states, IntermediateTensors)
    text_hidden_states.kv_connector_output = kv_connector_output
    return text_hidden_states
```
This `if not get_pp_group().is_last_rank:` block causes an early return for all non-last pipeline-parallel ranks, which makes the subsequent `broadcast_pp_output` handling for non-last ranks (lines 158-168) unreachable. This appears to be a copy-paste error; the block should be removed so the correct pipeline-parallelism logic can execute.
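A minimal sketch of the ordering the reviewer is describing, assuming a `broadcast_pp_output` flag is in scope as in the upstream v1 runner (the exact surrounding code is not shown in this PR):

```python
# Sketch: only return early when PP outputs are NOT broadcast; otherwise fall
# through so the broadcast_pp_output handling below stays reachable.
if not get_pp_group().is_last_rank:
    assert isinstance(text_hidden_states, IntermediateTensors)
    if not broadcast_pp_output:
        text_hidden_states.kv_connector_output = kv_connector_output
        return text_hidden_states
    # ...broadcast_pp_output path for non-last ranks continues here...
```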
> #### 1) Inherited and overridden
>
> The parts that rely on the KV cache are omitted if we do not register the model in the vLLM config; the engine core then treats the model as not requiring a KV cache and handles it accordingly.
>
> Reuse `vllm/v1/outputs.py::ModelRunnerOutput`:
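A sketch of what such a reuse could look like for a diffusion batch; apart from `pooler_output`, the constructor fields shown here are assumptions and vary across vLLM versions:

```python
import torch
from vllm.v1.outputs import ModelRunnerOutput

req_ids = ["req-0", "req-1"]                          # request ids in this batch
diffusion_tensors = [torch.zeros(4), torch.zeros(4)]  # stand-in results

# Hypothetical packaging of per-request diffusion tensors into the standard
# v1 output type; field names other than pooler_output are assumed.
output = ModelRunnerOutput(
    req_ids=req_ids,
    req_id_to_index={rid: i for i, rid in enumerate(req_ids)},
    sampled_token_ids=[[] for _ in req_ids],  # diffusion: nothing is sampled
    logprobs=None,
    prompt_logprobs_dict={},
    pooler_output=list(diffusion_tensors),    # one tensor per request
)
```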
| model_name = "Qwen/Qwen-Image" | ||
|
|
||
| self.pipe = DiffusionPipeline.from_pretrained(model_name, torch_dtype=dtype) | ||
| self.pipe = pipe.to(device) |
|
|
||
| # Generate image | ||
| prompt_embeds = self._get_and_process_prompt_embeds(scheduler_output, positive_magic) | ||
| negtive_prompt_embeds = self.pipe.embed_prompt(" ") |
| image = pipe( | ||
| prompt_embeds=prompt_embeds, | ||
| negtive_prompt_embeds=negtive_prompt_embeds, |
There are a couple of issues in this code block:
- `pipe` is used instead of `self.pipe`.
- `negtive_prompt_embeds` has a typo and should be `negative_prompt_embeds`.
Suggested change:

```diff
-image = pipe(
-    prompt_embeds=prompt_embeds,
-    negtive_prompt_embeds=negtive_prompt_embeds,
+image = self.pipe(
+    prompt_embeds=prompt_embeds,
+    negative_prompt_embeds=negtive_prompt_embeds,
```
```python
class ARGPUWorker(GPUWorker):

    def init_device(self):
        if self.device_config.device.type == "cuda":
            # torch.distributed.all_reduce does not free the input tensor until
            # the synchronization point. This causes the memory usage to grow
            # as the number of all_reduce calls increases. This env var disables
            # this behavior.
            # Related issue:
            # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
            os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

            # This env var set by Ray causes exceptions with graph building.
            os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
            self.device = torch.device(f"cuda:{self.local_rank}")
            current_platform.set_device(self.device)

            _check_if_gpu_supports_dtype(self.model_config.dtype)
            gc.collect()
            torch.cuda.empty_cache()

            # take current memory snapshot
            self.init_snapshot = MemorySnapshot()
            self.requested_memory = (self.init_snapshot.total_memory *
                                     self.cache_config.gpu_memory_utilization)
            if self.init_snapshot.free_memory < self.requested_memory:
                GiB = lambda b: round(b / GiB_bytes, 2)
                raise ValueError(
                    f"Free memory on device "
                    f"({GiB(self.init_snapshot.free_memory)}/"
                    f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                    f"is less than desired GPU memory utilization "
                    f"({self.cache_config.gpu_memory_utilization}, "
                    f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                    f"utilization or reduce GPU memory used by other processes."
                )
        else:
            raise RuntimeError(
                f"Not support device type: {self.device_config.device}")
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank,
                                            current_platform.dist_backend)
        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner
        self.model_runner: ARGPUModelRunner = ARGPUModelRunner(
            self.vllm_config, self.device)

        if self.rank == 0:
            # If usage stat is enabled, collect relevant info.
            report_usage_stats(self.vllm_config)
```
The init_device method is almost entirely duplicated in ARGPUWorker and DiffusionGPUWorker. The only difference is the type of model runner being instantiated. To avoid this code duplication and improve maintainability, consider creating a common base worker class that contains the shared init_device logic and uses a class attribute to specify the model runner class.
For example, you could create a base class `OmniGPUWorker`:

```python
# In a new file, e.g., vllm_omni/worker/base_worker.py
from vllm.v1.worker.gpu_worker import Worker as GPUWorker
# ... other imports


class OmniGPUWorker(GPUWorker):
    _model_runner_cls = None

    def init_device(self):
        # All the common code from init_device goes here
        # ...
        if self._model_runner_cls is None:
            raise NotImplementedError(
                "Subclasses of OmniGPUWorker must set _model_runner_cls")

        # Construct the model runner
        self.model_runner = self._model_runner_cls(
            self.vllm_config, self.device)

        if self.rank == 0:
            report_usage_stats(self.vllm_config)
```

Then `ARGPUWorker` and `DiffusionGPUWorker` would inherit from it and just set `_model_runner_cls`, removing the need to override `init_device`.
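Following that suggestion, the two workers would each reduce to a couple of lines, a sketch using the classes introduced in this PR:

```python
class ARGPUWorker(OmniGPUWorker):
    _model_runner_cls = ARGPUModelRunner


class DiffusionGPUWorker(OmniGPUWorker):
    _model_runner_cls = DiffusionGPUModelRunner
```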
```python
class DiffusionGPUWorker(GPUWorker):

    def init_device(self):
        if self.device_config.device.type == "cuda":
            # torch.distributed.all_reduce does not free the input tensor until
            # the synchronization point. This causes the memory usage to grow
            # as the number of all_reduce calls increases. This env var disables
            # this behavior.
            # Related issue:
            # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
            os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

            # This env var set by Ray causes exceptions with graph building.
            os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
            self.device = torch.device(f"cuda:{self.local_rank}")
            current_platform.set_device(self.device)

            _check_if_gpu_supports_dtype(self.model_config.dtype)
            gc.collect()
            torch.cuda.empty_cache()

            # take current memory snapshot
            self.init_snapshot = MemorySnapshot()
            self.requested_memory = (self.init_snapshot.total_memory *
                                     self.cache_config.gpu_memory_utilization)
            if self.init_snapshot.free_memory < self.requested_memory:
                GiB = lambda b: round(b / GiB_bytes, 2)
                raise ValueError(
                    f"Free memory on device "
                    f"({GiB(self.init_snapshot.free_memory)}/"
                    f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                    f"is less than desired GPU memory utilization "
                    f"({self.cache_config.gpu_memory_utilization}, "
                    f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                    f"utilization or reduce GPU memory used by other processes."
                )
        else:
            raise RuntimeError(
                f"Not support device type: {self.device_config.device}")
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank,
                                            current_platform.dist_backend)
        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner
        self.model_runner: DiffusionGPUModelRunner = DiffusionGPUModelRunner(
            self.vllm_config, self.device)

        if self.rank == 0:
            # If usage stat is enabled, collect relevant info.
            report_usage_stats(self.vllm_config)
```
This init_device method is almost entirely duplicated between DiffusionGPUWorker and ARGPUWorker. To improve maintainability and reduce code duplication, consider introducing a shared base class that handles the common initialization logic. Please see my comment on vllm_omni/worker/AR_gpu_worker.py for a detailed suggestion on how to refactor this.
```python
if hasattr(self.model, "forward"):
    return self.model.forward(**kwargs)

# if hasattr(self.model, "sample"):
#     return self.model.sample(**kwargs)
# if hasattr(self.model, "forward"):
#     return self.model.forward(**kwargs)
# if hasattr(self.model, "diffuse"):
#     return self.model.diffuse(**kwargs)

raise RuntimeError(
    "The loaded model does not expose diffusion interfaces 'sample', "
    "'forward', or 'diffuse'. Please implement one of them or adapt the runner.")
```
The implementation of `_run_diffusion` currently only supports models with a `forward` method. However, the docstring and the commented-out code suggest a more flexible approach with fallbacks to `sample` and `diffuse` methods. To make this runner more generally applicable to different diffusion models and align with the design goal of being future-compatible, it would be beneficial to implement the fallback logic.
| if hasattr(self.model, "forward"): | |
| return self.model.forward(**kwargs) | |
| # if hasattr(self.model, "sample"): | |
| # return self.model.sample(**kwargs) | |
| # if hasattr(self.model, "forward"): | |
| # return self.model.forward(**kwargs) | |
| # if hasattr(self.model, "diffuse"): | |
| # return self.model.diffuse(**kwargs) | |
| raise RuntimeError( | |
| "The loaded model does not expose diffusion interfaces 'sample', " | |
| "'forward', or 'diffuse'. Please implement one of them or adapt the runner.") | |
| if hasattr(self.model, "sample"): | |
| return self.model.sample(**kwargs) | |
| if hasattr(self.model, "forward"): | |
| return self.model.forward(**kwargs) | |
| if hasattr(self.model, "diffuse"): | |
| return self.model.diffuse(**kwargs) | |
| raise RuntimeError( | |
| "The loaded model does not expose diffusion interfaces 'sample', " | |
| "'forward', or 'diffuse'. Please implement one of them or adapt the runner.") |
|
other PRs finished |
Wzliu bagel dev
Signed-off-by: wuhang <wuhang6@huawei.com>
Signed-off-by: wuhang <whlbx@hotmail.com>
Review: 5 issues identified, all fixed:

- vllm-project#1 (hard) KV cache ghost allocation: add init-time assertions that SKIP_SCAFFOLD requires `enable_prefix_caching=false` and `max_num_seqs=1`. These implicit preconditions are now explicit `ValueError` checks.
- vllm-project#2 (hard) `_scaffold_dummy` device mismatch: recreate the dummy tensor when the input device changes (multi-GPU / PP scenario).
- vllm-project#3 (hard) `is_active_decode` false positive: add a `_prefill_completed` flag set at the end of `_forward_prefill()`. Scaffold skip now requires both `_prefill_completed=True` AND an existing `_prev_feat_embed`, preventing stale state from a previous request from triggering the skip.
- vllm-project#4 (soft) `_FREE_SCAFFOLD` + `_SKIP_SCAFFOLD` interaction: skip the zero-out operation when SKIP_SCAFFOLD is set (no point zeroing weights that will never be read).
- vllm-project#5 (soft) Perf timer on skip path: removed the timer from the skip branch so the `scaffold_forward` metric only reflects real scaffold runs.
P0 fixes:

- vllm-project#1: `_free_scaffold_weights` now shrinks storage to zero (actually releases VRAM). Only runs when SKIP_SCAFFOLD is also set. Called lazily after the first prefill, not at load time.
- vllm-project#2: Sliding VAE default OFF (the splice algorithm had an alignment bug). `_sliding_vae_decode` now falls back to full decode until proper overlap-add is implemented.
- vllm-project#3: Complete per-request state reset in preprocess: now clears `_curr_prefix_feat_cond`, `_last_audio_patch_gpu`, `_prev_audio`, `_prev_audio_len`, `_decode_step_count`, `_precomputed_stop_logits`.
- vllm-project#4: `compute_logits` fallback forces stop (not continue) when `_prefill_completed=True`, preventing runaway generation.
- vllm-project#5: Scaffold VRAM: `load_weights` no longer frees immediately; `_free_scaffold_weights` is called after the first prefill completes, so the scaffold is available for prefill and then released.

P1 fixes:

- vllm-project#6: Log all active config flags at load time.
- vllm-project#7: Remove dead `_STOP_CHECK_INTERVAL` code.
- vllm-project#8: Remove broken `audio_duration` formula from postprocess.
- vllm-project#9 / vllm-project#14: Move `from einops import rearrange` to module top level.
- vllm-project#11: Remove `torch.no_grad()` context from `_forward_decode_graphable` (incompatible with CUDA Graph capture).
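For context on the vllm-project#1 fix, "shrinks storage to zero" typically means resizing a tensor's backing storage. A minimal sketch: the function name `_free_scaffold_weights` is from the commit message, but the body below is an assumption, not the actual patch:

```python
import torch


def _free_scaffold_weights(module: torch.nn.Module) -> None:
    # Sketch: release VRAM by shrinking each parameter's backing storage to
    # zero bytes; the tensor objects stay alive but hold no device memory.
    for param in module.parameters():
        param.data.untyped_storage().resize_(0)
```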
Refactor scheduler to centralize shared flow and add batching support
## Summary

This PR adds two model runners and their corresponding workers to vllm-omni, aligning with vLLM v1's Worker/Runner abstractions:

- **ARGPUModelRunner**: returns hidden states via `ModelRunnerOutput.pooler_output` while still producing sampled tokens/logprobs.
- **DiffusionGPUModelRunner**: returns diffusion tensors via `ModelRunnerOutput.pooler_output` without logits/sampling.

Both are drop-in replacements within the existing EngineCore / Executor / Worker loop.
## Motivation

- `vllm-omni` aims to also support multimodal and non-autoregressive tasks under vLLM v1.
- To minimize changes to control-plane logic (`EngineCore`/`Executor`/`Worker`), `vllm-omni` concentrates customization in the Model Runner layer while reusing vLLM's scheduling, batching, and distributed infrastructure.

## Key Changes
### 1. ARGPUModelRunner (`vllm_omni/worker/AR_gpu_model_runner.py`)

- Extends `vllm.v1.worker.gpu_model_runner.GPUModelRunner`
- Preserves the existing sampling path (sampled tokens/logprobs)
- Adds hidden-state output via `ModelRunnerOutput.pooler_output`
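A minimal sketch of this pattern; the override point matches vLLM v1's `execute_model`, but `self._last_hidden_states` is a hypothetical cache, not a name confirmed by this PR:

```python
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.worker.gpu_model_runner import GPUModelRunner


class ARGPUModelRunner(GPUModelRunner):
    def execute_model(self, scheduler_output, intermediate_tensors=None):
        # Run vLLM's normal v1 path: forward pass, sampling, logprobs.
        output = super().execute_model(scheduler_output, intermediate_tensors)
        if not isinstance(output, ModelRunnerOutput):
            return output  # IntermediateTensors on non-last PP ranks
        # Hypothetical: surface per-request hidden states captured during the
        # forward pass through the standard pooler_output field.
        output.pooler_output = list(self._last_hidden_states)
        return output
```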
### 2. DiffusionGPUModelRunner (`vllm_omni/worker/diffusion_model_runner.py`)

- Returns diffusion tensors via `ModelRunnerOutput.pooler_output` (no logits/sampling)
- Handles `IntermediateTensors` for pipeline parallelism
- `_run_diffusion` currently prefers `model.forward(...)` (Qwen 2.5 Omni path); future-compatible with `sample`/`diffuse` if exposed
### 3. New Workers

- `vllm_omni/worker/AR_gpu_worker.py` → `self.model_runner = ARGPUModelRunner(...)`
- `vllm_omni/worker/diffusion_gpu_worker.py` → `self.model_runner = DiffusionGPUModelRunner(...)`
### 4. Documentation

- Updated `docs/architecture/vllm_omni_design.md` to reflect the v1 path and components for AR and Diffusion.
## Design & Compatibility

### vLLM v1 Alignment

- Follows the `EngineCore → Executor → Worker` RPC flow
- Returns the standard `ModelRunnerOutput`; emphasizes `pooler_output` for non-text tensors

### Diffusion Integration (Minimal Changes)

- Reuses `SchedulerOutput` and batching
- Results flow back through `pooler_output` (no Engine/Executor modifications required)

### Multimodal + PP / TP / DP

### Robustness

- The diffusion path only invokes `model.forward(...)` for now
## Usage

### Autoregressive (AR)

- Hidden states are returned in `pooler_output`

### Diffusion

- Use `DiffusionGPUWorker`; diffusion tensors are returned in `pooler_output`
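A purely illustrative consumption sketch: the entry point, keyword names, and output attribute layout below are assumptions, not APIs confirmed by this PR; only the presence of `pooler_output` is described above:

```python
# Hypothetical sketch; vllm_omni.LLM, worker_cls, and out.pooler_output
# are assumed names for illustration only.
from vllm_omni import LLM

llm = LLM(model="Qwen/Qwen2.5-Omni-7B", worker_cls="DiffusionGPUWorker")
outputs = llm.generate(["a photo of a cat"])
for out in outputs:
    tensor = out.pooler_output  # diffusion tensor (or AR hidden states)
```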
## Backward Compatibility

- `pooler_output` is optional / opt-in
## Files Changed

- `vllm_omni/worker/AR_gpu_model_runner.py`
- `vllm_omni/worker/AR_gpu_worker.py`
- `vllm_omni/worker/diffusion_model_runner.py`
- `vllm_omni/worker/diffusion_gpu_worker.py`
- `vllm_omni/worker/__init__.py`
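Assuming `__init__.py` re-exports the new classes (as the file summary above indicates), downstream code could then import them from the package:

```python
# Assumes vllm_omni/worker/__init__.py re-exports these names.
from vllm_omni.worker import (
    ARGPUModelRunner,
    ARGPUWorker,
    DiffusionGPUModelRunner,
    DiffusionGPUWorker,
)
```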