Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
34c59f8
Adapt the Lingbot World Fast Pipeline to work in vllm-omni
Miguel0312 May 6, 2026
fae96e2
Add step execution and denoise_micro_step to Wan2.2 pipeline
mnasser02 Apr 15, 2026
94141ec
remove denoise_micro_step
mnasser02 Apr 15, 2026
e144462
add unit tests for wan22 step exec
mnasser02 Apr 15, 2026
a542a6c
add RankTask and track per-rank tasks in DiffusionSchedulerOutput
mnasser02 Apr 16, 2026
a493fb3
add num_chunks sampling param
mnasser02 Apr 17, 2026
5491d38
Add chunk related data structures to track its state.
mnasser02 Apr 17, 2026
d9cb798
Add execute_micro_step execution path that works on a per-rank assign…
mnasser02 Apr 17, 2026
3414a6e
Add StreamBatchScheduler that controls the flow of chunks through the…
mnasser02 Apr 17, 2026
778bc66
Add StreamBatchScheduler tests
mnasser02 Apr 20, 2026
95a2bd1
Add micro-step execution pipeline tests
mnasser02 Apr 20, 2026
e18732d
Add stream_batch arg
mnasser02 Apr 21, 2026
90a9bfe
Set reply rank to 0
mnasser02 Apr 21, 2026
5374266
Fix blocking send/recv resulting in deadlock by registering metadata …
mnasser02 Apr 21, 2026
864ccd1
bugfix
mnasser02 Apr 21, 2026
18daf55
bugfix
mnasser02 Apr 21, 2026
ea12e8b
bugfix
mnasser02 Apr 21, 2026
78d80e9
Implement fully async isend/recv_dicts and migrate PPMixin to use them
mnasser02 Apr 22, 2026
986571c
Rmv stale code
mnasser02 Apr 22, 2026
f502e96
bugfix
mnasser02 Apr 22, 2026
d111ca0
Make sending dict schema unblocking
mnasser02 Apr 22, 2026
26e7b8e
Add warmup for nccl comms on init
mnasser02 Apr 22, 2026
34a8002
Use batch_isend_irecv instead of plain isend/recv
mnasser02 Apr 22, 2026
49f5807
Handle tensor receive as a task
mnasser02 Apr 22, 2026
63bb687
Chunk-aware buffers
mnasser02 Apr 22, 2026
e77972d
fix
mnasser02 Apr 22, 2026
b95fdfd
Improve and fix PP communication.
mnasser02 Apr 23, 2026
c9b622a
bugfixes
mnasser02 Apr 23, 2026
826b505
Remove stale tests
mnasser02 Apr 24, 2026
081818e
Add unit tests for StreamBatchScheduler
mnasser02 Apr 27, 2026
a87a9ae
add SupportsMicroStepExecution
mnasser02 Apr 27, 2026
814a1a5
add unit tests for micro-step execution pipeline
mnasser02 Apr 27, 2026
8bc0ed5
edit
mnasser02 Apr 27, 2026
5aee2a2
add slo-adaptive scheduling
mnasser02 May 6, 2026
26962f7
bugfix
mnasser02 May 6, 2026
824975e
bugfix
mnasser02 May 6, 2026
31ed1d5
fix and improve stream batching
mnasser02 May 11, 2026
95b46e2
modify tests
mnasser02 May 12, 2026
735eb54
Implement video continuation
Miguel0312 May 19, 2026
db6bb72
support v2v
mnasser02 May 19, 2026
dde6849
update tests
mnasser02 May 19, 2026
010cfc9
Remove dependency on external code from Lingbot World repo
Miguel0312 May 19, 2026
47cdf31
bugfix
mnasser02 May 20, 2026
e9fadb4
fixes
mnasser02 May 20, 2026
31317c3
fix the flow of latents aross ranks
mnasser02 May 25, 2026
0109533
Implement tests for Lingbot World Fast
Miguel0312 May 26, 2026
69f64e5
Merge branch 'main' into lingbot-fast
Miguel0312 May 26, 2026
778e2ff
bugfixes
mnasser02 May 26, 2026
b68064e
Update documentation
Miguel0312 May 27, 2026
883270d
Merge branch 'lingbot-fast' of github.com:zzhang-fr/vllm-omni into st…
mnasser02 May 28, 2026
c7c2cf8
revert wan changes
mnasser02 May 29, 2026
21a1cbd
fixes
mnasser02 May 29, 2026
234841c
add stream batch support to lingbot
mnasser02 May 29, 2026
10a5e40
update tests
mnasser02 May 29, 2026
7762df5
add docs
mnasser02 May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/contributing/model/adding_diffusion_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,41 @@ turning them on. For Qwen-Image-style serving examples, document
`--step-execution` as the feature gate and `--max-num-seqs N` as the
companion batching knob.

