Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ Worker classes and model runners for distributed inference.
- [vllm_omni.worker.gpu_generation_model_runner.GPUGenerationModelRunner][]
- [vllm_omni.worker.gpu_generation_worker.GPUGenerationWorker][]
- [vllm_omni.worker.gpu_model_runner.OmniGPUModelRunner][]
- [vllm_omni.worker.npu.npu_ar_model_runner.NPUARModelRunner][]
- [vllm_omni.worker.npu.npu_ar_worker.NPUARWorker][]
- [vllm_omni.worker.npu.npu_diffusion_model_runner.NPUDiffusionModelRunner][]
- [vllm_omni.worker.npu.npu_diffusion_worker.NPUDiffusionWorker][]
- [vllm_omni.worker.npu.npu_model_runner.OmniNPUModelRunner][]
2 changes: 1 addition & 1 deletion docs/getting_started/installation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ vLLM supports the following hardware platforms:

- [GPU](gpu.md)
- [NVIDIA CUDA](gpu.md)
- [Ascend NPU](npu.md)
- [NPU](npu.md)
24 changes: 22 additions & 2 deletions docs/getting_started/installation/npu.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# Ascend-NPU
# NPU

vLLM-Omni is a Python library that supports the following NPU variants. Select your NPU type to see vendor specific instructions:
vLLM-Omni supports NPU through the vLLM Ascend Plugin (vllm-ascend). This is a community maintained hardware plugin for running vLLM on NPU.

## Requirements

- OS: Linux
- Python: 3.12

!!! note
vLLM-Omni is currently not natively supported on Windows.

=== "NPU"

--8<-- "docs/getting_started/installation/npu/npu.inc.md:requirements"

## Installation

### Recommended

=== "NPU"

--8<-- "docs/getting_started/installation/npu/npu.inc.md:installation"
45 changes: 44 additions & 1 deletion docs/getting_started/installation/npu/npu.inc.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,48 @@
# --8<-- [start:requirements]

