Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ steps:
timeout_in_minutes: 20
depends_on: image-build
commands:
- pytest -s -v tests/diffusion/test_gpu_worker.py
- pytest -s -v tests/diffusion/test_gpu_diffusion_worker.py
agents:
queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU
plugins:
Expand Down
2 changes: 1 addition & 1 deletion .buildkite/test-amd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ steps:
commands:
- export MIOPEN_DEBUG_CONV_DIRECT=0
- export MIOPEN_DEBUG_CONV_GEMM=0
- pytest -s -v tests/diffusion/test_gpu_worker.py
- pytest -s -v tests/diffusion/test_gpu_diffusion_worker.py

- label: "Omni Model Test Qwen2-5-Omni"
timeout_in_minutes: 15
Expand Down
5 changes: 3 additions & 2 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ Configuration classes.

Worker classes and model runners for distributed inference.

- [vllm_omni.diffusion.worker.gpu_worker.GPUWorker][]
- [vllm_omni.diffusion.worker.gpu_worker.WorkerProc][]
- [vllm_omni.diffusion.worker.gpu_diffusion_model_runner.GPUDiffusionModelRunner][]
- [vllm_omni.diffusion.worker.gpu_diffusion_worker.GPUDiffusionWorker][]
- [vllm_omni.diffusion.worker.gpu_diffusion_worker.WorkerProc][]
- [vllm_omni.diffusion.worker.npu.npu_worker.NPUWorker][]
- [vllm_omni.diffusion.worker.npu.npu_worker.NPUWorkerProc][]
- [vllm_omni.worker.gpu_ar_model_runner.ExecuteModelState][]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