### Micro-Step Execution

See detailed design guide: [How to add micro-step execution support](../../design/feature/diffusion_micro_step_execution.md)

Use this only when your pipeline is built for *streaming chunked* output
(e.g. video chunks) and you want stream batch —
at each tick every PP rank denoises a different chunk at a different
timestep, then chunks shift one rank downstream.

Micro-step is a superset of step execution. On top of the four
step-execution methods, the pipeline must also implement:

1. `set_pp_recv_dict_buffers()` to pre-register PP recv buffers the request will use.
2. `encode_chunk_inputs()` to build per-chunk initial latents and any
per-chunk conditioning.
3. `prefetch_tensors()` to pre-post the next-step recv (latents on the
first rank, intermediate tensors elsewhere) so it overlaps with compute.

`denoise_step()` and `step_scheduler()` are also redesigned to operate on a
row-batched mix of chunks at different denoising step indices.
`post_decode()` becomes incremental — it runs on rank 0 every tick that has
freshly finished chunks, not just once at the end.

Prerequisites:

- The transformer is PP-partitioned (`make_layers`, `PPMissingLayer`) — see
[Pipeline Parallel](../../design/feature/pipeline_parallel.md).
- The pipeline inherits `PipelineParallelMixin` and `CFGParallelMixin`r.
- The pipeline declares `supports_micro_step_execution: ClassVar[bool] =
True`.
- Each request sets `chunk_frames`, `num_chunks`, and
`num_inference_steps` in `OmniDiffusionSamplingParams`.

Reference implementation: `LingbotWorldFastPipeline`

### Cache Acceleration

#### TeaCache
Expand Down
239 changes: 239 additions & 0 deletions docs/design/feature/diffusion_micro_step_execution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
# Adding Micro-Step Execution Support for Diffusion Pipelines

This guide documents vLLM-Omni's micro-step diffusion contract for model
authors and contributors implementing `stream_batch=True` support for a
diffusion pipeline.

For end-user enablement, supported models, and current limitations, see
[Micro-Step Execution](../../user_guide/diffusion/micro_step_execution.md).

This document describes the micro-step execution contract only. It builds on
the request-/step-level contract in
[Step Execution](diffusion_step_execution.md) and the PP partitioning rules in
[Pipeline Parallel](pipeline_parallel.md). Read those first.

## Current Support Scope

`stream_batch` is **not** a generic diffusion toggle. It only works for
pipelines that implement the segmented stateful contract in
[`vllm_omni/diffusion/models/interface.py`](gh-file:vllm_omni/diffusion/models/interface.py)
as `SupportsMicroStepExecution`.

This page is intentionally author-facing. Treat runtime enablement
(`stream_batch=True` when constructing `Omni`) as an opt-in user knob layered
on top of the implementation contract below.

Current in-tree support:

| Pipeline | Example models | Micro-step execution |
|----------|----------------|----------------------|
| `LingbotWorldFastPipeline` | `lingbot_world/lingbot-world-base-cam/Lingbot-World-Fast` | Yes |
| All other diffusion pipelines | — | No |

Current engine/runtime limitations:

- `max_num_seqs == 1` — exactly one in-flight request per engine.
- `cache_backend` is not supported.
- Unsupported pipelines fail early during model loading instead of
failing on the first request.

## Execution Contract

Micro-step mode is driven by seven pipeline methods plus the shared mutable
request state object:

- `prepare_encode(state)`: one-time request preparation (inherited from
step execution).
- `set_pp_recv_dict_buffers(state)`: register PP recv buffers and schema
cache for every `(name, segment_idx, batch_size)` this request will use.
- `encode_chunk_inputs(state, new_idxs)`: per-chunk latent initialization.
Returns a tensor stacked along dim 0 over `new_idxs`; the runner stitches
it onto `state.latents` and into each chunk's `chunk.latents`.
- `denoise_step(state, batch_size)`: row-batched noise prediction over
`batch_size` chunks at different denoising step indices.
- `step_scheduler(state, noise_pred, per_request_scheduler, batch_size)`:
per-row scheduler update on the last rank; sends the updated latents
back to rank 0 via the ring (rank 0 picks them up via `prefetch_tensors`,
not inside this call). Every rank increments `state.step_index`.
- `prefetch_tensors(state, batch_size)`: pre-post the next-step recv on the
comms stream so it overlaps with this rank's compute.
- `post_decode(state)`: incremental decode of one or more freshly-finished
chunks (called whenever the previous tick produced `finished_idxs`).

The state lives in
[`vllm_omni/diffusion/worker/utils.py`](gh-file:vllm_omni/diffusion/worker/utils.py)
as `DiffusionRequestState` plus per-chunk `ChunkState` entries under
`state.extra["chunks"]`.

The worker-side micro-step loop lives in
[`vllm_omni/diffusion/worker/diffusion_model_runner.py`](gh-file:vllm_omni/diffusion/worker/diffusion_model_runner.py)
under `execute_micro_step`:

1. `prepare_encode()` runs once for a new request.
2. `set_pp_recv_dict_buffers()` runs immediately after, before any P2P.
3. Each micro-step:
- Rank 0 calls `post_decode()` for any chunks the previous tick
reported as finished, and accumulates the decoded output.
- Rank 0 and rank N-1 call `encode_chunk_inputs()` for their layout's
`new_idxs`. On rank 0 those are chunks freshly admitted this tick;
on rank N-1 they are the same chunks arriving at the back of the
ring N-1 ticks later — both ranks must produce identical initial
noise so the scheduler step on the last rank starts from the same
latents the first rank started from.
- All ranks with `chunk_indices` non-empty call `denoise_step()` then
`step_scheduler()`. The last rank also snapshots
`chunk.latents = state.latents[i:i+1]` per row so the next time those
chunks reach the last rank they can resume.
- `prefetch_tensors()` runs sized to the previous rank's load so the
next recv is posted before the next micro-step's compute.

## Per-Rank Chunk Layout

`StreamBatchScheduler` builds one `RankTask` per PP rank per micro-step:

| Field | Meaning |
|-------|---------|
| `chunk_indices` | Chunks this rank will denoise this tick |
| `layout.circulating_idxs` | Chunks that drained from rank N-1 last tick still needing more steps, looping back to rank 0 |
| `layout.finished_idxs` | Chunks that completed `num_inference_steps` at rank N-1 last tick, ready for decode |
| `layout.new_idxs` | Chunks freshly admitted at rank 0 (up to SLO `B_target`, capped by `num_chunks - admitted_so_far`) |

Layouts travel with their chunks: at rank R the current layout was built at
rank 0 R ticks ago, so `new_idxs` at rank R names the chunks admitted R ticks
ago and now reaching this rank for the first time on their first lap.

The runner uses rank 0's layout to assemble `state.latents` along dim 0 from
the circulating snapshot + fresh-noise rows for `new_idxs`, and to
incrementally decode `finished_idxs`. The last rank does the same assembly
when it owns `new_idxs` so step_scheduler has the matching initial latents.


## Recommended Split