For detailed hardware and software requirements, please refer to the [vllm-ascend installation documentation](https://docs.vllm.ai/projects/ascend/en/latest/installation.html).

# --8<-- [end:requirements]
# --8<-- [start:installation]

vLLM-Omni mainly contains python implementations for framework and models.
The recommended way to use vLLM-Omni on NPU is through the vllm-ascend pre-built Docker images:

```bash
# Update DEVICE according to your NPUs (/dev/davinci[0-7])
export DEVICE0=/dev/davinci0
export DEVICE1=/dev/davinci1
# Update the vllm-ascend image
# Atlas A2:
# export IMAGE=quay.io/ascend/vllm-ascend:v0.11.0rc2
# Atlas A3:
# export IMAGE=quay.io/ascend/vllm-ascend:v0.11.0rc2-a3
export IMAGE=quay.io/ascend/vllm-ascend:v0.11.0rc2
docker run --rm \
--name vllm-omni-npu \
--device $DEVICE0 \
--device $DEVICE1 \
--device /dev/davinci_manager \
--device /dev/devmm_svm \
--device /dev/hisi_hdc \
-v /usr/local/dcmi:/usr/local/dcmi \
-v /usr/local/bin/npu-smi:/usr/local/bin/npu-smi \
-v /usr/local/Ascend/driver/lib64/:/usr/local/Ascend/driver/lib64/ \
-v /usr/local/Ascend/driver/version.info:/usr/local/Ascend/driver/version.info \
-v /etc/ascend_install.info:/etc/ascend_install.info \
-v /root/.cache:/root/.cache \
-p 8000:8000 \
-it $IMAGE bash

# Inside the container, install vLLM-Omni from source
cd /vllm-workspace
git clone https://github.com/vllm-project/vllm-omni.git
cd vllm-omni
pip install -v -e .
export VLLM_WORKER_MULTIPROC_METHOD=spawn
```

The default workdir is `/workspace`, with vLLM, vLLM-Ascend and vLLM-Omni code placed in `/vllm-workspace` installed in development mode.

Comment thread
gcanlin marked this conversation as resolved.
For other installation methods (pip installation, building from source, custom Docker builds), please refer to the [vllm-ascend installation guide](https://docs.vllm.ai/projects/ascend/en/latest/installation.html).

# --8<-- [end:installation]
3 changes: 2 additions & 1 deletion examples/offline_inference/qwen_image/gradio_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import torch

from vllm_omni.entrypoints.omni import Omni
from vllm_omni.utils.platform_utils import detect_device_type

ASPECT_RATIOS: dict[str, tuple[int, int]] = {
"1:1": (1328, 1328),
Expand Down Expand Up @@ -62,7 +63,7 @@ def get_omni(model_name: str) -> Omni:


def build_demo(args: argparse.Namespace) -> gr.Blocks:
device = "cuda"
device = detect_device_type()
omni = get_omni(args.model)

def run_inference(
Expand Down
3 changes: 2 additions & 1 deletion examples/offline_inference/qwen_image/text_to_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import torch

from vllm_omni.entrypoints.omni import Omni
from vllm_omni.utils.platform_utils import detect_device_type


def parse_args() -> argparse.Namespace:
Expand Down Expand Up @@ -45,7 +46,7 @@ def parse_args() -> argparse.Namespace:

def main():
args = parse_args()
device = "cuda" if torch.cuda.is_available() else "cpu"
device = detect_device_type()
generator = torch.Generator(device=device).manual_seed(args.seed)

omni = Omni(model=args.model)
Expand Down
4 changes: 2 additions & 2 deletions vllm_omni/core/sched/diffusion_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.core.sched.request_queue import create_request_queue
from vllm.v1.core.sched.scheduler import EngineCoreOutputs, Request, RequestStatus, SchedulerOutput, SpecDecodingStats
from vllm.v1.core.sched.scheduler import Request, RequestStatus, SchedulerOutput, SpecDecodingStats
from vllm.v1.core.sched.utils import remove_all
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit also fix the common bug both on GPU and NPU. We should import EngineCoreOutputs from vllm.v1.engine to make sure the patch work.

Copy link
Copy Markdown
Collaborator Author

@gcanlin gcanlin Nov 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't take any break in GPU.

  • run_multiple_prompts.sh
  • run_single_prompt.sh
  • text
  • use_audio_in_video
  • online: break by other bugs for now.
image


from vllm_omni.core.sched.output import OmniNewRequestData
from vllm_omni.core.sched.scheduler import OmniScheduler
Expand Down
3 changes: 1 addition & 2 deletions vllm_omni/core/sched/generation_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.core.sched.request_queue import create_request_queue
from vllm.v1.core.sched.scheduler import (
EngineCoreOutputs,
Request,
RequestStatus,
SchedulerOutput,
SpecDecodingStats,
)
from vllm.v1.core.sched.utils import remove_all
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs

from vllm_omni.core.sched.output import OmniNewRequestData
from vllm_omni.core.sched.scheduler import OmniScheduler
Expand Down
7 changes: 5 additions & 2 deletions vllm_omni/diffusion/diffusion_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from vllm_omni.diffusion.registry import get_diffusion_post_process_func
from vllm_omni.diffusion.request import OmniDiffusionRequest
from vllm_omni.diffusion.scheduler import scheduler
from vllm_omni.diffusion.worker.gpu_worker import WorkerProc
from vllm_omni.utils.platform_utils import get_diffusion_worker_class

logger = init_logger(__name__)

Expand Down Expand Up @@ -76,6 +76,9 @@ def _launch_workers(self, broadcast_handle):
mp.set_start_method("spawn", force=True)
processes = []

# Get the appropriate worker class for current device
worker_proc = get_diffusion_worker_class()

# Launch all worker processes
scheduler_pipe_readers = []
scheduler_pipe_writers = []
Expand All @@ -84,7 +87,7 @@ def _launch_workers(self, broadcast_handle):
reader, writer = mp.Pipe(duplex=False)
scheduler_pipe_writers.append(writer)
process = mp.Process(
target=WorkerProc.worker_main,
target=worker_proc.worker_main,
args=(
i, # rank
od_config,
Expand Down
8 changes: 6 additions & 2 deletions vllm_omni/diffusion/distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

import torch

from vllm_omni.utils.platform_utils import detect_device_type


def get_local_device() -> torch.device:
"""Return the torch device for the current rank."""
return torch.device(f"cuda:{os.environ.get('LOCAL_RANK', 0)}")
"""Return the torch device for the current rank based on detected device type."""
device_type = detect_device_type()
local_rank = os.environ.get("LOCAL_RANK", 0)
return torch.device(f"{device_type}:{local_rank}")
196 changes: 196 additions & 0 deletions vllm_omni/diffusion/worker/npu/npu_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import multiprocessing as mp
import os

import torch
import zmq
from vllm.config import VllmConfig, set_current_vllm_config
from vllm.distributed.device_communicators.shm_broadcast import MessageQueue
from vllm.distributed.parallel_state import (
init_distributed_environment,
initialize_model_parallel,
)
from vllm.logger import init_logger

from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig
from vllm_omni.diffusion.registry import initialize_model
from vllm_omni.diffusion.request import OmniDiffusionRequest

logger = init_logger(__name__)


class NPUWorker:
"""
A worker that executes the model on a single NPU.
"""

def __init__(
self,
local_rank: int,
rank: int,
od_config: OmniDiffusionConfig,
):
self.local_rank = local_rank
self.rank = rank
self.od_config = od_config
self.pipeline = None

self.init_device_and_model()

def init_device_and_model(self) -> None:
"""Initialize the device and load the model."""
world_size = self.od_config.num_gpus
rank = self.rank
# Set environment variables for distributed initialization
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = str(self.od_config.master_port)
os.environ["LOCAL_RANK"] = str(self.local_rank)
os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)

device = torch.device(f"npu:{rank}")
torch.npu.set_device(device)

# hack
vllm_config = VllmConfig()
vllm_config.parallel_config.tensor_parallel_size = self.od_config.num_gpus
set_current_vllm_config(vllm_config)

init_distributed_environment(world_size=world_size, rank=rank)
initialize_model_parallel(tensor_model_parallel_size=world_size)

with device:
self.pipeline = initialize_model(self.od_config)
self.pipeline.load_weights()
self.pipeline.eval()
logger.info(f"Worker {self.rank}: Initialized device, model, and distributed environment.")
logger.info(f"Worker {self.rank}: Model loaded successfully.")

@torch.inference_mode()
def execute_model(self, reqs: list[OmniDiffusionRequest], od_config: OmniDiffusionConfig) -> DiffusionOutput:
"""
Execute a forward pass.
"""
assert self.pipeline is not None
# TODO: dealing with first req for now
req = reqs[0]
output = self.pipeline.forward(req)
return output


class NPUWorkerProc:
"""Wrapper that runs one Worker in a separate process."""

def __init__(
self,
od_config: OmniDiffusionConfig,
gpu_id: int,
broadcast_handle,
):
self.od_config = od_config

# Inter-process Communication
self.context = zmq.Context(io_threads=2)

# Initialize MessageQueue reader from handle
self.mq = MessageQueue.create_from_handle(broadcast_handle, gpu_id)

self.result_mq = None
self.result_mq_handle = None

# Setup result sender (only for rank 0 for now, or whoever needs to reply)
# Assuming only rank 0 replies to scheduler as per original logic
if gpu_id == 0:
# Create MessageQueue for results (1 writer -> 1 reader)
# We assume the reader (SyncScheduler) will act as rank 0
self.result_mq = MessageQueue(n_reader=1, n_local_reader=1, local_reader_ranks=[0])
self.result_mq_handle = self.result_mq.export_handle()
logger.info(f"Worker {gpu_id} created result MessageQueue")

assert od_config.master_port is not None
worker = NPUWorker(
local_rank=gpu_id,
rank=gpu_id,
od_config=od_config,
)
self.worker = worker
self.gpu_id = gpu_id
self._running = True

def return_result(self, output: DiffusionOutput):
"""
replies to client, only on rank 0
"""
if self.result_mq is not None:
self.result_mq.enqueue(output)

def recv_reqs(self):
"""
Receive requests from broadcast queue
"""
return self.mq.dequeue()

# TODO: queueing, cancellation
def worker_busy_loop(self) -> None:
"""Main busy loop for Multiprocessing Workers"""

logger.info(f"Worker {self.gpu_id} ready to receive requests via shared memory")

while self._running:
reqs = None
# 1: receive requests
try:
reqs = self.recv_reqs()
except Exception as e:
logger.error(
f"Error receiving requests in scheduler event loop: {e}",
exc_info=True,
)
continue

# 2: execute, make sure a reply is always sent
try:
output = self.worker.execute_model(reqs, self.od_config)
except Exception as e:
logger.error(
f"Error executing forward in event loop: {e}",
exc_info=True,
)
output = DiffusionOutput(error=str(e))

try:
self.return_result(output)
except zmq.ZMQError as e:
# Reply failed; log and keep loop alive to accept future requests
logger.error(f"ZMQ error sending reply: {e}")
continue

logger.info("event loop terminated.")
# if self.result_sender is not None:
# self.result_sender.close()
self.context.term()

@staticmethod
def worker_main(
rank: int,
od_config: OmniDiffusionConfig,
pipe_writer: mp.connection.Connection,
broadcast_handle,
) -> None:
"""Worker initialization and execution loops."""

worker_proc = NPUWorkerProc(
od_config,
gpu_id=rank,
broadcast_handle=broadcast_handle,
)
logger.info(f"Worker {rank}: Scheduler loop started.")
pipe_writer.send(
{
"status": "ready",
"result_handle": worker_proc.result_mq_handle if rank == 0 else None,
}
)
worker_proc.worker_busy_loop()
logger.info(f"Worker {rank}: Shutdown complete.")
Loading