"""
Unit tests for GPUWorker class.
Unit tests for GPUDiffusionWorker class.

This module tests the GPUWorker implementation:
This module tests the GPUDiffusionWorker implementation:
- load_weights: Loading model weights
- sleep: Putting worker into sleep mode (levels 1 and 2)
- wake_up: Waking worker from sleep mode
Expand All @@ -15,7 +15,7 @@
import pytest
import torch

from vllm_omni.diffusion.worker.gpu_worker import GPUWorker
from vllm_omni.diffusion.worker.gpu_diffusion_worker import GPUDiffusionWorker


@pytest.fixture
Expand All @@ -33,51 +33,52 @@ def mock_od_config():

@pytest.fixture
def mock_gpu_worker(mock_od_config):
"""Create a GPUWorker with mocked initialization."""
with patch.object(GPUWorker, "init_device_and_model"):
worker = GPUWorker(local_rank=0, rank=0, od_config=mock_od_config)
# Mock the pipeline
worker.pipeline = Mock()
worker.cache_backend = None
"""Create a GPUDiffusionWorker with mocked initialization."""
with patch.object(GPUDiffusionWorker, "init_device"):
worker = GPUDiffusionWorker(local_rank=0, rank=0, od_config=mock_od_config)
# Mock the model_runner with pipeline
worker.model_runner = Mock()
worker.model_runner.pipeline = Mock()
worker._sleep_saved_buffers = {}
return worker


class TestGPUWorkerLoadWeights:
"""Test GPUWorker.load_weights method."""
class TestGPUDiffusionWorkerLoadWeights:
"""Test GPUDiffusionWorker.load_weights method."""

def test_load_weights_calls_pipeline(self, mock_gpu_worker):
"""Test that load_weights delegates to pipeline.load_weights."""
"""Test that load_weights delegates to model_runner.load_weights."""
# Setup mock weights
mock_weights = [
("layer1.weight", torch.randn(10, 10)),
("layer2.weight", torch.randn(20, 20)),
]
expected_loaded = {"layer1.weight", "layer2.weight"}

# Configure pipeline mock
mock_gpu_worker.pipeline.load_weights = Mock(return_value=expected_loaded)
# Configure model_runner mock
mock_gpu_worker.model_runner.load_weights = Mock(return_value=expected_loaded)

# Call load_weights
result = mock_gpu_worker.load_weights(mock_weights)

# Verify pipeline.load_weights was called with the weights
mock_gpu_worker.pipeline.load_weights.assert_called_once_with(mock_weights)
# Verify model_runner.load_weights was called with the weights
mock_gpu_worker.model_runner.load_weights.assert_called_once_with(mock_weights)
assert result == expected_loaded

def test_load_weights_empty_iterable(self, mock_gpu_worker):
"""Test load_weights with empty weights iterable."""
mock_gpu_worker.pipeline.load_weights = Mock(return_value=set())
mock_gpu_worker.model_runner.load_weights = Mock(return_value=set())

result = mock_gpu_worker.load_weights([])

mock_gpu_worker.pipeline.load_weights.assert_called_once_with([])
mock_gpu_worker.model_runner.load_weights.assert_called_once_with([])
assert result == set()


class TestGPUWorkerSleep:
"""Test GPUWorker.sleep method."""
class TestGPUDiffusionWorkerSleep:
"""Test GPUDiffusionWorker.sleep method."""

@patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
@patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info")
@patch("vllm.device_allocator.cumem.CuMemAllocator")
def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
"""Test sleep mode level 1 (offload weights only)."""
Expand All @@ -103,7 +104,7 @@ def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
# Verify buffers were NOT saved (level 1 doesn't save buffers)
assert len(mock_gpu_worker._sleep_saved_buffers) == 0

@patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
@patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info")
@patch("vllm.device_allocator.cumem.CuMemAllocator")
def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
"""Test sleep mode level 2 (offload all, save buffers)."""
Expand All @@ -121,7 +122,7 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
# Mock pipeline buffers
mock_buffer1 = torch.randn(10, 10)
mock_buffer2 = torch.randn(20, 20)
mock_gpu_worker.pipeline.named_buffers = Mock(
mock_gpu_worker.model_runner.pipeline.named_buffers = Mock(
return_value=[
("buffer1", mock_buffer1),
("buffer2", mock_buffer2),
Expand All @@ -140,7 +141,7 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
assert "buffer1" in mock_gpu_worker._sleep_saved_buffers
assert "buffer2" in mock_gpu_worker._sleep_saved_buffers

@patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
@patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info")
@patch("vllm.device_allocator.cumem.CuMemAllocator")
def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
"""Test that sleep validates memory was actually freed."""
Expand All @@ -159,8 +160,8 @@ def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info
mock_gpu_worker.sleep(level=1)


class TestGPUWorkerWakeUp:
"""Test GPUWorker.wake_up method."""
class TestGPUDiffusionWorkerWakeUp:
"""Test GPUDiffusionWorker.wake_up method."""

@patch("vllm.device_allocator.cumem.CuMemAllocator")
def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker):
Expand Down Expand Up @@ -202,7 +203,7 @@ def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
mock_buffer2 = Mock()
mock_buffer2.data = Mock()

mock_gpu_worker.pipeline.named_buffers = Mock(
mock_gpu_worker.model_runner.pipeline.named_buffers = Mock(
return_value=[
("buffer1", mock_buffer1),
("buffer2", mock_buffer2),
Expand Down Expand Up @@ -243,7 +244,7 @@ def test_wake_up_partial_buffer_restore(self, mock_allocator_class, mock_gpu_wor
mock_buffer2 = Mock()
mock_buffer2.data = Mock()

mock_gpu_worker.pipeline.named_buffers = Mock(
mock_gpu_worker.model_runner.pipeline.named_buffers = Mock(
return_value=[
("buffer1", mock_buffer1),
("buffer2", mock_buffer2),
Expand Down
12 changes: 10 additions & 2 deletions vllm_omni/diffusion/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Worker classes for diffusion models."""

from vllm_omni.diffusion.worker.gpu_worker import GPUWorker, WorkerProc
from vllm_omni.diffusion.worker.gpu_diffusion_model_runner import GPUDiffusionModelRunner
from vllm_omni.diffusion.worker.gpu_diffusion_worker import (
GPUDiffusionWorker,
WorkerProc,
)

__all__ = ["GPUWorker", "WorkerProc"]
__all__ = [
"GPUDiffusionModelRunner",
"GPUDiffusionWorker",
"WorkerProc",
]
165 changes: 165 additions & 0 deletions vllm_omni/diffusion/worker/gpu_diffusion_model_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Diffusion Model Runner for vLLM-Omni.

