diff --git a/docs/.nav.yml b/docs/.nav.yml index 455a0525056..df0aa3c2dfd 100644 --- a/docs/.nav.yml +++ b/docs/.nav.yml @@ -34,6 +34,7 @@ nav: - Online Serving: - BAGEL-7B-MoT: user_guide/examples/online_serving/bagel.md - vLLM-Omni Helm Chart: user_guide/examples/online_serving/chart-helm.md + - Diffusers Backend Adapter Example: user_guide/examples/online_serving/diffusers_pipeline_adapter.md - Fish Speech S2 Pro: user_guide/examples/online_serving/fish_speech.md - GLM-Image Online Serving: user_guide/examples/online_serving/glm_image.md - Image-To-Image: user_guide/examples/online_serving/image_to_image.md diff --git a/docs/user_guide/examples/online_serving/diffusers_pipeline_adapter.md b/docs/user_guide/examples/online_serving/diffusers_pipeline_adapter.md new file mode 100644 index 00000000000..ac88071d53f --- /dev/null +++ b/docs/user_guide/examples/online_serving/diffusers_pipeline_adapter.md @@ -0,0 +1,93 @@ +# Diffusers Backend Adapter Example + +Source: `examples/online_serving/diffusers_pipeline_adapter`. + + +This example demonstrates how to serve any 🤗 Diffusers pipeline through vLLM-Omni +using the `diffusers` load format. + +## Supported Models + +Any model loadable via `DiffusionPipeline.from_pretrained()` should be supported, including text-to-image, image-to-image, text-to-video, image-to-video, and text-to-audio. + +## Limitations + +The diffusers backend is a black-box adapter. The following features are NOT yet supported. +There is no guarantee that they will be supported in the future. + +- CFG parallel execution +- Sequence parallel execution +- TeaCache / Cache-DiT acceleration +- Step-wise execution (continuous batching) + +For these features, it is recommended to use natively supported pipelines instead. 
+ +## Usage + +### Option 1: CLI arguments + +```bash +vllm serve "stable-diffusion-v1-5/stable-diffusion-v1-5" \ + --omni \ + --diffusion-load-format diffusers \ + --diffusers-load-kwargs '{"use_safetensors": true}' \ + --diffusers-call-kwargs '{"num_inference_steps": 30, "guidance_scale": 7.5}' +``` + +`--diffusers-load-kwargs` and `--diffusers-call-kwargs` are only valid together with `--diffusion-load-format diffusers`. + +### Option 2: Stage config YAML + +```bash +vllm serve stable-diffusion-v1-5/stable-diffusion-v1-5 --stage-configs-path examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml --omni +``` + +The particular fields of interest are `model`, `diffusion_load_format`, `diffusers_load_kwargs`, and `diffusers_call_kwargs` under `engine_args`. They are the same as the CLI arguments. + +## Send a Request + +```bash +curl http://localhost:8000/v1/images/generations \ + -H "Content-Type: application/json" \ + -d '{ + "model": "stable-diffusion-v1-5/stable-diffusion-v1-5", + "prompt": "a photo of an astronaut riding a horse on mars", + "n": 1, + "size": "512x512" + }' +``` + +Or refer to other documentation pages on how to request a particular input/output modality, such as `examples/online_serving/text_to_image/openai_chat_client.py`. + +## Configuration Reference + +For the diffusers adapter, set options under **`engine_args`**: + +### `diffusion_load_format: "diffusers"` + +This field selects the Hugging Face diffusers adapter path (see `DiffusersPipelineLoader`). + +### `diffusers_load_kwargs` + +Passed to `DiffusionPipeline.from_pretrained()`. + +This is suitable for model-specific configurations not available through the vLLM-Omni interface (such as `Omni.__init__()`, `vllm serve` CLI arguments, and stage config YAML fields outside `diffusers_load_kwargs`). + +When a parameter is available in the vLLM-Omni interface, it will be adapted here. 
+But if that parameter is simultaneously set in both the vLLM-Omni interface and `diffusers_load_kwargs`, the **latter** (`diffusers_load_kwargs`) will take precedence. + +### `diffusers_call_kwargs` + +Passed to `pipeline.__call__()`. + +This is suitable for sampling parameters not available through the vLLM-Omni interface (such as `Omni.generate()` and online serving payloads). + +When a parameter is available in the vLLM-Omni interface, it is translated to the matching `pipeline.__call__()` argument automatically. +But if that parameter is simultaneously set in both the vLLM-Omni interface and `diffusers_call_kwargs`, the **former** (the vLLM-Omni interface) will take precedence (because it is set at request time). + +## Example materials + +??? abstract "stage_config.yaml" + ``````yaml + --8<-- "examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml" + `````` diff --git a/examples/online_serving/diffusers_pipeline_adapter/README.md b/examples/online_serving/diffusers_pipeline_adapter/README.md new file mode 100644 index 00000000000..8dbf9369ae8 --- /dev/null +++ b/examples/online_serving/diffusers_pipeline_adapter/README.md @@ -0,0 +1,83 @@ +# Diffusers Backend Adapter Example + +This example demonstrates how to serve any 🤗 Diffusers pipeline through vLLM-Omni +using the `diffusers` load format. + +## Supported Models + +Any model loadable via `DiffusionPipeline.from_pretrained()` should be supported, including text-to-image, image-to-image, text-to-video, image-to-video, and text-to-audio. + +## Limitations + +The diffusers backend is a black-box adapter. The following features are NOT yet supported. +There is no guarantee that they will be supported in the future. + +- CFG parallel execution +- Sequence parallel execution +- TeaCache / Cache-DiT acceleration +- Step-wise execution (continuous batching) + +For these features, it is recommended to use natively supported pipelines instead. 
+ +## Usage + +### Option 1: CLI arguments + +```bash +vllm serve "stable-diffusion-v1-5/stable-diffusion-v1-5" \ + --omni \ + --diffusion-load-format diffusers \ + --diffusers-load-kwargs '{"use_safetensors": true}' \ + --diffusers-call-kwargs '{"num_inference_steps": 30, "guidance_scale": 7.5}' +``` + +`--diffusers-load-kwargs` and `--diffusers-call-kwargs` are only valid together with `--diffusion-load-format diffusers`. + +### Option 2: Stage config YAML + +```bash +vllm serve stable-diffusion-v1-5/stable-diffusion-v1-5 --stage-configs-path examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml --omni +``` + +The particular fields of interest are `model`, `diffusion_load_format`, `diffusers_load_kwargs`, and `diffusers_call_kwargs` under `engine_args`. They are the same as the CLI arguments. + +## Send a Request + +```bash +curl http://localhost:8000/v1/images/generations \ + -H "Content-Type: application/json" \ + -d '{ + "model": "stable-diffusion-v1-5/stable-diffusion-v1-5", + "prompt": "a photo of an astronaut riding a horse on mars", + "n": 1, + "size": "512x512" + }' +``` + +Or refer to other documentation pages on how to request a particular input/output modality, such as `examples/online_serving/text_to_image/openai_chat_client.py`. + +## Configuration Reference + +For the diffusers adapter, set options under **`engine_args`**: + +### `diffusion_load_format: "diffusers"` + +This field selects the Hugging Face diffusers adapter path (see `DiffusersPipelineLoader`). + +### `diffusers_load_kwargs` + +Passed to `DiffusionPipeline.from_pretrained()`. + +This is suitable for model-specific configurations not available through the vLLM-Omni interface (such as `Omni.__init__()`, `vllm serve` CLI arguments, and stage config YAML fields outside `diffusers_load_kwargs`). + +When a parameter is available in the vLLM-Omni interface, it will be adapted here. 
+But if that parameter is simultaneously set in both the vLLM-Omni interface and `diffusers_load_kwargs`, the **latter** (`diffusers_load_kwargs`) will take precedence. + +### `diffusers_call_kwargs` + +Passed to `pipeline.__call__()`. + +This is suitable for sampling parameters not available through the vLLM-Omni interface (such as `Omni.generate()` and online serving payloads). + +When a parameter is available in the vLLM-Omni interface, it is translated to the matching `pipeline.__call__()` argument automatically. +But if that parameter is simultaneously set in both the vLLM-Omni interface and `diffusers_call_kwargs`, the **former** (the vLLM-Omni interface) will take precedence (because it is set at request time). diff --git a/examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml b/examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml new file mode 100644 index 00000000000..7c96eb6c167 --- /dev/null +++ b/examples/online_serving/diffusers_pipeline_adapter/stage_config.yaml @@ -0,0 +1,31 @@ +# Example stage config for diffusers backend +# This config demonstrates serving Stable Diffusion 1.5 via the diffusers adapter. +# Users should copy and modify this for their own models. + +model_type: diffusion + +stage_args: + - stage_id: 0 + stage_type: diffusion + engine_args: + model_stage: diffusion + model: "stable-diffusion-v1-5/stable-diffusion-v1-5" + distributed_executor_backend: "mp" + # gpu_memory_utilization: 0.9 + engine_output_type: image + # Select the HF diffusers adapter + diffusion_load_format: "diffusers" + # model_class_name: "DiffusersAdapterPipeline" # default when diffusion_load_format is diffusers + diffusers_load_kwargs: + # Passed to DiffusionPipeline.from_pretrained(). + # Good for model-specific loading parameters not covered by OmniDiffusionConfig. + # During model load time, parameters here override their counterparts in the vLLM-Omni interface. + use_safetensors: true + diffusers_call_kwargs: + # Passed to pipeline.__call__(). 
+ # Good for model-specific sampling parameters not covered by OmniDiffusionSamplingParams. + # During request time, parameters here are overridden by the counterparts in OmniDiffusionSamplingParams. + num_inference_steps: 30 + guidance_scale: 7.5 + final_output: true + final_output_type: image diff --git a/tests/diffusion/test_diffusers_adapter.py b/tests/diffusion/test_diffusers_adapter.py new file mode 100644 index 00000000000..ac2ec2e3fef --- /dev/null +++ b/tests/diffusion/test_diffusers_adapter.py @@ -0,0 +1,186 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +from collections import namedtuple +from types import SimpleNamespace + +import pytest +import torch +from diffusers import DiffusionPipeline +from PIL import Image + +from vllm_omni.diffusion.data import ( + DiffusionOutput, + DiffusionParallelConfig, + OmniDiffusionConfig, +) +from vllm_omni.diffusion.models.diffusers_adapter import DiffusersAdapterPipeline +from vllm_omni.diffusion.request import OmniDiffusionRequest +from vllm_omni.inputs.data import OmniDiffusionSamplingParams + +pytestmark = [pytest.mark.core_model, pytest.mark.diffusion, pytest.mark.cpu] + + +def _make_od_config(**overrides) -> OmniDiffusionConfig: + od_config = OmniDiffusionConfig( + model="test/model", + model_class_name="DiffusersAdapterPipeline", + dtype=torch.float16, + diffusion_load_format="diffusers", + diffusers_load_kwargs={}, + diffusers_call_kwargs={}, + output_type="pil", + parallel_config=DiffusionParallelConfig(cfg_parallel_size=1, sequence_parallel_size=1), + cache_backend="none", + ) + for key, value in overrides.items(): + setattr(od_config, key, value) + return od_config + + +def _make_request(**overrides) -> OmniDiffusionRequest: + prompt = overrides.pop("prompt", "a test prompt") + negative_prompt = overrides.pop("negative_prompt", None) + prompt_obj: dict[str, str] = {"prompt": prompt} + if negative_prompt is not None: + 
prompt_obj["negative_prompt"] = negative_prompt + + defaults = { + "prompts": [prompt_obj], + "sampling_params": OmniDiffusionSamplingParams( + num_inference_steps=20, + guidance_scale=7.5, + height=16, + width=16, + num_frames=1, + num_outputs_per_prompt=1, + seed=42, + output_type="pil", + generator_device="cpu", + ), + } + defaults.update(overrides) + return OmniDiffusionRequest(**defaults) + + +class TestDiffusersAdapterPipeline: + def test_adapter_forward_returns_output(self, mocker): + od_config = _make_od_config() + request = _make_request() + stub_image = Image.new("RGB", (request.sampling_params.width, request.sampling_params.height)) # pyright: ignore[reportArgumentType] + + adapter = DiffusersAdapterPipeline(od_config=od_config) + MockPipelineOutput = namedtuple("MockPipelineOutput", ["image"]) + MockPipeline = type("MockPipeline", (DiffusionPipeline,), {}) + adapter._pipeline = MockPipeline() + + mocker.patch.object( + MockPipeline, + "__call__", + return_value=MockPipelineOutput(image=stub_image), + ) + output = adapter.forward(request) + + assert isinstance(output, DiffusionOutput) + assert isinstance(output.output, MockPipelineOutput) + assert output.output.image is stub_image + + @pytest.mark.parametrize( + "feature_id", + ["cfg_parallel", "ulysses", "ring", "teacache", "cache_dit", "enforce_eager", "quantization"], + ) + def test_adapter_guard_unsupported_feature(self, feature_id): + if feature_id == "cfg_parallel": + od_config = _make_od_config( + parallel_config=DiffusionParallelConfig(cfg_parallel_size=2, sequence_parallel_size=1), + cache_backend="none", + ) + elif feature_id == "ulysses": + od_config = _make_od_config( + parallel_config=DiffusionParallelConfig(cfg_parallel_size=1, ulysses_degree=2), + cache_backend="none", + ) + elif feature_id == "ring": + od_config = _make_od_config( + parallel_config=DiffusionParallelConfig(cfg_parallel_size=1, ring_degree=2), + cache_backend="none", + ) + elif feature_id == "teacache": + od_config = 
_make_od_config( + parallel_config=DiffusionParallelConfig(cfg_parallel_size=1, sequence_parallel_size=1), + cache_backend="tea_cache", + ) + elif feature_id == "cache_dit": + od_config = _make_od_config( + parallel_config=DiffusionParallelConfig(cfg_parallel_size=1, sequence_parallel_size=1), + cache_backend="cache_dit", + ) + elif feature_id == "enforce_eager": + od_config = _make_od_config(enforce_eager=True) + elif feature_id == "quantization": + od_config = _make_od_config(quantization_config=SimpleNamespace(quant_method="fp8")) + else: + raise ValueError(f"Unknown feature ID: {feature_id}") + + with pytest.raises(NotImplementedError): + DiffusersAdapterPipeline(od_config=od_config) + + def test_adapter_guard_unknown_output_type(self, mocker): + """Test that the adapter wraps an unknown output type as-is. + This is useful when `return_dict=True` and the diffusers pipeline returns an OrderedDict subclass.""" + + adapter = DiffusersAdapterPipeline(od_config=_make_od_config()) + raw_output = {"unexpected": "dict-output"} + + MockPipeline = type("MockPipeline", (DiffusionPipeline,), {}) + adapter._pipeline = MockPipeline() + + mocker.patch.object( + MockPipeline, + "__call__", + return_value=raw_output, + ) + output = adapter.forward(_make_request()) + + assert isinstance(output, DiffusionOutput) + assert output.output == raw_output + + def test_adapter_build_call_kwargs(self): + adapter = DiffusersAdapterPipeline( + od_config=_make_od_config( + diffusers_call_kwargs={ + "guidance_scale": 1.25, + "eta": 0.3, + "output_type": "np", + } + ) + ) + req = _make_request( + prompt="a cat on mars", + negative_prompt="low quality", + sampling_params=OmniDiffusionSamplingParams( + num_inference_steps=9, + guidance_scale=8.0, + height=320, + width=640, + num_frames=8, + num_outputs_per_prompt=2, + seed=123, + output_type="pil", + ), + ) + + kwargs = adapter._build_call_kwargs(req) + + assert kwargs["prompt"] == "a cat on mars" + assert kwargs["negative_prompt"] == "low 
quality" + assert kwargs["num_inference_steps"] == 9 + assert kwargs["guidance_scale"] == 8.0 + assert kwargs["height"] == 320 + assert kwargs["width"] == 640 + assert kwargs["num_frames"] == 8 + assert kwargs["num_images_per_prompt"] == 2 + assert kwargs["output_type"] == "pil" + assert isinstance(kwargs["generator"], torch.Generator) + assert kwargs["generator"].device.type == "cpu" + assert kwargs["generator"].initial_seed() == 123 diff --git a/tests/e2e/online_serving/test_diffusers_adapter.py b/tests/e2e/online_serving/test_diffusers_adapter.py new file mode 100644 index 00000000000..8b41db13a53 --- /dev/null +++ b/tests/e2e/online_serving/test_diffusers_adapter.py @@ -0,0 +1,56 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""End-to-end tests for DiffusersAdapterPipeline. + +It tests the full user flow of launching a diffusers-backed model and running inference. +""" + +import pytest +from PIL import Image + +from tests.helpers.mark import hardware_test +from tests.helpers.runtime import OmniServer, OmniServerParams, OpenAIClientHandler, dummy_messages_from_mix_data + +pytestmark = [pytest.mark.diffusion, pytest.mark.core_model] + + +@pytest.mark.parametrize( + "omni_server", + [ + OmniServerParams( + model="tiny-random/Qwen-Image", + server_args=[ + "--diffusion-load-format", + "diffusers", + "--diffusers-call-kwargs", + '{"height": 512, "width": 0}', # deliberately weird width to be overridden + ], + ), + ], + indirect=True, +) +@hardware_test(res={"cuda": "L4"}, num_cards=1) +def test_t2i_with_diffusers_adapter( + omni_server: OmniServer, + openai_client: OpenAIClientHandler, +): + messages = dummy_messages_from_mix_data(content_text="a photo of an astronaut riding a horse on mars") + + request_config = { + "model": omni_server.model, + "messages": messages, + "extra_body": { + "width": 512, + "num_inference_steps": 2, + "negative_prompt": "blurry", + "true_cfg_scale": 4.0, + "seed": 42, 
+ }, + } + + response = openai_client.send_diffusion_request(request_config) + image: Image.Image = response[0].images[0] # pyright: ignore[reportOptionalSubscript] + + # Request config has incomplete width/height, so internal assertion in `send_diffusion_request` is incomplete. + assert image.size == (512, 512) diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py index 012c0130c7c..b9307657f5c 100644 --- a/vllm_omni/diffusion/data.py +++ b/vllm_omni/diffusion/data.py @@ -452,7 +452,14 @@ class OmniDiffusionConfig: custom_pipeline_args: dict[str, Any] | None = None # Diffusion model loading format - diffusion_load_format: str = "default" # "default", "custom_pipeline", "dummy" + # "default", "custom_pipeline", "dummy", "diffusers" (HF diffusers adapter) + diffusion_load_format: str = "default" + + # Diffusers adapter kwargs + # kwargs forwarded to DiffusionPipeline.from_pretrained() + diffusers_load_kwargs: dict[str, Any] = field(default_factory=dict) + # kwargs forwarded to pipeline.__call__() + diffusers_call_kwargs: dict[str, Any] = field(default_factory=dict) # http server endpoint config, would be ignored in local mode host: str | None = None @@ -648,6 +655,12 @@ def __post_init__(self): elif self.max_cpu_loras < 1: raise ValueError("max_cpu_loras must be >= 1 for diffusion LoRA") + if self.diffusion_load_format != "diffusers" and (self.diffusers_load_kwargs or self.diffusers_call_kwargs): + raise ValueError( + "diffusers_load_kwargs and diffusers_call_kwargs are only " + "valid together with diffusion_load_format=diffusers" + ) + def set_tf_model_config(self, tf_config: "TransformerConfig") -> None: """Assign `tf_model_config` and propagate quantization if detected. 
@@ -686,6 +699,10 @@ def enrich_config(self) -> None: """ from vllm.transformers_utils.config import get_hf_file_to_dict + # Default model_class_name for diffusers adapter + if self.model_class_name is None and self.diffusion_load_format == "diffusers": + self.model_class_name = "DiffusersAdapterPipeline" + try: config_dict = get_hf_file_to_dict("model_index.json", self.model) if config_dict is not None: @@ -693,32 +710,42 @@ def enrich_config(self) -> None: self.model_class_name = config_dict.get("_class_name", None) self.update_multimodal_support() - tf_config_dict = get_hf_file_to_dict("transformer/config.json", self.model) - self.tf_model_config = TransformerConfig.from_dict(tf_config_dict) + # Skip transformer config loading for diffusers adapter + # (non-DiT models don't have a separate transformer folder/config) + if self.diffusion_load_format == "diffusers": + self.tf_model_config = TransformerConfig() + else: + tf_config_dict = get_hf_file_to_dict("transformer/config.json", self.model) + self.tf_model_config = TransformerConfig.from_dict(tf_config_dict) else: raise FileNotFoundError("model_index.json not found") except (AttributeError, OSError, ValueError, FileNotFoundError): - cfg = get_hf_file_to_dict("config.json", self.model) - if cfg is None: - raise ValueError(f"Could not find config.json or model_index.json for model {self.model}") - - self.tf_model_config = TransformerConfig.from_dict(cfg) - model_type = cfg.get("model_type") - architectures = cfg.get("architectures") or [] - - if model_type == "bagel" or "BagelForConditionalGeneration" in architectures: - self.model_class_name = "BagelPipeline" - self.tf_model_config = TransformerConfig() - self.update_multimodal_support() - elif model_type == "nextstep": - if self.model_class_name is None: - self.model_class_name = "NextStep11Pipeline" + # Skip transformer config loading for diffusers adapter + # (non-DiT models don't have a separate transformer folder/config) + if self.diffusion_load_format == 
"diffusers": self.tf_model_config = TransformerConfig() - self.update_multimodal_support() - elif architectures and len(architectures) == 1: - self.model_class_name = architectures[0] else: - raise + cfg = get_hf_file_to_dict("config.json", self.model) + if cfg is None: + raise ValueError(f"Could not find config.json or model_index.json for model {self.model}") + + self.tf_model_config = TransformerConfig.from_dict(cfg) + model_type = cfg.get("model_type") + architectures = cfg.get("architectures") or [] + + if model_type == "bagel" or "BagelForConditionalGeneration" in architectures: + self.model_class_name = "BagelPipeline" + self.tf_model_config = TransformerConfig() + self.update_multimodal_support() + elif model_type == "nextstep": + if self.model_class_name is None: + self.model_class_name = "NextStep11Pipeline" + self.tf_model_config = TransformerConfig() + self.update_multimodal_support() + elif architectures and len(architectures) == 1: + self.model_class_name = architectures[0] + else: + raise @classmethod def from_kwargs(cls, **kwargs: Any) -> "OmniDiffusionConfig": @@ -743,6 +770,12 @@ def from_kwargs(cls, **kwargs: Any) -> "OmniDiffusionConfig": cache_backend = os.environ.get("DIFFUSION_CACHE_BACKEND") or os.environ.get("DIFFUSION_CACHE_ADAPTER") kwargs["cache_backend"] = cache_backend.lower() if cache_backend else "none" + # Falsy-value check for not-None fields (convert potential None values in YAML config to empty containers) + if "diffusers_load_kwargs" in kwargs and kwargs["diffusers_load_kwargs"] is None: + kwargs["diffusers_load_kwargs"] = {} + if "diffusers_call_kwargs" in kwargs and kwargs["diffusers_call_kwargs"] is None: + kwargs["diffusers_call_kwargs"] = {} + # Filter kwargs to only include valid fields valid_fields = {f.name for f in fields(cls)} filtered_kwargs = {k: v for k, v in kwargs.items() if k in valid_fields} diff --git a/vllm_omni/diffusion/model_loader/diffusers_loader.py b/vllm_omni/diffusion/model_loader/diffusers_loader.py 
index 146afb26fbc..91f3574b185 100644 --- a/vllm_omni/diffusion/model_loader/diffusers_loader.py +++ b/vllm_omni/diffusion/model_loader/diffusers_loader.py @@ -32,6 +32,7 @@ from vllm_omni.diffusion.data import OmniDiffusionConfig from vllm_omni.diffusion.distributed.hsdp import HSDPInferenceConfig from vllm_omni.diffusion.model_loader.gguf_adapters import get_gguf_adapter +from vllm_omni.diffusion.models.diffusers_adapter.pipeline_diffusers_adapter import DiffusersAdapterPipeline from vllm_omni.diffusion.registry import initialize_model if TYPE_CHECKING: @@ -257,11 +258,14 @@ def load_model( self, od_config: OmniDiffusionConfig, load_device: str, - load_format: str = "default", + load_format: str | None = "default", custom_pipeline_name: str | None = None, device: torch.device | None = None, ) -> nn.Module: """Load a model with the given configurations.""" + if load_format is None: + load_format = "default" + # CPU offload + FP8: load weights on device for FP8 quantization if load_device == "cpu" and od_config.quantization_config is not None: load_device = device.type @@ -277,11 +281,21 @@ def load_model( with target_device: if load_format == "default": model = initialize_model(od_config) + elif load_format == "diffusers": + model = DiffusersAdapterPipeline(od_config=od_config, device=target_device) elif load_format == "custom_pipeline": model_cls = resolve_obj_by_qualname(custom_pipeline_name) model = model_cls(od_config=od_config) + else: + # 'dummy' format should not call this function at all + raise ValueError(f"Unknown load_format: {load_format}") logger.debug("Loading weights on %s ...", load_device) - if self._is_gguf_quantization(od_config): + if load_format == "diffusers": + # DiffusersAdapterPipeline.load_weights() calls + # DiffusionPipeline.from_pretrained() internally — it does + # NOT use our native (customized) pipeline classes. 
+ cast(DiffusersAdapterPipeline, model).load_weights() + elif self._is_gguf_quantization(od_config): self._load_weights_with_gguf(model, od_config) else: # Quantization does not happen in `load_weights` but after it diff --git a/vllm_omni/diffusion/models/diffusers_adapter/__init__.py b/vllm_omni/diffusion/models/diffusers_adapter/__init__.py new file mode 100644 index 00000000000..c8dd51c8e7c --- /dev/null +++ b/vllm_omni/diffusion/models/diffusers_adapter/__init__.py @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""Diffusers backend adapter for vLLM-Omni.""" + +from vllm_omni.diffusion.models.diffusers_adapter.pipeline_diffusers_adapter import ( + DiffusersAdapterPipeline, +) + +__all__ = [ + "DiffusersAdapterPipeline", +] diff --git a/vllm_omni/diffusion/models/diffusers_adapter/pipeline_diffusers_adapter.py b/vllm_omni/diffusion/models/diffusers_adapter/pipeline_diffusers_adapter.py new file mode 100644 index 00000000000..8a1fdfc08f3 --- /dev/null +++ b/vllm_omni/diffusion/models/diffusers_adapter/pipeline_diffusers_adapter.py @@ -0,0 +1,358 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Diffusers backend adapter for vLLM-Omni. + +Provides a black-box wrapper around any 🤗 Diffusers pipeline, enabling +vLLM-Omni to directly serve Diffusers models with near-zero per-model code. + +The adapter delegates full pipeline execution to diffusers' ``__call__()``. 
+It does NOT support: +- CFG parallel (diffusers handles CFG via guidance_scale internally) +- Sequence parallel (requires model-specific attention surgery) +- TeaCache / Cache-DiT (requires hooking into transformer blocks) +- Step-wise execution (continuous batching) +""" + +import logging +import os +from typing import Any + +import torch +from diffusers.pipelines.pipeline_utils import DiffusionPipeline +from torch import nn + +from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig +from vllm_omni.diffusion.profiler.diffusion_pipeline_profiler import DiffusionPipelineProfilerMixin +from vllm_omni.diffusion.request import OmniDiffusionRequest +from vllm_omni.inputs.data import OmniPromptType +from vllm_omni.platforms import current_omni_platform + +logger = logging.getLogger(__name__) + + +class DiffusersAdapterPipeline(nn.Module, DiffusionPipelineProfilerMixin): + """Black-box adapter that delegates full pipeline execution to a diffusers pipeline. + + Usage:: + + adapter = DiffusersAdapterPipeline(od_config=od_config) + adapter.load_weights() # calls DiffusionPipeline.from_pretrained() + output = adapter.forward(req) + + Step-wise execution is explicitly rejected — diffusers encapsulates the + full denoising loop internally. Use native pipelines for continuous + batching mode. + """ + + supports_step_execution: bool = False + + def __init__(self, *, od_config: OmniDiffusionConfig, device: torch.device | None = None): + super().__init__() + self._pipeline: DiffusionPipeline + self.od_config = od_config + self.device = device + self._capabilities: dict[str, Any] = {} + self._raise_unsupported_features() + + self.setup_diffusion_pipeline_profiler( + enable_diffusion_pipeline_profiler=od_config.enable_diffusion_pipeline_profiler, + profiler_targets=["forward"], + ) + if od_config.enable_diffusion_pipeline_profiler: + logger.info("Profiling enabled for DiffusersAdapterPipeline. 
Only 'forward' is supported.") + + # ------------------------------------------------------------------ + # Weight loading + # ------------------------------------------------------------------ + + def load_weights(self) -> None: + """Load the diffusers pipeline via ``DiffusionPipeline.from_pretrained()``.""" + + model_id = self.od_config.model + dtype = self.od_config.dtype + + load_kwargs = { + "torch_dtype": dtype, + **self.od_config.diffusers_load_kwargs, + } + logger.debug(f"Loading diffusers pipeline with kwargs: {load_kwargs}") + + self._pipeline = DiffusionPipeline.from_pretrained( + model_id, + **load_kwargs, + ).to(self.device) + + # CPU offloading + if self.od_config.enable_layerwise_offload: + self._pipeline.enable_sequential_cpu_offload() + elif self.od_config.enable_cpu_offload: + self._pipeline.enable_model_cpu_offload() + + # VAE slicing and tiling: try-catch because not all models have VAE + if self.od_config.vae_use_slicing: + try: + self._pipeline.enable_vae_slicing() + except Exception as e: + logger.warning( + f"Failed to enable VAE slicing for diffusers pipeline {self._pipeline.__class__.__name__}: {e}" + ) + if self.od_config.vae_use_tiling: + try: + self._pipeline.enable_vae_tiling() + except Exception as e: + logger.warning( + f"Failed to enable VAE tiling for diffusers pipeline {self._pipeline.__class__.__name__}: {e}" + ) + + # Attention backend + self._set_attention_backend() + + # ------------------------------------------------------------------ + # Step-wise execution — explicitly rejected + # ------------------------------------------------------------------ + + def prepare_encode(self, **_: Any) -> Any: + raise NotImplementedError( + "Step-wise execution is not yet supported with the diffusers backend. " + "Use a native pipeline for continuous batching mode." + ) + + def denoise_step(self, **_: Any) -> torch.Tensor | None: + raise NotImplementedError( + "Step-wise execution is not yet supported with the diffusers backend. 
" + "Use a native pipeline for continuous batching mode." + ) + + def step_scheduler(self, **_: Any) -> None: + raise NotImplementedError( + "Step-wise execution is not yet supported with the diffusers backend. " + "Use a native pipeline for continuous batching mode." + ) + + def post_decode(self, **_: Any) -> Any: + raise NotImplementedError( + "Step-wise execution is not yet supported with the diffusers backend. " + "Use a native pipeline for continuous batching mode." + ) + + # ------------------------------------------------------------------ + # Forward pass + # ------------------------------------------------------------------ + + def forward(self, req: OmniDiffusionRequest) -> DiffusionOutput: + """Full delegation to diffusers ``pipeline.__call__()``.""" + + kwargs = self._build_call_kwargs(req) + logger.debug(f"Calling diffusers pipeline with kwargs: {kwargs}") + + with torch.inference_mode(): + output = self._pipeline(**kwargs) # pyright: ignore[reportCallIssue] + + return self._wrap_output(output) + + # ------------------------------------------------------------------ + # Validation guards + # ------------------------------------------------------------------ + + def _raise_unsupported_features(self) -> None: + """Raise an error for incompatible feature switches.""" + pc = self.od_config.parallel_config + if pc.cfg_parallel_size > 1: + raise NotImplementedError( + "CFG parallel is not supported with the diffusers backend. " + "Diffusers handles CFG internally via guidance_scale." + ) + if pc.sequence_parallel_size is not None and pc.sequence_parallel_size > 1: + raise NotImplementedError( + "Sequence parallel is not supported with the diffusers backend. " + "It requires model-specific attention surgery." + ) + if self.od_config.cache_backend not in ("none", None): + raise NotImplementedError( + f"Cache backend '{self.od_config.cache_backend}' is not supported " + "with the diffusers backend. 
TeaCache/Cache-DiT require hooking " + "into individual transformer blocks." + ) + if self.od_config.enforce_eager: + raise NotImplementedError( + "Eager execution is not supported with the diffusers backend. " + "Use a native pipeline for continuous batching mode." + ) + if self.od_config.quantization_config is not None: + raise NotImplementedError( + "Quantization is not supported with the diffusers backend. Use a native pipeline for quantization." + ) + + # ------------------------------------------------------------------ + # Wrap settings, inputs, and outputs + # ------------------------------------------------------------------ + + def _set_attention_backend(self) -> None: + """Set the attention backend. + + Roughly follow the logic in vllm_omni/diffusion/attention/backends/utils/fa.py, + But also consider the available attention backends in diffusers. + (See: https://huggingface.co/docs/diffusers/optimization/attention_backends) + """ + if not hasattr(self._pipeline, "transformer"): + logging.info("No transformer found in diffusers pipeline. Skipping attention backend setting.") + return + + attention_backend_config = self.od_config.attention_backend or os.environ.get("DIFFUSION_ATTENTION_BACKEND") + attention_backend_attempts: list[str] = [] + match attention_backend_config: + case "FLASH_ATTN" | None: + if current_omni_platform.is_rocm(): + attention_backend_attempts.append("aiter") + elif current_omni_platform.is_xpu(): + attention_backend_attempts.append("_native_xla") + elif current_omni_platform.is_musa(): + logger.warning( + "Unknown diffusers attention backend option for MUSA platform. Falling back to SDPA." 
+ ) + attention_backend_attempts.append("native") + else: + attention_backend_attempts.extend( + [ + "_flash_3_hub", + "_flash_3_varlen_hub", + "_flash_3", + "_flash_varlen_3", + "flash_hub", + "flash_varlen_hub", + "flash", + "flash_varlen", + "_native_flash", + ] + ) + case "SAGE_ATTN": + attention_backend_attempts.extend(["sage_hub", "sage", "sage", "sage_varlen"]) + case "ASCEND": + attention_backend_attempts.append("_native_npu") + case "TORCH_SDPA": + attention_backend_attempts.append("native") + case _: + logger.warning(f"Invalid attention backend: {attention_backend_config}. Falling back to SDPA.") + attention_backend_attempts.append("native") + + attempt_errors: list[str] = [] + set_backend: str | None = None + for backend in attention_backend_attempts: + try: + self._pipeline.transformer.set_attention_backend(backend) + set_backend = backend + break + except Exception as e: + attempt_errors.append(str(e)) + + # If all attempts fail, fallback to SDPA and warn the user about the failures + if len(attempt_errors) == len(attention_backend_attempts): + self._pipeline.transformer.set_attention_backend("native") + logger.warning( + f"Failed to set attention backend '{attention_backend_config}' for " + f"diffusers pipeline {self._pipeline.__class__.__name__}. " + "Falling back to SDPA. " + f"The following attempts were made: {dict(zip(attention_backend_attempts, attempt_errors))}" + ) + return + + # If some attempts fail, only warn the user about the failures + logger.info( + f"Set diffusers attention backend to '{set_backend}', adapted from " + f"user config value '{attention_backend_config}'." 
+ ) + if len(attempt_errors) > 0: + logger.warning( + f"The following failed attempts were made before choosing this diffusers backend: " + f"{dict(zip(attention_backend_attempts, attempt_errors))}" + ) + + def _build_call_kwargs(self, req: OmniDiffusionRequest) -> dict[str, Any]: + """Translate ``OmniDiffusionRequest`` into diffusers ``__call__`` kwargs.""" + sampling = req.sampling_params + prompt, neg_prompt = self._extract_prompt(req.prompts) + + # Merge user-provided call kwargs from stage/CLI defaults. + # Request-time parameters take precedence over stage-config defaults + call_kwargs = self.od_config.diffusers_call_kwargs + kwargs: dict[str, Any] = { + **call_kwargs, + "prompt": prompt, + "num_inference_steps": sampling.num_inference_steps, + "guidance_scale": sampling.guidance_scale, + "output_type": sampling.output_type or self.od_config.output_type, + } + + if sampling.height is not None: + kwargs["height"] = sampling.height + if sampling.width is not None: + kwargs["width"] = sampling.width + if sampling.num_frames is not None and sampling.num_frames > 1: + kwargs["num_frames"] = sampling.num_frames + if sampling.num_outputs_per_prompt is not None and sampling.num_outputs_per_prompt > 1: + kwargs["num_images_per_prompt"] = sampling.num_outputs_per_prompt + + if neg_prompt is not None: + kwargs["negative_prompt"] = neg_prompt + + if sampling.generator is not None: + kwargs["generator"] = sampling.generator + elif sampling.seed is not None: + kwargs["generator"] = torch.Generator(device=sampling.generator_device).manual_seed(sampling.seed) + else: + kwargs["generator"] = torch.Generator(device=sampling.generator_device) + + if sampling.latents is not None: + kwargs["latents"] = sampling.latents + + return kwargs + + @staticmethod + def _extract_prompt(prompt_obj: list[OmniPromptType]) -> tuple[str | list[str], str | list[str] | None]: + """Extract the text prompts and negative prompts from a list of prompt objects.""" + if len(prompt_obj) == 1: + if 
def _wrap_output(self, output: Any) -> DiffusionOutput:
    """Convert a diffusers pipeline result into a ``DiffusionOutput``.

    The payload is taken from the first attribute present on ``output``,
    probed in this order:

    - ``images`` — e.g. ``ImagePipelineOutput`` (text2img, img2img)
    - ``frames`` — e.g. ``VideoPipelineOutput`` (text2vid, img2vid)
    - ``audios`` — audio pipeline outputs

    The value is passed through unchanged, so the diffusers ``output_type``
    format is preserved. If none of the attributes exists, ``output`` itself
    is wrapped.
    """
    from vllm_omni.diffusion.data import DiffusionOutput

    for payload_attr in ("images", "frames", "audios"):
        if hasattr(output, payload_attr):
            return DiffusionOutput(output=getattr(output, payload_attr))

    return DiffusionOutput(output=output)
7b98c13b828..7198b2228aa 100644 --- a/vllm_omni/engine/async_omni_engine.py +++ b/vllm_omni/engine/async_omni_engine.py @@ -1303,6 +1303,12 @@ def _create_default_diffusion_stage_cfg(kwargs: dict[str, Any]) -> list: if "dtype" in normalized_kwargs: stage_engine_args["dtype"] = normalized_kwargs["dtype"] + # New split fields for diffusers adapter kwargs. + if kwargs.get("diffusers_load_kwargs") is not None: + stage_engine_args["diffusers_load_kwargs"] = kwargs["diffusers_load_kwargs"] + if kwargs.get("diffusers_call_kwargs") is not None: + stage_engine_args["diffusers_call_kwargs"] = kwargs["diffusers_call_kwargs"] + default_stage_cfg = [ { "stage_id": 0, diff --git a/vllm_omni/entrypoints/cli/serve.py b/vllm_omni/entrypoints/cli/serve.py index e355ee679d8..841858778e2 100644 --- a/vllm_omni/entrypoints/cli/serve.py +++ b/vllm_omni/entrypoints/cli/serve.py @@ -267,6 +267,40 @@ def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgu default=None, help="Override the diffusion pipeline class name (e.g. LTX2ImageToVideoPipeline).", ) + omni_config_group.add_argument( + "--diffusion-load-format", + dest="diffusion_load_format", + type=str, + default=None, + choices=["default", "custom_pipeline", "dummy", "diffusers"], + help=( + "How to load the diffusion pipeline: native/registry (default), " + "custom_pipeline, dummy, or diffusers for the HF diffusers adapter." + ), + ) + omni_config_group.add_argument( + "--diffusers-load-kwargs", + dest="diffusers_load_kwargs", + type=json.loads, + default="{}", + help=( + "JSON object passed to DiffusionPipeline.from_pretrained()." + "It overrides corresponding parameters in the standard vLLM-Omni interface." + '(e.g. \'{"use_safetensors": true, "variant": "fp16"}\').' + ), + ) + omni_config_group.add_argument( + "--diffusers-call-kwargs", + dest="diffusers_call_kwargs", + type=json.loads, + default="{}", + help=( + "JSON object passed to pipeline.__call__(). 
" + "Useful for model-specific sampling parameters not covered by the vLLM-Omni interface." + "During request time, it is overridden by corresponding parameters in the vLLM-Omni interface." + '(e.g. \'{"num_inference_steps": 30, "guidance_scale": 7.5}\').' + ), + ) omni_config_group.add_argument( "--usp", "--ulysses-degree",