diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index f18aef61771..429321d2fdb 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -116,7 +116,7 @@ steps: timeout_in_minutes: 20 depends_on: image-build commands: - - pytest -s -v tests/diffusion/test_gpu_worker.py + - pytest -s -v tests/diffusion/test_gpu_diffusion_worker.py agents: queue: "gpu_4_queue" # g6.12xlarge instance on AWS, has 4 L4 GPU plugins: diff --git a/.buildkite/test-amd.yaml b/.buildkite/test-amd.yaml index bcece8c495d..86d65f15bcf 100644 --- a/.buildkite/test-amd.yaml +++ b/.buildkite/test-amd.yaml @@ -54,7 +54,7 @@ steps: commands: - export MIOPEN_DEBUG_CONV_DIRECT=0 - export MIOPEN_DEBUG_CONV_GEMM=0 - - pytest -s -v tests/diffusion/test_gpu_worker.py + - pytest -s -v tests/diffusion/test_gpu_diffusion_worker.py - label: "Omni Model Test Qwen2-5-Omni" timeout_in_minutes: 15 diff --git a/docs/api/README.md b/docs/api/README.md index a9d751bce25..a1f07011118 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -103,8 +103,9 @@ Configuration classes. Worker classes and model runners for distributed inference. 
-- [vllm_omni.diffusion.worker.gpu_worker.GPUWorker][] -- [vllm_omni.diffusion.worker.gpu_worker.WorkerProc][] +- [vllm_omni.diffusion.worker.gpu_diffusion_model_runner.GPUDiffusionModelRunner][] +- [vllm_omni.diffusion.worker.gpu_diffusion_worker.GPUDiffusionWorker][] +- [vllm_omni.diffusion.worker.gpu_diffusion_worker.WorkerProc][] - [vllm_omni.diffusion.worker.npu.npu_worker.NPUWorker][] - [vllm_omni.diffusion.worker.npu.npu_worker.NPUWorkerProc][] - [vllm_omni.worker.gpu_ar_model_runner.ExecuteModelState][] diff --git a/tests/diffusion/test_gpu_worker.py b/tests/diffusion/test_gpu_diffusion_worker.py similarity index 81% rename from tests/diffusion/test_gpu_worker.py rename to tests/diffusion/test_gpu_diffusion_worker.py index defeffe5b56..7a43710c878 100644 --- a/tests/diffusion/test_gpu_worker.py +++ b/tests/diffusion/test_gpu_diffusion_worker.py @@ -2,9 +2,9 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project """ -Unit tests for GPUWorker class. +Unit tests for GPUDiffusionWorker class. 
-This module tests the GPUWorker implementation: +This module tests the GPUDiffusionWorker implementation: - load_weights: Loading model weights - sleep: Putting worker into sleep mode (levels 1 and 2) - wake_up: Waking worker from sleep mode @@ -15,7 +15,7 @@ import pytest import torch -from vllm_omni.diffusion.worker.gpu_worker import GPUWorker +from vllm_omni.diffusion.worker.gpu_diffusion_worker import GPUDiffusionWorker @pytest.fixture @@ -33,20 +33,21 @@ def mock_od_config(): @pytest.fixture def mock_gpu_worker(mock_od_config): - """Create a GPUWorker with mocked initialization.""" - with patch.object(GPUWorker, "init_device_and_model"): - worker = GPUWorker(local_rank=0, rank=0, od_config=mock_od_config) - # Mock the pipeline - worker.pipeline = Mock() - worker.cache_backend = None + """Create a GPUDiffusionWorker with mocked initialization.""" + with patch.object(GPUDiffusionWorker, "init_device"): + worker = GPUDiffusionWorker(local_rank=0, rank=0, od_config=mock_od_config) + # Mock the model_runner with pipeline + worker.model_runner = Mock() + worker.model_runner.pipeline = Mock() + worker._sleep_saved_buffers = {} return worker -class TestGPUWorkerLoadWeights: - """Test GPUWorker.load_weights method.""" +class TestGPUDiffusionWorkerLoadWeights: + """Test GPUDiffusionWorker.load_weights method.""" def test_load_weights_calls_pipeline(self, mock_gpu_worker): - """Test that load_weights delegates to pipeline.load_weights.""" + """Test that load_weights delegates to model_runner.load_weights.""" # Setup mock weights mock_weights = [ ("layer1.weight", torch.randn(10, 10)), @@ -54,30 +55,30 @@ def test_load_weights_calls_pipeline(self, mock_gpu_worker): ] expected_loaded = {"layer1.weight", "layer2.weight"} - # Configure pipeline mock - mock_gpu_worker.pipeline.load_weights = Mock(return_value=expected_loaded) + # Configure model_runner mock + mock_gpu_worker.model_runner.load_weights = Mock(return_value=expected_loaded) # Call load_weights result = 
mock_gpu_worker.load_weights(mock_weights) - # Verify pipeline.load_weights was called with the weights - mock_gpu_worker.pipeline.load_weights.assert_called_once_with(mock_weights) + # Verify model_runner.load_weights was called with the weights + mock_gpu_worker.model_runner.load_weights.assert_called_once_with(mock_weights) assert result == expected_loaded def test_load_weights_empty_iterable(self, mock_gpu_worker): """Test load_weights with empty weights iterable.""" - mock_gpu_worker.pipeline.load_weights = Mock(return_value=set()) + mock_gpu_worker.model_runner.load_weights = Mock(return_value=set()) result = mock_gpu_worker.load_weights([]) - mock_gpu_worker.pipeline.load_weights.assert_called_once_with([]) + mock_gpu_worker.model_runner.load_weights.assert_called_once_with([]) assert result == set() -class TestGPUWorkerSleep: - """Test GPUWorker.sleep method.""" +class TestGPUDiffusionWorkerSleep: + """Test GPUDiffusionWorker.sleep method.""" - @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info") + @patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info") @patch("vllm.device_allocator.cumem.CuMemAllocator") def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker): """Test sleep mode level 1 (offload weights only).""" @@ -103,7 +104,7 @@ def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worke # Verify buffers were NOT saved (level 1 doesn't save buffers) assert len(mock_gpu_worker._sleep_saved_buffers) == 0 - @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info") + @patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info") @patch("vllm.device_allocator.cumem.CuMemAllocator") def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worker): """Test sleep mode level 2 (offload all, save buffers).""" @@ -121,7 +122,7 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke # Mock 
pipeline buffers mock_buffer1 = torch.randn(10, 10) mock_buffer2 = torch.randn(20, 20) - mock_gpu_worker.pipeline.named_buffers = Mock( + mock_gpu_worker.model_runner.pipeline.named_buffers = Mock( return_value=[ ("buffer1", mock_buffer1), ("buffer2", mock_buffer2), @@ -140,7 +141,7 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke assert "buffer1" in mock_gpu_worker._sleep_saved_buffers assert "buffer2" in mock_gpu_worker._sleep_saved_buffers - @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info") + @patch("vllm_omni.diffusion.worker.gpu_diffusion_worker.torch.cuda.mem_get_info") @patch("vllm.device_allocator.cumem.CuMemAllocator") def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info, mock_gpu_worker): """Test that sleep validates memory was actually freed.""" @@ -159,8 +160,8 @@ def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info mock_gpu_worker.sleep(level=1) -class TestGPUWorkerWakeUp: - """Test GPUWorker.wake_up method.""" +class TestGPUDiffusionWorkerWakeUp: + """Test GPUDiffusionWorker.wake_up method.""" @patch("vllm.device_allocator.cumem.CuMemAllocator") def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker): @@ -202,7 +203,7 @@ def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker): mock_buffer2 = Mock() mock_buffer2.data = Mock() - mock_gpu_worker.pipeline.named_buffers = Mock( + mock_gpu_worker.model_runner.pipeline.named_buffers = Mock( return_value=[ ("buffer1", mock_buffer1), ("buffer2", mock_buffer2), @@ -243,7 +244,7 @@ def test_wake_up_partial_buffer_restore(self, mock_allocator_class, mock_gpu_wor mock_buffer2 = Mock() mock_buffer2.data = Mock() - mock_gpu_worker.pipeline.named_buffers = Mock( + mock_gpu_worker.model_runner.pipeline.named_buffers = Mock( return_value=[ ("buffer1", mock_buffer1), ("buffer2", mock_buffer2), diff --git a/vllm_omni/diffusion/worker/__init__.py 
b/vllm_omni/diffusion/worker/__init__.py index dc3306dae3f..dfec4596bc2 100644 --- a/vllm_omni/diffusion/worker/__init__.py +++ b/vllm_omni/diffusion/worker/__init__.py @@ -2,6 +2,14 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project """Worker classes for diffusion models.""" -from vllm_omni.diffusion.worker.gpu_worker import GPUWorker, WorkerProc +from vllm_omni.diffusion.worker.gpu_diffusion_model_runner import GPUDiffusionModelRunner +from vllm_omni.diffusion.worker.gpu_diffusion_worker import ( + GPUDiffusionWorker, + WorkerProc, +) -__all__ = ["GPUWorker", "WorkerProc"] +__all__ = [ + "GPUDiffusionModelRunner", + "GPUDiffusionWorker", + "WorkerProc", +] diff --git a/vllm_omni/diffusion/worker/gpu_diffusion_model_runner.py b/vllm_omni/diffusion/worker/gpu_diffusion_model_runner.py new file mode 100644 index 00000000000..8ffa12ed2cc --- /dev/null +++ b/vllm_omni/diffusion/worker/gpu_diffusion_model_runner.py @@ -0,0 +1,165 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Diffusion Model Runner for vLLM-Omni. + +Handles model loading, compilation, caching, and execution of diffusion model +forward passes. This follows the AR pattern where the Runner handles all +model-related operations. 
+""" + +from __future__ import annotations + +import time +from collections.abc import Iterable +from contextlib import nullcontext + +import torch +from vllm.config import LoadConfig +from vllm.logger import init_logger +from vllm.utils.mem_utils import DeviceMemoryProfiler, GiB_bytes + +from vllm_omni.diffusion.cache.selector import get_cache_backend +from vllm_omni.diffusion.compile import regionally_compile +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.forward_context import set_forward_context +from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.offload import apply_offload_hooks +from vllm_omni.diffusion.request import OmniDiffusionRequest + +logger = init_logger(__name__) + + +class GPUDiffusionModelRunner: + """ + Model runner that handles model loading and execution for diffusion models. + + This class follows the AR pattern where the Runner handles all model-related + operations including loading, compilation, offloading, caching, and execution. + The Worker only handles infrastructure (device, distributed env). + """ + + def __init__( + self, + vllm_config, + od_config: OmniDiffusionConfig, + device: torch.device, + ): + """ + Initialize the diffusion model runner. + + Args: + vllm_config: vLLM configuration. + od_config: OmniDiffusion configuration. + device: The device to run on. + """ + self.vllm_config = vllm_config + self.od_config = od_config + self.device = device + self.pipeline = None + self.cache_backend = None + + def load_model( + self, + memory_pool_context_fn: callable | None = None, + ) -> None: + """ + Load the diffusion model, apply compilation and offloading. + + Args: + memory_pool_context_fn: Optional function that returns a context manager + for memory pool allocation (used for sleep mode). 
+ """ + load_device = "cpu" if self.od_config.enable_cpu_offload else str(self.device) + + def get_memory_context(): + if memory_pool_context_fn is not None: + return memory_pool_context_fn(tag="weights") + return nullcontext() + + # Load model within forward context + with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config): + load_config = LoadConfig() + model_loader = DiffusersPipelineLoader(load_config) + time_before_load = time.perf_counter() + + with get_memory_context(): + with DeviceMemoryProfiler() as m: + self.pipeline = model_loader.load_model( + od_config=self.od_config, + load_device=load_device, + ) + time_after_load = time.perf_counter() + + logger.info( + "Model loading took %.4f GiB and %.6f seconds", + m.consumed_memory / GiB_bytes, + time_after_load - time_before_load, + ) + logger.info("Model runner: Model loaded successfully.") + + # Apply CPU offloading (DiT <-> encoders mutual exclusion) + if self.od_config.enable_cpu_offload: + for name in ["vae"]: + module = getattr(self.pipeline, name, None) + if module is None: + continue + try: + module.to(self.device, non_blocking=True) + except Exception as exc: + logger.debug("Failed to move %s to GPU: %s", name, exc) + + apply_offload_hooks(self.pipeline, self.od_config, device=self.device) + + # Apply torch.compile if not in eager mode + if not self.od_config.enforce_eager: + try: + self.pipeline.transformer = regionally_compile( + self.pipeline.transformer, + dynamic=True, + ) + logger.info("Model runner: Model compiled with torch.compile.") + except Exception as e: + logger.warning(f"Model runner: torch.compile failed with error: {e}. 
Using eager mode.") + + # Setup cache backend + self.cache_backend = get_cache_backend(self.od_config.cache_backend, self.od_config.cache_config) + + if self.cache_backend is not None: + self.cache_backend.enable(self.pipeline) + + logger.info("Model runner: Initialization complete.") + + def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]: + """Load weights into the pipeline.""" + return self.pipeline.load_weights(weights) + + @torch.inference_mode() + def execute_model(self, reqs: list[OmniDiffusionRequest]) -> DiffusionOutput: + """ + Execute a forward pass for the given requests. + + Args: + reqs: List of diffusion requests to process. + + Returns: + DiffusionOutput with generated results. + """ + assert self.pipeline is not None, "Model not loaded. Call load_model() first." + if not reqs or len(reqs) == 0: + raise ValueError("Cannot execute model with empty request list") + + # TODO: dealing with first req for now + req = reqs[0] + + if req.generator is None and req.seed is not None: + req.generator = torch.Generator(device=self.device).manual_seed(req.seed) + + # Refresh cache context if needed + if self.cache_backend is not None and self.cache_backend.is_enabled(): + self.cache_backend.refresh(self.pipeline, req.num_inference_steps) + + with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config): + output = self.pipeline.forward(req) + + return output diff --git a/vllm_omni/diffusion/worker/gpu_worker.py b/vllm_omni/diffusion/worker/gpu_diffusion_worker.py similarity index 65% rename from vllm_omni/diffusion/worker/gpu_worker.py rename to vllm_omni/diffusion/worker/gpu_diffusion_worker.py index 99a718e389f..7fa1e3da4d5 100644 --- a/vllm_omni/diffusion/worker/gpu_worker.py +++ b/vllm_omni/diffusion/worker/gpu_diffusion_worker.py @@ -1,20 +1,23 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Diffusion Worker for vLLM-Omni. 
+ +Handles GPU infrastructure initialization and delegates model operations +to GPUDiffusionModelRunner. +""" + import multiprocessing as mp import os -import time -from collections.abc import Iterable from contextlib import AbstractContextManager, nullcontext import torch import zmq -from vllm.config import LoadConfig, VllmConfig +from vllm.config import VllmConfig from vllm.distributed.device_communicators.shm_broadcast import MessageQueue from vllm.logger import init_logger -from vllm.utils.mem_utils import DeviceMemoryProfiler, GiB_bytes +from vllm.utils.mem_utils import GiB_bytes -from vllm_omni.diffusion.cache.selector import get_cache_backend -from vllm_omni.diffusion.compile import regionally_compile from vllm_omni.diffusion.data import ( DiffusionOutput, OmniDiffusionConfig, @@ -25,16 +28,23 @@ initialize_model_parallel, ) from vllm_omni.diffusion.forward_context import set_forward_context -from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.offload import apply_offload_hooks from vllm_omni.diffusion.request import OmniDiffusionRequest +from vllm_omni.diffusion.worker.gpu_diffusion_model_runner import GPUDiffusionModelRunner logger = init_logger(__name__) -class GPUWorker: +class GPUDiffusionWorker: """ - A worker that executes the model on a single GPU. + A worker that manages GPU infrastructure and delegates to the model runner. + + This class handles infrastructure initialization only: + - Device setup (CUDA device selection) + - Distributed environment (NCCL, model parallel) + - Memory management (sleep/wake) + + All model-related operations (loading, compilation, execution) are + delegated to GPUDiffusionModelRunner. 
""" def __init__( @@ -46,15 +56,17 @@ def __init__( self.local_rank = local_rank self.rank = rank self.od_config = od_config - self.pipeline = None - self.device = None + self.device: torch.device | None = None + self.vllm_config: VllmConfig | None = None + self.model_runner: GPUDiffusionModelRunner | None = None self._sleep_saved_buffers: dict[str, torch.Tensor] = {} - self.init_device_and_model() + self.init_device() - def init_device_and_model(self) -> None: - """Initialize the device and load the model.""" + def init_device(self) -> None: + """Initialize the device and distributed environment.""" world_size = self.od_config.num_gpus rank = self.rank + # Set environment variables for distributed initialization os.environ["MASTER_ADDR"] = "localhost" os.environ["MASTER_PORT"] = str(self.od_config.master_port) @@ -62,19 +74,21 @@ def init_device_and_model(self) -> None: os.environ["RANK"] = str(rank) os.environ["WORLD_SIZE"] = str(world_size) + # Setup device self.device = torch.device(f"cuda:{rank}") torch.cuda.set_device(self.device) - # hack + # Create vllm_config for parallel configuration vllm_config = VllmConfig() vllm_config.parallel_config.tensor_parallel_size = self.od_config.parallel_config.tensor_parallel_size vllm_config.parallel_config.data_parallel_size = self.od_config.parallel_config.data_parallel_size self.vllm_config = vllm_config - load_device = "cpu" if self.od_config.enable_cpu_offload else str(self.device) + # Initialize distributed environment with set_forward_context(vllm_config=vllm_config, omni_diffusion_config=self.od_config): init_distributed_environment(world_size=world_size, rank=rank) logger.info(f"Worker {self.rank}: Initialized device and distributed environment.") + parallel_config = self.od_config.parallel_config initialize_model_parallel( data_parallel_size=parallel_config.data_parallel_size, @@ -86,107 +100,45 @@ def init_device_and_model(self) -> None: pipeline_parallel_size=parallel_config.pipeline_parallel_size, ) - 
load_config = LoadConfig() - model_loader = DiffusersPipelineLoader(load_config) - time_before_load = time.perf_counter() - with self._maybe_get_memory_pool_context(tag="weights"): - with DeviceMemoryProfiler() as m: - self.pipeline = model_loader.load_model( - od_config=self.od_config, - load_device=load_device, - ) - time_after_load = time.perf_counter() - - logger.info( - "Model loading took %.4f GiB and %.6f seconds", - m.consumed_memory / GiB_bytes, - time_after_load - time_before_load, + # Create model runner and load model + self.model_runner = GPUDiffusionModelRunner( + vllm_config=self.vllm_config, + od_config=self.od_config, + device=self.device, ) - logger.info(f"Worker {self.rank}: Model loaded successfully.") - - # Apply CPU offloading (DiT <-> encoders mutual exclusion) - if self.od_config.enable_cpu_offload: - for name in ["vae"]: - module = getattr(self.pipeline, name, None) - if module is None: - continue - try: - module.to(self.device, non_blocking=True) - except Exception as exc: - logger.debug("Failed to move %s to GPU: %s", name, exc) - - apply_offload_hooks(self.pipeline, self.od_config, device=self.device) - - if not self.od_config.enforce_eager: - try: - self.pipeline.transformer = regionally_compile( - self.pipeline.transformer, - dynamic=True, - ) - logger.info(f"Worker {self.rank}: Model compiled with torch.compile.") - except Exception as e: - logger.warning(f"Worker {self.rank}: torch.compile failed with error: {e}. 
Using eager mode.") - - # Setup cache backend based on type (both backends use enable()/reset() interface) - self.cache_backend = get_cache_backend(self.od_config.cache_backend, self.od_config.cache_config) - - if self.cache_backend is not None: - self.cache_backend.enable(self.pipeline) + self.model_runner.load_model( + memory_pool_context_fn=self._maybe_get_memory_pool_context, + ) + logger.info(f"Worker {self.rank}: Initialization complete.") def generate(self, requests: list[OmniDiffusionRequest]) -> DiffusionOutput: - """ - Generate output for the given requests. - - Args: - requests: List of diffusion requests - - Returns: - DiffusionOutput with generated results - """ + """Generate output for the given requests.""" return self.execute_model(requests, self.od_config) - @torch.inference_mode() def execute_model(self, reqs: list[OmniDiffusionRequest], od_config: OmniDiffusionConfig) -> DiffusionOutput: - """ - Execute a forward pass. - """ - assert self.pipeline is not None - if not reqs or len(reqs) == 0: - raise ValueError("Cannot execute model with empty request list") - # TODO: dealing with first req for now - req = reqs[0] + """Execute a forward pass by delegating to the model runner.""" + assert self.model_runner is not None, "Model runner not initialized" + return self.model_runner.execute_model(reqs) - if req.generator is None and req.seed is not None: - req.generator = torch.Generator(device=self.device).manual_seed(req.seed) - - # Refresh cache context if needed - if self.cache_backend is not None and self.cache_backend.is_enabled(): - self.cache_backend.refresh(self.pipeline, req.num_inference_steps) - with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config): - output = self.pipeline.forward(req) - return output - - def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]: - return self.pipeline.load_weights(weights) + def load_weights(self, weights) -> set[str]: + """Load weights by 
delegating to the model runner.""" + assert self.model_runner is not None, "Model runner not initialized" + return self.model_runner.load_weights(weights) def sleep(self, level: int = 1) -> bool: """ - Put the worker to sleep. The worker should not process any requests. - The caller should guarantee that no requests are being processed - during the sleep period, before `wake_up` is called. + Put the worker to sleep, offloading model weights. Args: - level: The sleep level. Level 1 sleep will offload the model - weights and discard the kv cache. - Currently only support level 1. + level: Sleep level. Level 1 offloads weights, level 2 also saves buffers. """ from vllm.device_allocator.cumem import CuMemAllocator free_bytes_before_sleep = torch.cuda.mem_get_info()[0] # Save the buffers before level 2 sleep - if level == 2: - model = self.pipeline + if level == 2 and self.model_runner is not None: + model = self.model_runner.pipeline self._sleep_saved_buffers = {name: buffer.cpu().clone() for name, buffer in model.named_buffers()} allocator = CuMemAllocator.get_instance() @@ -220,8 +172,8 @@ def wake_up(self, tags: list[str] | None = None) -> bool: allocator.wake_up(tags) # Restore the buffers after level 2 sleep - if len(self._sleep_saved_buffers): - model = self.pipeline + if len(self._sleep_saved_buffers) and self.model_runner is not None: + model = self.model_runner.pipeline for name, buffer in model.named_buffers(): if name in self._sleep_saved_buffers: buffer.data.copy_(self._sleep_saved_buffers[name].data) @@ -229,6 +181,7 @@ def wake_up(self, tags: list[str] | None = None) -> bool: return True def _maybe_get_memory_pool_context(self, tag: str) -> AbstractContextManager: + """Get memory pool context for sleep mode support.""" if self.od_config.enable_sleep_mode: from vllm.device_allocator.cumem import CuMemAllocator @@ -240,6 +193,7 @@ def _maybe_get_memory_pool_context(self, tag: str) -> AbstractContextManager: return nullcontext() def shutdown(self) -> None: + 
"""Shutdown the worker and cleanup distributed environment.""" destroy_distributed_env() @@ -257,17 +211,14 @@ def __init__( # Inter-process Communication self.context = zmq.Context(io_threads=2) - # Initialize MessageQueue reader from handle (unified for generation & RPC) + # Initialize MessageQueue reader from handle self.mq = MessageQueue.create_from_handle(broadcast_handle, gpu_id) self.result_mq = None self.result_mq_handle = None - # Setup result sender (only for rank 0 for now, or whoever needs to reply) - # Assuming only rank 0 replies to scheduler as per original logic + # Setup result sender (only for rank 0) if gpu_id == 0: - # Create MessageQueue for results (1 writer -> 1 reader) - # We assume the reader (SyncScheduler) will act as rank 0 self.result_mq = MessageQueue(n_reader=1, n_local_reader=1, local_reader_ranks=[0]) self.result_mq_handle = self.result_mq.export_handle() logger.info(f"Worker {gpu_id} created result MessageQueue") @@ -277,31 +228,25 @@ def __init__( self.gpu_id = gpu_id self._running = True - def _create_worker(self, gpu_id: int, od_config: OmniDiffusionConfig) -> GPUWorker: + def _create_worker(self, gpu_id: int, od_config: OmniDiffusionConfig) -> GPUDiffusionWorker: """Create a worker instance. Override in subclasses for different worker types.""" - return GPUWorker( + return GPUDiffusionWorker( local_rank=gpu_id, rank=gpu_id, od_config=od_config, ) def return_result(self, output: DiffusionOutput): - """ - replies to client, only on rank 0 - """ + """Reply to client, only on rank 0.""" if self.result_mq is not None: self.result_mq.enqueue(output) def recv_message(self): - """ - Receive unified messages (RPC requests, shutdown) from broadcast queue. - Uses indefinite=True to block until a message arrives. 
- """ + """Receive messages from broadcast queue.""" return self.mq.dequeue(indefinite=True) def execute_rpc(self, rpc_request: dict) -> tuple[object | None, bool]: """Execute an RPC request and indicate whether to reply.""" - method = rpc_request["method"] args = rpc_request.get("args", ()) kwargs = rpc_request.get("kwargs", {}) @@ -325,14 +270,11 @@ def execute_rpc(self, rpc_request: dict) -> tuple[object | None, bool]: logger.error(f"Error executing RPC: {e}", exc_info=True) return {"status": "error", "error": str(e)}, should_reply - # TODO: queueing, cancellation def worker_busy_loop(self) -> None: - """Main busy loop for Multiprocessing Workers""" - + """Main busy loop for Multiprocessing Workers.""" logger.info(f"Worker {self.gpu_id} ready to receive requests via shared memory") while self._running: - # Receive unified message (generation request, RPC request, or shutdown) msg = None try: msg = self.recv_message() @@ -349,7 +291,6 @@ def worker_busy_loop(self) -> None: # Route message based on type if isinstance(msg, dict) and msg.get("type") == "rpc": - # Handle RPC request try: result, should_reply = self.execute_rpc(msg) if should_reply: @@ -360,13 +301,12 @@ def worker_busy_loop(self) -> None: self.return_result({"status": "error", "error": str(e)}) elif isinstance(msg, dict) and msg.get("type") == "shutdown": - # Handle shutdown message logger.info("Worker %s: Received shutdown message", self.gpu_id) self._running = False continue else: - # Handle generation request (OmniDiffusionRequest list) + # Handle generation request try: output = self.worker.execute_model(msg, self.od_config) except Exception as e: @@ -379,17 +319,14 @@ def worker_busy_loop(self) -> None: try: self.return_result(output) except zmq.ZMQError as e: - # Reply failed; log and keep loop alive to accept future requests logger.error(f"ZMQ error sending reply: {e}") continue logger.info("event loop terminated.") try: self.worker.shutdown() - except Exception as exc: # pragma: no cover - 
best effort cleanup + except Exception as exc: logger.warning("Worker %s: Shutdown encountered an error: %s", self.gpu_id, exc) - # if self.result_sender is not None: - # self.result_sender.close() self.context.term() @staticmethod @@ -400,7 +337,6 @@ def worker_main( broadcast_handle, ) -> None: """Worker initialization and execution loops.""" - worker_proc = WorkerProc( od_config, gpu_id=rank, diff --git a/vllm_omni/diffusion/worker/npu/npu_worker.py b/vllm_omni/diffusion/worker/npu/npu_worker.py index bfeb0d914c9..446c29cae4d 100644 --- a/vllm_omni/diffusion/worker/npu/npu_worker.py +++ b/vllm_omni/diffusion/worker/npu/npu_worker.py @@ -3,6 +3,8 @@ import multiprocessing as mp import os import time +from collections.abc import Iterable +from contextlib import AbstractContextManager, nullcontext import torch from vllm.config import LoadConfig, VllmConfig @@ -11,25 +13,42 @@ from vllm_omni.diffusion.cache.selector import get_cache_backend from vllm_omni.diffusion.data import ( + DiffusionOutput, OmniDiffusionConfig, ) from vllm_omni.diffusion.distributed.parallel_state import ( + destroy_distributed_env, init_distributed_environment, initialize_model_parallel, ) from vllm_omni.diffusion.forward_context import set_forward_context from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.worker.gpu_worker import GPUWorker, WorkerProc +from vllm_omni.diffusion.request import OmniDiffusionRequest +from vllm_omni.diffusion.worker.gpu_diffusion_worker import WorkerProc logger = init_logger(__name__) -class NPUWorker(GPUWorker): +class NPUWorker: """ A worker that executes the model on a single NPU. - Inherits from GPUWorker and overrides device-specific initialization. + Standalone class (no GPU worker base class) with its own NPU-specific initialization. 
""" + def __init__( + self, + local_rank: int, + rank: int, + od_config: OmniDiffusionConfig, + ): + self.local_rank = local_rank + self.rank = rank + self.od_config = od_config + self.pipeline = None + self.device = None + self._sleep_saved_buffers: dict[str, torch.Tensor] = {} + self.init_device_and_model() + def init_device_and_model(self) -> None: """Initialize the NPU device and load the model.""" world_size = self.od_config.num_gpus @@ -86,6 +105,115 @@ def init_device_and_model(self) -> None: if self.cache_backend is not None: self.cache_backend.enable(self.pipeline) + def generate(self, requests: list[OmniDiffusionRequest]) -> DiffusionOutput: + """ + Generate output for the given requests. + + Args: + requests: List of diffusion requests + + Returns: + DiffusionOutput with generated results + """ + return self.execute_model(requests, self.od_config) + + @torch.inference_mode() + def execute_model(self, reqs: list[OmniDiffusionRequest], od_config: OmniDiffusionConfig) -> DiffusionOutput: + """ + Execute a forward pass. + """ + assert self.pipeline is not None + if not reqs or len(reqs) == 0: + raise ValueError("Cannot execute model with empty request list") + # TODO: dealing with first req for now + req = reqs[0] + + if req.generator is None and req.seed is not None: + req.generator = torch.Generator(device=self.device).manual_seed(req.seed) + + # Refresh cache context if needed + if self.cache_backend is not None and self.cache_backend.is_enabled(): + self.cache_backend.refresh(self.pipeline, req.num_inference_steps) + with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config): + output = self.pipeline.forward(req) + return output + + def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]: + return self.pipeline.load_weights(weights) + + def sleep(self, level: int = 1) -> bool: + """ + Put the worker to sleep. The worker should not process any requests. 
+ The caller should guarantee that no requests are being processed + during the sleep period, before `wake_up` is called. + + Args: + level: The sleep level. Level 1 sleep will offload the model + weights and discard the kv cache. + Level 2 first copies the pipeline buffers to CPU so that `wake_up` can restore them. + """ + from vllm.device_allocator.cumem import CuMemAllocator + + free_bytes_before_sleep = torch.cuda.mem_get_info()[0] + + # Save the buffers before level 2 sleep + if level == 2: + model = self.pipeline + self._sleep_saved_buffers = {name: buffer.cpu().clone() for name, buffer in model.named_buffers()} + + allocator = CuMemAllocator.get_instance() + allocator.sleep(offload_tags=("weights",) if level == 1 else tuple()) + free_bytes_after_sleep, total = torch.cuda.mem_get_info() + freed_bytes = free_bytes_after_sleep - free_bytes_before_sleep + used_bytes = total - free_bytes_after_sleep + assert freed_bytes >= 0, "Memory usage increased after sleeping." + logger.info( + "Sleep mode freed %.2f GiB memory, %.2f GiB memory is still in use.", + freed_bytes / GiB_bytes, + used_bytes / GiB_bytes, + ) + return True + + def wake_up(self, tags: list[str] | None = None) -> bool: + """ + Wake up the worker from sleep mode. See the `sleep` + method for more details. + + Args: + tags: An optional list of tags to reallocate the worker memory + for specific memory allocations. Values must be in + `("weights",)`. If None, all memory is reallocated. + wake_up should be called with all tags (or None) before the + worker is used again. 
+ """ + from vllm.device_allocator.cumem import CuMemAllocator + + allocator = CuMemAllocator.get_instance() + allocator.wake_up(tags) + + # Restore the buffers after level 2 sleep + if len(self._sleep_saved_buffers): + model = self.pipeline + for name, buffer in model.named_buffers(): + if name in self._sleep_saved_buffers: + buffer.data.copy_(self._sleep_saved_buffers[name].data) + self._sleep_saved_buffers = {} + return True + + def _maybe_get_memory_pool_context(self, tag: str) -> AbstractContextManager: + if self.od_config.enable_sleep_mode: + from vllm.device_allocator.cumem import CuMemAllocator + + allocator = CuMemAllocator.get_instance() + if tag == "weights": + assert allocator.get_current_usage() == 0, "Sleep mode can only be used for one instance per process." + return allocator.use_memory_pool(tag=tag) + else: + return nullcontext() + + def shutdown(self) -> None: + destroy_distributed_env() + class NPUWorkerProc(WorkerProc): """Wrapper that runs one NPUWorker in a separate process.""" diff --git a/vllm_omni/utils/platform_utils.py b/vllm_omni/utils/platform_utils.py index 5f8259ab83d..fb47018e789 100644 --- a/vllm_omni/utils/platform_utils.py +++ b/vllm_omni/utils/platform_utils.py @@ -53,6 +53,6 @@ def get_diffusion_worker_class() -> type: return NPUWorkerProc else: # Default to GPU worker for cuda and other devices - from vllm_omni.diffusion.worker.gpu_worker import WorkerProc + from vllm_omni.diffusion.worker.gpu_diffusion_worker import WorkerProc return WorkerProc