| Request-level phase | Micro-step method | What belongs there |
|---------------------|-------------------|--------------------|
| Input validation, prompt encoding, timestep prep, per-request scheduler | `prepare_encode()` | Anything that should happen once per request |
| PP recv buffer / schema registration for every `(name, segment_idx, B)` | `set_pp_recv_dict_buffers()` | Iterate `1..slo_max_batch * num_inference_steps` |
| Per-chunk latent init (fresh randn, V2V VAE encode, anchor latents, plucker, etc.) | `encode_chunk_inputs()` | Build per-chunk initial latents (RNG must match across rank 0 and rank N-1); write per-chunk conditioning into `state.extra["chunks"][idx].extra` only on the rank that will read it |
| Row-batched transformer forward | `denoise_step()` | Row-aware kwargs, `predict_noise_maybe_with_cfg(buf_idx=step_index % 2, batch_size=B, preposted_its=...)` |
| Per-row `scheduler.step` and `state.step_index += 1` | `step_scheduler()` | `scheduler_step_maybe_with_cfg(..., receive_latents=False, batch_size=B)` |
| Pre-post next-step recv | `prefetch_tensors()` | `prefetch_tensors_maybe_with_cfg(buf_idx=step_index % 2, batch_size=B)` and stash on state |
| Per-chunk VAE decode | `post_decode()` | Decode the leading `len(finished_idxs)` rows of `state.latents` (runner narrows the slice for you) |

Keep the micro-step path reusing the same helpers as the request-level path
whenever possible. Reimplementing the denoise loop from scratch is the easiest
way to introduce behavioral drift.

## PP Communication

`PipelineGroupCoordinator` provides three primitives the micro-step path
leans on:

| Primitive | Purpose |
|-----------|---------|
| `set_recv_dict_buffer(name, segment_idx, template_dict, batch_size)` | Register the schema and pre-allocate a double-buffer pair (slots 0 and 1) for one logical channel |
| `pipeline_isend_tensor_dict(...)` | Async send of an arbitrary dict to the next rank |
| `pipeline_irecv_tensor_dict(..., buf_idx)` | Posts async recv into the pre-allocated buffer slot; returns an `AsyncIntermediateTensors`/`AsyncLatents` that defers `.wait()` until consumed |

[`PipelineParallelMixin`](gh-file:vllm_omni/diffusion/distributed/pipeline_parallel.py)
already wraps these in `predict_noise_maybe_with_cfg`,
`scheduler_step_maybe_with_cfg`, and `prefetch_tensors_maybe_with_cfg`.
Pipelines should call those, not the coordinator primitives directly.

### Why schemas must be pre-registered

The first call to `pipeline_isend_tensor_dict` on a previously unseen
`(name, segment_idx, batch_size)` triggers a blocking schema exchange.
`set_pp_recv_dict_buffers` populates the cache identically on all ranks so the
schema path is never entered during the data loop.

Enumerate every `B` the request can hit. For SLO-driven admission the upper
bound is `slo_max_batch * num_inference_steps`.

### Double buffering

Caller picks `buf_idx = state.step_index % 2` consistently across
`denoise_step`, `step_scheduler`, and `prefetch_tensors` on the same
micro-step. Alternating slots keeps the previous result readable while the
next recv is in flight.

## Row-Batched Computation

`state.batched_timesteps` is a 1-D tensor of length `B`; row `i` carries
`state.timesteps[chunks[i].step_index]`. Inside `denoise_step` and
`step_scheduler`, treat the leading dim as a mix of independent chunks at
*different* progress points.

## Lingbot Reference

[`pipeline_lingbot_world_fast.py`](gh-file:vllm_omni/diffusion/models/lingbot_world_fast/pipeline_lingbot_world_fast.py)
is the reference for the *self-forcing* pattern and is split
correctly for the current contract:

- `prepare_encode()` wraps `self.scheduler` in `LingbotFlowScheduler` so the
last denoise step returns the cached x0 and intermediate steps re-noise to
the next `t`. Two `torch.Generator`s are created on every rank: `seed_g`
for chunk noise (consumed identically on every rank that calls
`encode_chunk_inputs`) and `seed_g_addnoise` for the re-noise step
(consumed only on the last rank).
- `set_pp_recv_dict_buffers()` registers `("latents", -1, B)` and
`("intermediate", 0, B)` templates for every B in
`1..slo_max_batch * num_inference_steps`.
- `encode_chunk_inputs()` builds per-chunk noise on every rank using
`seed_g`. Only rank 0 (first stage) additionally stream-encodes per-chunk
`y` (with anchor-frame handling on the first chunk) and computes Plucker
embeddings, stashing both into `state.extra["chunks"][idx].extra` for
`denoise_step` to read.
- `denoise_step()` slices per-row `current_starts`, `y`, and
`c2ws_plucker_emb` from `state.extra["chunks"][idx]` keyed by the current
micro-step's `chunk_idxs`, then calls
`predict_noise_maybe_with_cfg(...)`. The per-chunk conditioning is only
read on the first stage; the last stage receives processed hidden states
via intermediate tensors.
- `step_scheduler()` rides the shared `scheduler_step_maybe_with_cfg(...,
receive_latents=False, batch_size=B, generator=state.extra["seed_g_addnoise"])`
and bumps `state.step_index`.
- `prefetch_tensors()` calls
`prefetch_tensors_maybe_with_cfg(buf_idx=state.step_index % 2,
batch_size=B)` and stashes results into `state.latents` (rank 0) or
`state.extra["preposted_its"]` (others).

That decomposition is the target pattern for future micro-step models.

## Rules For New Pipelines

- Inherit `PipelineParallelMixin` and `CFGParallelMixin`.
- Declare `supports_micro_step_execution: ClassVar[bool] = True` on the
pipeline class.
- Pre-populate every `(name, segment_idx, batch_size)` in
`set_pp_recv_dict_buffers`. Skipping a `B` triggers the blocking schema
path and risks PP deadlock.
- Use `state.extra["chunks"][idx]` (a `ChunkState`) for per-chunk persistent
state: latents snapshot at the last rank, per-chunk scheduler, conditioning
slices.
- Do not put request-scoped scheduler state on `self.scheduler`. Deep-copy
it into `state.scheduler` during `prepare_encode` (the runner then
deep-copies that into each new `ChunkState.scheduler` on admission).
- Do not mutate `state.step_index` inside `denoise_step`. Only
`step_scheduler` should advance it.
- Use `buf_idx = state.step_index % 2` across `denoise_step`,
`step_scheduler`, and `prefetch_tensors`.

## Validation Checklist

Before marking a pipeline `supports_micro_step_execution = True`, verify:

- `pipeline_parallel_size=2` and `pipeline_parallel_size>=3` both complete.
- `B=1` and `B>1` outputs match — verifies per-row scheduler / cache /
conditioning slicing.
- CFG-parallel and non-CFG paths both work if the pipeline supports them.

## Related Files