Handles model loading, compilation, caching, and execution of diffusion model
forward passes. This follows the AR pattern where the Runner handles all
model-related operations.
"""

from __future__ import annotations

import time
from collections.abc import Iterable
from contextlib import nullcontext

import torch
from vllm.config import LoadConfig
from vllm.logger import init_logger
from vllm.utils.mem_utils import DeviceMemoryProfiler, GiB_bytes

from vllm_omni.diffusion.cache.selector import get_cache_backend
from vllm_omni.diffusion.compile import regionally_compile
from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig
from vllm_omni.diffusion.forward_context import set_forward_context
from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader
from vllm_omni.diffusion.offload import apply_offload_hooks
from vllm_omni.diffusion.request import OmniDiffusionRequest

logger = init_logger(__name__)


class GPUDiffusionModelRunner:
"""
Model runner that handles model loading and execution for diffusion models.

This class follows the AR pattern where the Runner handles all model-related
operations including loading, compilation, offloading, caching, and execution.
The Worker only handles infrastructure (device, distributed env).
"""

def __init__(
self,
vllm_config,
od_config: OmniDiffusionConfig,
device: torch.device,
):
"""
Initialize the diffusion model runner.

Args:
vllm_config: vLLM configuration.
od_config: OmniDiffusion configuration.
device: The device to run on.
"""
self.vllm_config = vllm_config
self.od_config = od_config
self.device = device
self.pipeline = None
self.cache_backend = None

def load_model(
self,
memory_pool_context_fn: callable | None = None,
) -> None:
"""
Load the diffusion model, apply compilation and offloading.

Args:
memory_pool_context_fn: Optional function that returns a context manager
for memory pool allocation (used for sleep mode).
"""
load_device = "cpu" if self.od_config.enable_cpu_offload else str(self.device)

def get_memory_context():
if memory_pool_context_fn is not None:
return memory_pool_context_fn(tag="weights")
return nullcontext()

# Load model within forward context
with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config):
load_config = LoadConfig()
model_loader = DiffusersPipelineLoader(load_config)
time_before_load = time.perf_counter()

with get_memory_context():
with DeviceMemoryProfiler() as m:
self.pipeline = model_loader.load_model(
od_config=self.od_config,
load_device=load_device,
)
time_after_load = time.perf_counter()

logger.info(
"Model loading took %.4f GiB and %.6f seconds",
m.consumed_memory / GiB_bytes,
time_after_load - time_before_load,
)
logger.info("Model runner: Model loaded successfully.")

# Apply CPU offloading (DiT <-> encoders mutual exclusion)
if self.od_config.enable_cpu_offload:
for name in ["vae"]:
module = getattr(self.pipeline, name, None)
if module is None:
continue
try:
module.to(self.device, non_blocking=True)
except Exception as exc:
logger.debug("Failed to move %s to GPU: %s", name, exc)

apply_offload_hooks(self.pipeline, self.od_config, device=self.device)

# Apply torch.compile if not in eager mode
if not self.od_config.enforce_eager:
try:
self.pipeline.transformer = regionally_compile(
self.pipeline.transformer,
dynamic=True,
)
logger.info("Model runner: Model compiled with torch.compile.")
except Exception as e:
logger.warning(f"Model runner: torch.compile failed with error: {e}. Using eager mode.")

# Setup cache backend
self.cache_backend = get_cache_backend(self.od_config.cache_backend, self.od_config.cache_config)

if self.cache_backend is not None:
self.cache_backend.enable(self.pipeline)

logger.info("Model runner: Initialization complete.")

def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
"""Load weights into the pipeline."""
return self.pipeline.load_weights(weights)

@torch.inference_mode()
def execute_model(self, reqs: list[OmniDiffusionRequest]) -> DiffusionOutput:
"""
Execute a forward pass for the given requests.

Args:
reqs: List of diffusion requests to process.

Returns:
DiffusionOutput with generated results.
"""
assert self.pipeline is not None, "Model not loaded. Call load_model() first."
if not reqs or len(reqs) == 0:
raise ValueError("Cannot execute model with empty request list")

# TODO: dealing with first req for now
req = reqs[0]

if req.generator is None and req.seed is not None:
req.generator = torch.Generator(device=self.device).manual_seed(req.seed)

# Refresh cache context if needed
if self.cache_backend is not None and self.cache_backend.is_enabled():
self.cache_backend.refresh(self.pipeline, req.num_inference_steps)

with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config):
output = self.pipeline.forward(req)

return output
Loading