[Refactor] Refactor Diffusion Scheduler/Executor Boundaries and Request State Flow#1625

Merged: wtomin merged 14 commits into vllm-project:main from omni-nicelab:pr/scheduler on Mar 23, 2026
Contributor

@yJader yJader commented Mar 3, 2026

Purpose

RFC: #874

This PR refactors diffusion runtime boundaries by fully separating scheduler state management from multiprocess IPC execution.

Core goals:

  • Make Scheduler a pure request-state scheduler (waiting/running/finished) without owning IPC queues.
  • Make MultiprocDiffusionExecutor a pure IPC runtime (broadcast/result queues + worker lifecycle).
  • Let DiffusionEngine explicitly drive add_request -> schedule -> execute -> update_from_output.
  • Consolidate cross-API concurrency control into DiffusionEngine._rpc_lock, covering both add_req_and_wait_for_response and collective_rpc.
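
A minimal sketch of the engine-driven flow described in the goals above. Only the method names add_request, schedule, update_from_output, add_req, add_req_and_wait_for_response and the _rpc_lock attribute come from this PR; the Toy* classes, the id format, and the return values are hypothetical stand-ins, not the actual vllm-omni implementation.

```python
import threading
from collections import deque


class ToyScheduler:
    """Pure request-state tracker: waiting/running/finished, no IPC queues."""

    def __init__(self):
        self.waiting = deque()
        self.running = []
        self.finished = set()
        self._prompts = {}
        self._next_id = 0

    def add_request(self, prompt):
        sched_req_id = f"sched-{self._next_id}"  # hypothetical id format
        self._next_id += 1
        self._prompts[sched_req_id] = prompt
        self.waiting.append(sched_req_id)
        return sched_req_id

    def schedule(self):
        # Single-request policy: at most one request runs at a time.
        if self.waiting and not self.running:
            self.running.append(self.waiting.popleft())
        return list(self.running)

    def update_from_output(self, sched_req_id):
        self.running.remove(sched_req_id)
        self.finished.add(sched_req_id)

    def prompt_for(self, sched_req_id):
        return self._prompts[sched_req_id]


class ToyExecutor:
    """Stand-in for the IPC runtime; a real executor would broadcast to workers."""

    def add_req(self, prompt):
        return f"output-for:{prompt}"


class ToyEngine:
    def __init__(self):
        self.scheduler = ToyScheduler()
        self.executor = ToyExecutor()
        self._rpc_lock = threading.Lock()

    def add_req_and_wait_for_response(self, prompt):
        # The engine, not the executor, drives the state transitions.
        with self._rpc_lock:
            target = self.scheduler.add_request(prompt)
            while True:
                scheduled = self.scheduler.schedule()
                if not scheduled:
                    raise RuntimeError("scheduler has no runnable requests")
                sched_req_id = scheduled[0]
                output = self.executor.add_req(self.scheduler.prompt_for(sched_req_id))
                self.scheduler.update_from_output(sched_req_id)
                if sched_req_id == target:
                    return output
```

In this sketch the scheduler never touches a queue and the executor never touches request state, which is the boundary the goals describe.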

Architecture before/after

Before Refactor

┌─────────────────────────────────────────────────────────────────────────┐
│                          AsyncOmniDiffusion                             │
│  - ThreadPoolExecutor(max_workers=1)                                    │
│  - await loop.run_in_executor(self.engine.step, request)                │
└─────────────────────┬───────────────────────────────────────────────────┘
                      │ synchronous call
                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                           DiffusionEngine                               │
│  - step(request)                                                        │
│  - add_req_and_wait_for_response(request)                               │
│  - (directly calls) executor.add_req(request)                           │
└─────────────────────┬───────────────────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│     MultiprocDiffusionExecutor (depends on Scheduler internal IPC)      │
│  - self.scheduler = Scheduler()                                         │
│  - add_req() -> scheduler.add_req()                                     │
│  - collective_rpc() depends on scheduler._lock / scheduler.mq/result_mq │
└─────────────────────┬───────────────────────────────────────────────────┘
                      │ scheduler.add_req() -> mq.enqueue/dequeue
                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              Scheduler (mixed state + IPC responsibilities)             │
│  - _lock, mq, result_mq                                                 │
│  - add_req(request)                                                     │
└─────────────────────┬───────────────────────────────────────────────────┘
                      │ MessageQueue broadcast
                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                                Workers                                  │
│  - worker.generate(request)                                             │
│  - full denoising loop executed inside workers                          │
└─────────────────────────────────────────────────────────────────────────┘

After Refactor

┌─────────────────────────────────────────────────────────────────────────┐
│                          AsyncOmniDiffusion                             │
│  - ThreadPoolExecutor(max_workers=1)                                    │
│  - await loop.run_in_executor(self.engine.step, request)                │
└─────────────────────┬───────────────────────────────────────────────────┘
                      │ synchronous call
                      ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                            DiffusionEngine                              │
│  - owns: Scheduler + Executor + _rpc_lock                               │
│  - add_req_and_wait_for_response():                                     │
│      add_request -> schedule -> executor.add_req -> update_from_output  │
│  - collective_rpc(): acquires engine._rpc_lock, then calls executor RPC │
└───────────────┬───────────────────────────────────────┬─────────────────┘
                │                                       │
                │ scheduling state flow                 │ RPC/execution flow
                ▼                                       ▼
┌─────────────────────────────────┐     ┌──────────────────────────────────┐
│ Scheduler (pure state machine)  │     │ MultiprocDiffusionExecutor       │
│  - waiting/running/finished     │     │ (pure IPC runtime)               │
│  - add_request/schedule/update  │     │  - broadcast_mq / result_mq      │
│  - no mq/result_mq ownership    │     │  - add_req / collective_rpc      │
└─────────────────────────────────┘     └─────────────────┬────────────────┘
                                                          │ MessageQueue
                                                          ▼
                                            ┌──────────────────────────────┐
                                            │            Workers           │
                                            │  - generate / RPC handlers   │
                                            └──────────────────────────────┘

Key differences:

  • Scheduler changes from a mixed “state + IPC” component to a pure scheduling state machine.
  • Scheduler is also split internally into a sched/ package:
    • SchedulerInterface defines the scheduler/engine boundary.
    • _BaseScheduler owns shared request state, waiting/running/finished queues, and request-id mapping.
    • RequestScheduler only keeps the current request-mode scheduling policy.
  • MultiprocDiffusionExecutor no longer depends on scheduler internals (_lock/mq/result_mq).
  • Concurrency locking is moved up to DiffusionEngine._rpc_lock, covering generation and collective_rpc.

Main code changes:

  • vllm_omni/diffusion/sched/interface.py
    • Defines DiffusionRequestStatus, DiffusionRequestState, NewRequestData, CachedRequestData, DiffusionSchedulerOutput, and SchedulerInterface.
    • DiffusionSchedulerOutput now explicitly separates scheduled_new_reqs, scheduled_cached_reqs, and finished_req_ids as the stable engine/scheduler scheduling output.
  • vllm_omni/diffusion/sched/base_scheduler.py
    • Extracts shared scheduler state management: _request_states, _waiting, _running, _finished_req_ids.
    • Extracts request_id -> sched_req_id mapping and common finish/cleanup logic.
  • vllm_omni/diffusion/sched/request_scheduler.py
    • Implements the current request-mode scheduling policy.
    • Keeps single-request scheduling semantics (num_scheduled_reqs is currently 0/1).
  • vllm_omni/diffusion/sched/__init__.py
    • Provides the unified scheduler export surface and keeps Scheduler = RequestScheduler as an alias.
  • vllm_omni/diffusion/diffusion_engine.py
    • Initialize scheduler during engine construction.
    • Move lock ownership to engine (_rpc_lock).
    • Refactor add_req_and_wait_for_response into a scheduler-driven sync loop that advances execution and state cleanup via DiffusionSchedulerOutput.
    • Make collective_rpc(timeout=...) honor end-to-end timeout across lock wait + RPC execution.
  • vllm_omni/diffusion/executor/multiproc_executor.py
    • Executor directly manages broadcast/result queues and worker lifecycle.
    • add_req directly performs generate RPC with response type/error/timeout checks.
    • collective_rpc no longer relies on scheduler internals or locks.
  • Tests/docs
    • Add tests/diffusion/test_diffusion_scheduler.py.
    • Rename and refactor concurrency tests:
      tests/diffusion/test_multiproc_executor_concurrency.py ->
      tests/diffusion/test_multiproc_engine_concurrency.py.
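
One way to read "honor end-to-end timeout across lock wait + RPC execution" is a single deadline that is charged for both phases. The sketch below illustrates that idea with the standard library only; call_with_deadline and the rpc callable are hypothetical names, and the real collective_rpc signature may differ.

```python
import threading
import time


def call_with_deadline(lock, rpc, timeout=None):
    """Run rpc(remaining_seconds) under lock, counting lock wait against timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout

    if deadline is None:
        acquired = lock.acquire()
    else:
        # Wait for the lock no longer than the overall budget allows.
        acquired = lock.acquire(timeout=max(0.0, deadline - time.monotonic()))
    if not acquired:
        raise TimeoutError("timed out waiting for the RPC lock")

    try:
        # Whatever time the lock wait consumed is no longer available to the RPC.
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        return rpc(remaining)
    finally:
        lock.release()
```

The caller passes one timeout; the RPC only ever sees what is left after the lock was obtained.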

Test Plan

Unit/functional tests:

pytest -m diffusion tests/diffusion/test_diffusion_scheduler.py
pytest -m diffusion tests/diffusion/test_multiproc_engine_concurrency.py

Serving benchmark command:

python3 benchmarks/diffusion/diffusion_benchmark_serving.py \
  --base-url http://localhost:8099 \
  --model Qwen/Qwen-Image \
  --task t2i \
  --dataset vbench \
  --num-prompts 20

Test Result

Benchmark setup:

  • Backend: vllm-omni
  • Model: Qwen/Qwen-Image
  • Dataset: vbench
  • Task: t2i
  • Num prompts: 20
  • Max request concurrency: 1

Benchmark comparison:

| Metric | Base (f52b5153) | This PR (72df5305) | Delta (PR - Base) |
| --- | --- | --- | --- |
| Benchmark duration (s) | 406.28 | 406.21 | -0.06 |
| Request throughput (req/s) | 0.05 | 0.05 | 0.00 |
| Latency Mean (s) | 20.3133 | 20.3103 | -0.0030 |
| Latency Median (s) | 20.3292 | 20.3391 | +0.0099 |
| Latency P99 (s) | 20.4397 | 20.4303 | -0.0094 |
| Successful requests | 20/20 | 20/20 | same |

Result summary:

  • No throughput or stability regression is observed under this benchmark setup.
  • Total duration is slightly lower, P99 is slightly better, and median latency only shifts by about +0.01s; overall performance remains effectively on par.

CC List

@hsliuustc0106 @ZJY0516 @lishunyang12

@yJader yJader requested a review from hsliuustc0106 as a code owner March 3, 2026 05:55

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1183c10d7b

Comment thread vllm_omni/diffusion/diffusion_engine.py
Collaborator

@hsliuustc0106 hsliuustc0106 left a comment

is this the first PR of #874 ?

Contributor Author

yJader commented Mar 3, 2026

> is this the first PR of #874 ?

No, the first PR is #1368. This is the second PR for RFC #874.

Collaborator

@lishunyang12 lishunyang12 left a comment

left a comment inline

with self._rpc_lock:
    target_req_id = self.scheduler.add_request(request)

    while True:
Collaborator

Holding _rpc_lock for the entire generation blocks all other RPCs. Is that intentional?

Contributor Author

Yes.

As discussed in #1448, production can have concurrent add_req and collective_rpc calls, and correctness issues can happen if request scheduling and IPC are not protected consistently.

Before this refactor, the call path was engine.add_req_and_wait_for_response -> executor.add_req -> scheduler.add_req, and scheduler.add_req held the lock for the whole enqueue/dequeue round-trip. So generation already serialized with other RPCs through the same lock domain.

After splitting scheduler/executor responsibilities, if we only lock inside the executor but leave scheduler.add_request/schedule/update_from_output outside that critical section, we can reintroduce request/response interleaving risks (the same class of issue as #1448).

So I intentionally moved the lock to engine level and hold it for the full add_req_and_wait_for_response cycle, to keep scheduler state transition + IPC execution atomic and preserve existing API semantics.

You are right that this blocks other RPCs during generation; this is a correctness-first tradeoff. A finer-grained solution would require changing add_req_and_wait_for_response semantics and a larger engine redesign, which may be out of scope for this PR.
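
To make the tradeoff concrete, here is a small sketch of two entry points sharing one engine-level lock. The class name LockedEngine, the events list, and the sleep calls are illustrative assumptions; only the method names and _rpc_lock come from this PR.

```python
import threading
import time


class LockedEngine:
    def __init__(self):
        self._rpc_lock = threading.Lock()
        self.events = []

    def add_req_and_wait_for_response(self, name):
        # The lock is held for the whole cycle, as in the PR.
        with self._rpc_lock:
            self.events.append(f"gen-start:{name}")
            time.sleep(0.01)  # stands in for scheduling plus the IPC round-trip
            self.events.append(f"gen-end:{name}")

    def collective_rpc(self, method):
        with self._rpc_lock:
            self.events.append(f"rpc-start:{method}")
            time.sleep(0.01)  # stands in for the broadcast and result wait
            self.events.append(f"rpc-end:{method}")


def run_concurrently(engine):
    threads = [
        threading.Thread(target=engine.add_req_and_wait_for_response, args=("r0",)),
        threading.Thread(target=engine.collective_rpc, args=("ping",)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return engine.events
```

Whichever thread wins the lock, its start and end events stay adjacent, so a generation request and an RPC never interleave. The cost is the one the reviewer points out: the other call waits for the full duration.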

Collaborator

Makes sense, thanks for the context. Preserving the existing lock scope is reasonable here.

Collaborator

@lishunyang12 lishunyang12 left a comment

scheduler as pure state machine is a nice clean-up. left one nit inline

"please check the stack trace above for the root cause"
)
if not isinstance(response, DiffusionOutput):
    raise RuntimeError(f"Unexpected response type for generate: {type(response)!r}")
Collaborator

deadline is always None here, so the zmq.error.Again / TimeoutError catches on dequeue are dead code. Not blocking — just something to clean up or wire through a timeout param later.

Contributor Author

True. Since this is not needed for now, it’s better to remove this dead code rather than keep it for potential future use.

Member

@ZJY0516 ZJY0516 left a comment

Overall, LGTM

Comment thread vllm_omni/diffusion/scheduler.py Outdated
        self._running.clear()
        self._finished_req_ids.clear()

    def _make_req_id(self, request: OmniDiffusionRequest) -> str:
Member

I remember vllm-omni sets a request ID at the API server level. What's the difference between them?

Contributor Author

OmniDiffusionRequest.request_ids is aligned with prompts (request_ids[i] -> prompts[i]).
In contrast, DiffusionRequestState.req_id identifies the entire OmniDiffusionRequest, which is treated as a single scheduling unit.

Contributor Author

@yJader yJader Mar 10, 2026

I found that req_id here is easy to confuse with OmniDiffusionRequest.request_id, while it actually refers to the scheduler-owned request identifier.

Renaming it to sched_req_id makes the ownership explicit and reduces confusion in follow-up development.

Even if this field may disappear in a future refactor when batching is fully moved into the scheduler, the rename still improves clarity today.

(edit in 03d7da4)
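
The two identifier levels can be sketched as follows. This is an illustration only: ToyRequest, ToyRequestState, ToyIdRegistry, and the "sched-N" id format are hypothetical, and the real _BaseScheduler may store this mapping differently.

```python
import itertools
from dataclasses import dataclass


@dataclass
class ToyRequest:
    prompts: list
    request_ids: list  # request_ids[i] belongs to prompts[i]


@dataclass
class ToyRequestState:
    sched_req_id: str  # one id for the whole request, owned by the scheduler
    request: ToyRequest


class ToyIdRegistry:
    def __init__(self):
        self._counter = itertools.count()
        self.states = {}
        self._request_id_to_sched_req_id = {}

    def register(self, request):
        sched_req_id = f"sched-{next(self._counter)}"
        self.states[sched_req_id] = ToyRequestState(sched_req_id, request)
        # Every per-prompt id resolves to the same scheduling unit.
        for request_id in request.request_ids:
            self._request_id_to_sched_req_id[request_id] = sched_req_id
        return sched_req_id

    def lookup(self, request_id):
        return self._request_id_to_sched_req_id[request_id]
```

With two prompts in one request, both API-level ids map to a single sched_req_id, which is why a distinct name helps.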

@ZJY0516 ZJY0516 added the ready label to trigger buildkite CI label Mar 9, 2026
@ZJY0516 ZJY0516 requested review from SamitHuang and wtomin March 9, 2026 02:21
Comment thread tests/diffusion/test_diffusion_scheduler.py
)


def test_single_request_success_lifecycle() -> None:
Collaborator

Contributor Author

Thanks, I added the core_model and cpu markers.

Comment thread tests/diffusion/test_diffusion_scheduler.py
asukaqaq-s pushed a commit to omni-nicelab/vllm-omni-batching that referenced this pull request Mar 11, 2026
Based on vllm-project#1625.
- Refactor scheduler/executor boundaries and request state flow
- Implement RequestScheduler and StepScheduler architecture
- Add _max_batch_size enforcement
- Handle output error in DiffusionEngine's dummy run

Signed-off-by: jader <yjader@foxmail.com>

Signed-off-by: asukaqaq <1311722138@qq.com>
Collaborator

wtomin commented Mar 13, 2026

@yJader Please take a look at the CI error: tests/diffusion/test_diffusion_scheduler.py failed.

[2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_single_request_success_lifecycle - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'
--
[2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_fifo_single_request_scheduling - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'
[2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_abort_request_for_waiting_and_running - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'

Contributor Author

yJader commented Mar 14, 2026

For a0c043b

While working on the follow-up step scheduler implementation, I found that the current scheduler design does not support future feature development very well. The scheduling interface, shared state management, and request-mode policy were too tightly coupled, which made it difficult to extend the scheduler cleanly and reuse common logic.

Because of that, I split the scheduler into interface / base_scheduler / request_scheduler:

  • interface defines the scheduler lifecycle and output contract
  • base_scheduler owns the shared request state, waiting/running/finished queues, and common cleanup logic
  • request_scheduler keeps the current request-mode scheduling policy only

This makes it easier to continue with step scheduler development without disturbing the existing request-mode execution path.

I also updated the PR README with the corresponding design notes, test status, and benchmark results. The current test and benchmark results look normal, and I did not observe regressions from this refactor.

@Gaohan123 Gaohan123 added this to the v0.18.0 milestone Mar 14, 2026
Contributor Author

yJader commented Mar 15, 2026

> @yJader Please take a look at the CI error tests/diffusion/test_diffusion_scheduler.py failed.
>
> [2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_single_request_success_lifecycle - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'
> --
> [2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_fifo_single_request_scheduling - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'
> [2026-03-11T10:14:52Z] FAILED tests/diffusion/test_diffusion_scheduler.py::test_abort_request_for_waiting_and_running - AttributeError: 'DiffusionRequestState' object has no attribute 'req_id'

Fixed

Contributor Author

yJader commented Mar 15, 2026

buildkite/vllm-omni-amd-ci — Build #3163 failed (42 minutes, 47 seconds)

I checked this CI failure locally and could not reproduce it.

Local run of
pytest -s tests/e2e/offline_inference/test_diffusion_cpu_offload.py
passed with:

  • Offload peak memory: 18668.125 MB
  • No offload peak memory: 21388.125 MB
  • Observed reduction: 2720 MB

In CI, the failure log shows:

  • Offload peak memory: 14036.0 MB
  • No offload peak memory: 16434.0 MB
  • Observed reduction: 2398 MB

So the CI run still shows lower peak memory with CPU offload enabled, but it missed the current threshold by about 102 MB. My guess is that this may be caused by environment-dependent GPU memory noise / measurement variance in CI.
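
The arithmetic behind these numbers, as a small sketch. The 2500 MB threshold is not stated in this thread; it is inferred from the "about 102 MB" shortfall and should be treated as an assumption, as should the helper name reduction_mb.

```python
def reduction_mb(no_offload_peak_mb, offload_peak_mb):
    """Peak-memory saving from CPU offload, in MB."""
    return no_offload_peak_mb - offload_peak_mb


local_reduction = reduction_mb(21388.125, 18668.125)  # local run
ci_reduction = reduction_mb(16434.0, 14036.0)         # CI run

# Assumption: threshold inferred as CI reduction plus the reported shortfall.
implied_threshold_mb = ci_reduction + 102

print(local_reduction, ci_reduction, implied_threshold_mb)
print("local passes:", local_reduction >= implied_threshold_mb)
print("CI passes:", ci_reduction >= implied_threshold_mb)
```

Under that assumption the local run clears the bar by 220 MB while the CI run falls short, even though both runs show a clear saving from offload.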

Contributor Author

yJader commented Mar 18, 2026

I’ve rebased this branch onto the latest upstream main. The recent major changes (#1908) do not affect this PR.

Collaborator

@lishunyang12 lishunyang12 left a comment

Re-reviewed after the latest refactor. Previous nit (dead timeout code) is addressed. Two new items from the interface split.

Comment thread vllm_omni/diffusion/sched/interface.py
raise RuntimeError("Diffusion scheduler has no runnable requests.")
continue

sched_req_id = sched_output.scheduled_req_ids[0]
Collaborator

This assumes the scheduled request is always new (scheduled_new_reqs[0]). Currently safe because _max_batch_size=1 + _rpc_lock guarantees the first schedule for the target is a new request. But if batch size ever changes, this will IndexError on cached-only schedules. Worth a guard or at least a comment explaining the invariant.

Contributor Author

@yJader yJader Mar 18, 2026

Thanks for the suggestion. Making max_batch_size > 1 would require more changes across the current execution path to support real scheduler-side batching, so a clarifying comment is the right fix for now. I added that in afe0d34.
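
For reference, the guard alternative the reviewer mentions could look roughly like this. It is a sketch, not what was merged (the PR added a comment instead): the field names scheduled_new_reqs and scheduled_cached_reqs come from the PR description, while the Toy* classes, the list types, and first_new_request are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNewRequestData:
    sched_req_id: str


@dataclass
class ToySchedulerOutput:
    # Assumption: both fields are modeled as plain lists here.
    scheduled_new_reqs: list = field(default_factory=list)
    scheduled_cached_reqs: list = field(default_factory=list)


def first_new_request(sched_output):
    """Return the single newly scheduled request, failing loudly otherwise."""
    # Invariant relied on today: batch size 1 plus the engine lock means the
    # first schedule for the target request always yields exactly one new request.
    if len(sched_output.scheduled_new_reqs) != 1:
        raise RuntimeError(
            "expected exactly one newly scheduled request, got "
            f"{len(sched_output.scheduled_new_reqs)} new and "
            f"{len(sched_output.scheduled_cached_reqs)} cached"
        )
    return sched_output.scheduled_new_reqs[0]
```

A check like this turns a bare IndexError on a cached-only schedule into an error message that names the broken invariant.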

yJader added a commit to omni-nicelab/vllm-omni-batching that referenced this pull request Mar 21, 2026
- add notes in scheduler
- align to vllm-project#1908, move "step_execution" into AsyncOmniEngine._create_default_diffusion_stage_cfg
- note: due to 6bdb55a, tests can't pass

Signed-off-by: jader <yjader@foxmail.com>
yJader added a commit to omni-nicelab/vllm-omni-batching that referenced this pull request Mar 21, 2026
- Add notes to scheduler
- Align with vllm-project#1908; move "step_execution" into `AsyncOmniEngine._create_default_diffusion_stage_cfg`
- NOTE: Due to 6bdb55a, tests are currently failing and need to be fixed later

Signed-off-by: jader <yjader@foxmail.com>
yJader and others added 12 commits March 22, 2026 08:20
…st State Flow

Refactor diffusion runtime boundaries to separate scheduler state management from multiprocess IPC execution.

Core goals:
- Make Scheduler a pure request-state scheduler (waiting/running/finished) without owning IPC queues.
- Make MultiprocDiffusionExecutor a pure IPC runtime (broadcast/result queues + worker lifecycle).
- Let DiffusionEngine explicitly drive add_request -> schedule -> execute -> update_from_output.
- Consolidate cross-API concurrency control into DiffusionEngine._rpc_lock.

Main code changes:
- scheduler.py: introduce request status/state output types and pure scheduling APIs; remove scheduler-side IPC ownership.
- diffusion_engine.py: engine owns scheduler and _rpc_lock; refactor add_req_and_wait_for_response to scheduler-driven flow.
- multiproc_executor.py: executor directly manages IPC queues and worker lifecycle; decouple from scheduler internals.
- tests: add diffusion scheduler tests; rename/refactor multiproc concurrency test to engine-focused variant.

Test plan:
- pytest -m diffusion tests/diffusion/test_diffusion_scheduler.py
- pytest -m diffusion tests/diffusion/test_multiproc_engine_concurrency.py

Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
…ests

Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
Co-authored-by: SYLAR <125541396+lishunyang12@users.noreply.github.com>
Signed-off-by: JiangJie Zhang <76905040+yJader@users.noreply.github.com>
…dling

Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: jader <yjader@foxmail.com>
Co-authored-by: Didan Deng <33117903+wtomin@users.noreply.github.com>
Signed-off-by: JiangJie Zhang <76905040+yJader@users.noreply.github.com>
@yJader yJader force-pushed the pr/scheduler branch 2 times, most recently from 0d6e2f7 to b4ac994 Compare March 22, 2026 15:35
@david6666666
Collaborator

Please fix the CI. Once your PR is ready, please ping @wtomin or @SamitHuang.

Co-authored-by: asukaqaq-s <1311722138@qq.com>

Signed-off-by: jader <yjader@foxmail.com>
…e cleanup

Signed-off-by: jader <yjader@foxmail.com>
Contributor Author

yJader commented Mar 23, 2026

> please fix CI, If your PR is ready, please @wtomin or @SamitHuang

Fixed. Please take a look when you have time. @wtomin @SamitHuang

Collaborator

@wtomin wtomin left a comment

All of my comments were addressed. LGTM.

@wtomin wtomin merged commit 61cd532 into vllm-project:main Mar 23, 2026
7 of 8 checks passed
clodaghwalsh17 pushed a commit to clodaghwalsh17/nm-vllm-omni-ent that referenced this pull request May 12, 2026
…st State Flow (vllm-project#1625)

Signed-off-by: jader <yjader@foxmail.com>
Signed-off-by: JiangJie Zhang <76905040+yJader@users.noreply.github.com>
Co-authored-by: SYLAR <125541396+lishunyang12@users.noreply.github.com>
Co-authored-by: Didan Deng <33117903+wtomin@users.noreply.github.com>
Labels

ready label to trigger buildkite CI

8 participants