- Contract: [`vllm_omni/diffusion/models/interface.py`](gh-file:vllm_omni/diffusion/models/interface.py)
- State: [`vllm_omni/diffusion/worker/utils.py`](gh-file:vllm_omni/diffusion/worker/utils.py)
- Runner loop: [`vllm_omni/diffusion/worker/diffusion_model_runner.py`](gh-file:vllm_omni/diffusion/worker/diffusion_model_runner.py)
- Scheduler: [`vllm_omni/diffusion/sched/stream_batch_scheduler.py`](gh-file:vllm_omni/diffusion/sched/stream_batch_scheduler.py)
- PP coordinator: [`vllm_omni/diffusion/distributed/group_coordinator.py`](gh-file:vllm_omni/diffusion/distributed/group_coordinator.py)
- PP mixin: [`vllm_omni/diffusion/distributed/pipeline_parallel.py`](gh-file:vllm_omni/diffusion/distributed/pipeline_parallel.py)
- Reference pipeline: [`vllm_omni/diffusion/models/lingbot_world_fast/pipeline_lingbot_world_fast.py`](gh-file:vllm_omni/diffusion/models/lingbot_world_fast/pipeline_lingbot_world_fast.py)
1 change: 1 addition & 0 deletions docs/models/supported_models.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ th {
|`DyninOmniForConditionalGeneration` | Dynin-Omni | `snu-aidas/Dynin-Omni` | ✅︎ | | | |
| `ErnieImagePipeline` | ERNIE-Image | `baidu/ERNIE-Image`, `baidu/ERNIE-Image-Turbo` | ✅︎ | ✅︎ | ✅︎ | ✅︎ |
|`HiDreamImagePipeline` | HiDream-I1-Full | `HiDream-ai/HiDream-I1-Full` | ✅︎ | ✅︎ | | |
|`LingbotWorldFastPipeline`| Lingbot-World-Fast | `robbyant/lingbot-world-fast`|✅︎ | | | |

✅︎ indicates the model is supported on that backend. Empty cells mean not listed as supported on that backend.
105 changes: 105 additions & 0 deletions docs/user_guide/diffusion/micro_step_execution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Micro-Step Execution

Micro-step execution is an opt-in diffusion execution mode enabled with
`stream_batch=True` when constructing `Omni`. It runs *temporal pipeline
parallelism* on streaming chunked diffusion: at each tick every PP rank
denoises a different chunk at a different timestep, then chunks shift one
rank downstream. One tick = one micro-step.

It is not a generic diffusion toggle for every pipeline. Only pipelines that
implement the micro-step contract support it today.

## Quick Start

```python
import PIL.Image
import numpy as np

from vllm_omni import Omni
from vllm_omni.diffusion.data import DiffusionParallelConfig
from vllm_omni.inputs.data import OmniDiffusionSamplingParams

omni = Omni(
model="lingbot_world/lingbot-world-base-cam/Lingbot-World-Fast",
model_class_name="LingbotWorldFastPipeline",
stream_batch=True,
parallel_config=DiffusionParallelConfig(pipeline_parallel_size=4),
enforce_eager=True,
)

outputs = omni.generate(
{
"prompt": "A sweeping cinematic journey along the Great Wall of China",
"multi_modal_data": {
"image": PIL.Image.open("anchor.jpg"),
"camera": {
"poses": np.load("poses.npy"),
"intrinsics": np.load("intrinsics.npy"),
},
},
},
OmniDiffusionSamplingParams(
height=480,
width=832,
num_chunks=20,
chunk_frames=12,
num_inference_steps=5,
slo_fps=16.0,
slo_max_batch=4,
extra_args={"session_id": "demo"},
),
)
```

## Sampling Parameters

| Parameter | Required | Description |
|-----------|----------|-------------|
| `chunk_frames` | yes | Pixel frames produced per chunk |
| `num_chunks` | yes | Total number of chunks per request. Output frames = `num_chunks * chunk_frames` after VAE decode |
| `num_inference_steps` | yes | Denoising steps per chunk |
| `slo_fps` | no | Frames-per-second target. Enables SLO-adaptive batching that grows or shrinks per-step admission `B` to meet the budget |
| `slo_max_batch` | no, default 8 | Upper bound on per-step admission `B` |

When `slo_fps` is set, the scheduler observes the wall-clock latency of each
micro-step and adjusts `B_target` for the next admission tick. If latency
exceeds the budget, `B` decreases; if it is comfortably under, `B` grows up
to `slo_max_batch`.

## Supported Pipelines

| Pipeline | Example models | Micro-step execution |
|----------|----------------|----------------------|
| `LingbotWorldFastPipeline` | `lingbot_world/lingbot-world-base-cam/Lingbot-World-Fast` | Yes |
| All other diffusion pipelines | — | No |

## Current Limitations

- `max_num_seqs == 1` — exactly one in-flight request per engine.
- `cache_backend` is not supported together with `stream_batch`.
- Unsupported pipelines fail early during model loading.

## When To Use It

Use micro-step execution when:

- The pipeline is built for streaming chunked output (video chunks, audio
segments) and you want temporal PP to overlap per-chunk denoising across
ranks.
- You want SLO-aware admission control to keep up with a real-time
frame-rate budget under variable load.

For single-request stepwise execution without temporal PP, use
[Step Execution](step_execution.md) instead.

For non-streaming PP (memory scaling on a normal diffusion pipeline), see
[Pipeline Parallelism Guide](parallelism/pipeline_parallel.md).

## For Model Authors

If you want to add micro-step execution support to a new diffusion pipeline,
see the implementation guide:
[Diffusion Micro-Step Execution Design](../../design/feature/diffusion_micro_step_execution.md).

The pipeline must already support PP partitioning. See
[Pipeline Parallel Design](../../design/feature/pipeline_parallel.md).
Loading