reafator pipeline stage/step pipeline#1368
Conversation
9deb4cf to
d961085
Compare
| prompt_embeds_mask: torch.Tensor | None = None, | ||
| negative_prompt_embeds: torch.Tensor | None = None, | ||
| negative_prompt_embeds_mask: torch.Tensor | None = None, | ||
| attention_kwargs: dict[str, Any] | None = None, |
There was a problem hiding this comment.
I was wondering about a subtle difference from the existing forward() method. In forward(), the prompt/negative_prompt extraction from req.prompts happens unconditionally:
prompt = [p if isinstance(p, str) else (p.get("prompt") or "") for p in req.prompts] or promptBut here in prepare_encode, it's guarded by if prompt is None. This means if someone passes an explicit prompt argument alongside a req that also contains prompts, forward() would override with req.prompts while prepare_encode would keep the explicit argument. Could you help me understand whether this divergence is intentional? If not, it might be worth making the behavior identical to forward() to avoid subtle bugs when the two code paths are used side by side.
| scheduler_override: Any | None = None, | ||
| ): | ||
| if prompt is None: | ||
| prompt = [p if isinstance(p, str) else (p.get("prompt") or "") for p in req.prompts] or prompt |
There was a problem hiding this comment.
Small nit -- the negative_prompt handling here also diverges from forward() in a similar way to the positive prompt. In forward(), the negative prompt logic runs unconditionally:
if all(isinstance(p, str) or p.get("negative_prompt") is None for p in req.prompts):
negative_prompt = NoneBut here it's guarded by if negative_prompt is None. This means if negative_prompt is explicitly passed as an empty string "" (which is falsy but not None), the behavior would differ between the two code paths. I might be overthinking this, but it seemed worth flagging for consistency.
|
|
||
| def prepare_encode( | ||
| self, | ||
| req: OmniDiffusionRequest, |
There was a problem hiding this comment.
The return type annotation is missing from prepare_encode. Since this method returns a 15-element tuple that the caller (execute_stepwise) unpacks positionally, I was wondering if it might help maintainability to either:
- Add a return type annotation (even a
tuple[...]), or - Return a
NamedTupleor@dataclassso callers don't need to rely on positional unpacking of 15 values?
Positional unpacking of large tuples can be fragile when someone later adds/removes/reorders a return value. Just a thought -- happy to hear your perspective!
| height // self.vae_scale_factor // 2, | ||
| width // self.vae_scale_factor // 2, | ||
| ) | ||
| ] |
There was a problem hiding this comment.
I noticed that prepare_encode does not call self.prepare_timesteps() (which already exists as a method on the class) but instead inlines the timestep preparation logic directly. The existing forward() method uses self.prepare_timesteps(). Could this lead to divergence if someone later modifies prepare_timesteps()? Would it make sense to reuse that helper here for DRY-ness?
There was a problem hiding this comment.
I fixed this in 868e40a. prepare_encode() now reuses self.prepare_timesteps(...) (the same path as forward()), so the timestep/mu logic is centralized and won’t drift.
but to do this,I also updated the cfg_parallel.py scheduler_step* signatures to accept an optional scheduler argument. This allows stepwise execution with per-request scheduler state instead of temporarily rebinding self.scheduler. The change is backward-compatible — scheduler=None falls back to self.scheduler, so existing call sites are unaffected.
| latents: torch.Tensor, | ||
| img_shapes: list, | ||
| txt_seq_lens: list[int] | None, | ||
| negative_txt_seq_lens: list[int] | None, |
There was a problem hiding this comment.
Good reuse of predict_noise_maybe_with_cfg from the CFGParallelMixin. I noticed one difference though: in diffuse(), the additional_transformer_kwargs (which include return_dict and attention_kwargs) are spread into both positive_kwargs and negative_kwargs via **additional_transformer_kwargs. Here in denoise_step, these are set directly as individual keys. The behavior should be equivalent, but I wanted to confirm -- are the kwargs identical in both cases? Specifically, diffuse() passes attention_kwargs as a nested key via the spread, while here it's set as "attention_kwargs": self.attention_kwargs. They look the same to me, just wanted to double-check.
| "hidden_states": latent_model_input, | ||
| "timestep": t_for_model / 1000, | ||
| "guidance": guidance, | ||
| "encoder_hidden_states_mask": prompt_embeds_mask, |
There was a problem hiding this comment.
Handling the scheduler override by temporarily swapping self.scheduler with a try/finally is clean and safe.
One small thing I noticed: the mixin's scheduler_step_maybe_with_cfg presumably calls self.scheduler.step(...) internally. If an exception occurs during that call, the finally block will restore the original scheduler, which is great. But could there be thread-safety concerns if multiple requests are processed concurrently? In that case, temporarily mutating self.scheduler on the instance might cause race conditions. Is the current design single-threaded per pipeline instance? If so, this is perfectly fine.
| with set_forward_context(vllm_config=self.vllm_config, omni_diffusion_config=self.od_config): | ||
| with record_function("pipeline_forward"): | ||
| output = self.pipeline.forward(req) | ||
| if isinstance(self.pipeline, SupportsStepExecution): |
There was a problem hiding this comment.
The dispatch logic here is clean. One minor question: since SupportsStepExecution is a @runtime_checkable Protocol, the isinstance check will verify that the pipeline has the supports_step_execution class variable and the required methods. However, I noticed that the helper function supports_step_execution() from interface.py is not used here. Would it be slightly cleaner to use the helper function instead of the raw isinstance check? That way the logic is in one place:\npython\nfrom vllm_omni.diffusion.models.interface import supports_step_execution\nif supports_step_execution(self.pipeline):\n\nMinor style suggestion -- the current code works fine too.
| from vllm_omni.platforms import current_omni_platform | ||
|
|
||
| logger = init_logger(__name__) | ||
|
|
There was a problem hiding this comment.
Very minor: it looks like an extra blank line was introduced here (there are now two blank lines between the logger assignment and the class definition, where there was one before). Not a blocker at all, just flagging in case you want to keep formatting consistent.
There was a problem hiding this comment.
Thanks for flagging this. I checked and the spacing is enforced by our lint/CI — top-level classes require two blank lines, so reducing it to one would fail the style check.
I’ll keep it as-is to stay consistent with the formatter.
|
|
||
| supports_step_execution: ClassVar[bool] = True | ||
|
|
||
| def prepare_encode(self, req: "OmniDiffusionRequest", **kwargs: Any) -> Any: |
There was a problem hiding this comment.
Clean protocol design -- intentionally permissive with *args, **kwargs for denoise_step, step_scheduler, and post_decode.\n\nOne thought: prepare_encode takes a concrete OmniDiffusionRequest parameter, which couples the protocol to that specific request type. If future pipelines might use a different request type, would it make sense to loosen this to Any as well (matching the other methods), or is OmniDiffusionRequest intentionally the canonical request type for all diffusion pipelines? Curious about the intent.
| .to(latents.device, latents.dtype) | ||
| ) | ||
| latents_std = 1.0 / torch.tensor(self.vae.config.latents_std).view(1, self.vae.config.z_dim, 1, 1, 1).to( | ||
| latents.device, latents.dtype |
There was a problem hiding this comment.
In post_decode, when output_type == "latent", the original forward() assigns image = latents and then wraps it in DiffusionOutput(output=image). Here, you return DiffusionOutput(output=latents) directly, which is equivalent.
However, I noticed that forward() sets self._current_timestep = None after the diffuse loop ends, but neither post_decode nor execute_stepwise resets it. This could leave self._current_timestep pointing to the last timestep value after generation completes. Could this cause issues if something inspects current_timestep between requests? It might be worth adding self._current_timestep = None at the end of post_decode or in execute_stepwise to match forward()'s behavior.
There was a problem hiding this comment.
Thanks, that’s a great point. You’re right that the stepwise path should mirror forward() and clear self._current_timestep after generation.
I fixed this in the latest commit — self._current_timestep is now reset to None at the end of stepwise decoding, so it won’t retain the last timestep across requests.
DiffusionOutput(output=latents) is behaviorally unchanged compared to assigning image = latents first.
|
@vllm-omni-reviewer |
🤖 VLLM-Omni PR ReviewCode Review: Refactor Pipeline Stage/Step Pipeline1. OverviewThis PR introduces step-level execution capability at the runner/pipeline layer for diffusion models, implementing a new execution flow: Overall Assessment: Positive with suggestions - The architectural approach is sound, but there are several areas that need attention before merging. 2. Code QualityStrengths
Issuesa) Fragile tuple unpacking in The 14-element tuple returned from (
prompt_embeds, prompt_embeds_mask,
negative_prompt_embeds, negative_prompt_embeds_mask,
latents, img_shapes, txt_seq_lens, negative_txt_seq_lens,
timesteps, do_true_cfg, guidance, true_cfg_scale,
height, width, scheduler,
) = self.pipeline.prepare_encode(req=req)Recommendation: Use a @dataclass
class StepExecutionContext:
prompt_embeds: torch.Tensor
prompt_embeds_mask: torch.Tensor
negative_prompt_embeds: torch.Tensor | None
negative_prompt_embeds_mask: torch.Tensor | None
latents: torch.Tensor
img_shapes: list
txt_seq_lens: list[int] | None
negative_txt_seq_lens: list[int] | None
timesteps: torch.Tensor
do_true_cfg: bool
guidance: torch.Tensor | None
true_cfg_scale: float
height: int
width: int
scheduler: Anyb) Missing return type annotation (
c) Interrupt handling incomplete (
for _i, t in enumerate(timesteps):
noise_pred = self.pipeline.denoise_step(...) # Can return None
latents = self.pipeline.step_scheduler(...) # Uses None?Recommendation: for _i, t in enumerate(timesteps):
noise_pred = self.pipeline.denoise_step(...)
if noise_pred is None:
break # Handle interrupt
latents = self.pipeline.step_scheduler(...)3. Architecture & DesignStrengths
Issuesa) Scheduler state mutation ( The temporary binding of if scheduler is not None and scheduler is not self.scheduler:
saved = self.scheduler
self.scheduler = scheduler
try:
return self.scheduler_step_maybe_with_cfg(...)
finally:
self.scheduler = savedRecommendation: Pass the scheduler explicitly to b) Parameter redundancy in The method accepts both Recommendation: Consider either:
c) Missing protocol enforcement ( The 4. Security & SafetyIssuesa) No input validation in No validation that b) Resource cleanup The scheduler state mutation pattern uses try/finally correctly, but consider what happens if an exception occurs mid-mutation in a multi-threaded context. 5. Testing & DocumentationIssuesa) Duplicate test command in PR description The same test command appears twice in the PR description - appears to be a copy-paste error. b) Missing test coverage
c) Missing documentation for new protocol The
6. Specific Suggestions
|
|
Thanks for the detailed review. I’ve pushed an update addressing the concerns raised and included additional self-review fixes.
And i updated the test results in the PR docs. |
868e40a to
fcef0bb
Compare
|
Rebased onto |
lishunyang12
left a comment
There was a problem hiding this comment.
nice rework — previous concerns addressed. one thing inline
|
|
||
| try: | ||
| self.pipeline.prepare_encode(state) | ||
|
|
There was a problem hiding this comment.
denoise_step returns None on interrupt but the loop keeps going. Worth breaking early:
| for _i, _t in enumerate(state.timesteps): | |
| noise_pred = self.pipeline.denoise_step(state) | |
| if noise_pred is None: | |
| break | |
| # TODO: continuous batching should step per-request state. | |
| self.pipeline.step_scheduler(state, noise_pred) |
There was a problem hiding this comment.
Good catch, thanks. I applied the early break in this PR so we don’t call 'step_schedulerwithNone. I’m keeping the change minimal for now; a follow-up with proper abort support will make the interrupt/abort path explicit.
| "attention_kwargs": self.attention_kwargs, | ||
| "return_dict": False, | ||
| } | ||
| if state.do_true_cfg: |
There was a problem hiding this comment.
Can we extract a common function to avoid this massive duplicate code?
There was a problem hiding this comment.
Makes sense. I’ll refactor this by adding a private helper like _build_denoise_kwargs(...) inside qwen-image to build positive_kwargs, negative_kwargs, and output_slice, rather than pushing it into the generic CFGParallelMixin.
There was a problem hiding this comment.
not only cfg. prepare_encode also have duplicate code.
| noise_pred: torch.Tensor, | ||
| t: torch.Tensor, | ||
| latents: torch.Tensor, | ||
| scheduler: Any | None = None, |
There was a problem hiding this comment.
Why we need to add this param, state.scheduler = self.scheduler in prepare_encode
There was a problem hiding this comment.
We need this because with step-level switching, batches may be at different progress, and a request may be switched between different execution states. So we cache the scheduler in RequestStateCache to make sure the request continues with the correct local scheduling state.
There was a problem hiding this comment.
I know we need to keep the scheduling state (e.g., timesteps). However, state.scheduler = self.scheduler is just a reference assignment. When self.scheduler changes, request_state.scheduler will also change. have you test for it?
There was a problem hiding this comment.
state.scheduler is used in the later scheduler step here:
state.latents = self.scheduler_step_maybe_with_cfg(
noise_pred,
t,
state.latents,
state.do_true_cfg,
scheduler=state.scheduler,
)
state.scheduler is meant to keep per-request scheduler state, not just timesteps. In stepwise execution and future continuous batching, different requests may be resumed at different denoise progress, so they should not share the same pipeline scheduler instance. self.scheduler is the base template, while state.scheduler is the request-local scheduler used later in scheduler_step_maybe_with_cfg(...).
You're right. Assigning self.scheduler directly only keeps a shared reference. I will change this to create a request-local scheduler instance, e.g. state.scheduler =FlowMatchEulerDiscreteScheduler.from_config(self.scheduler.config), so the later scheduler_step_maybe_with_cfg(..., scheduler=state.scheduler) uses per-request scheduler state.
|
Review 1 — Bounty-hunter: "not only cfg. prepare_encode also have duplicate code." Addressed. Extracted _extract_prompts() and _prepare_generation_context() as shared helpers in pipeline_qwen_image.py. Both forward() and prepare_encode() now delegate to _prepare_generation_context() for input validation, prompt encoding, latent preparation, timestep computation, and guidance setup. Also extracted _build_denoise_kwargs() for the denoise kwargs construction used by denoise_step(), and _decode_latents() shared by forward() and post_decode(). Review 2 — Bounty-hunter: "state.scheduler = self.scheduler is just a reference assignment" Fixed. prepare_encode() now does copy.deepcopy(self.scheduler) after _prepare_generation_context() (which calls prepare_timesteps() and materializes the timestep state on self.scheduler), so the per-request scheduler carries correct dynamic-shifting state without sharing the pipeline instance. |
Refactor from pr/pipeline branch, related to vllm-project#1368. - Restructure pipeline stage/step API - Pass scheduler explicitly in stepwise pipeline and CFG mixin Signed-off-by: asukaqaq-s <1311722138@qq.com> Signed-off-by: asukaqaq <1311722138@qq.com>
Refactor from pr/pipeline branch, related to vllm-project#1368. - Restructure pipeline stage/step API - Pass scheduler explicitly in stepwise pipeline and CFG mixin Signed-off-by: asukaqaq-s <1311722138@qq.com>
| noise_pred: torch.Tensor, | ||
| t: torch.Tensor, | ||
| latents: torch.Tensor, | ||
| scheduler: Any | None = None, |
There was a problem hiding this comment.
A better name for this new argument, for example, per_request_scheduler or per_state_scheduler, is needed to differentiate it with self.scheduler. And please provide more detailed docstring for this argument. This new argument takes effect only when step-wise execution is enabled, right? Please run some parameter check.
Maybe in a future PR: a future design document is needed for step-wise execution and continuous batching in diffusion pipelines.
There was a problem hiding this comment.
Updated this API for clarity.
ccb79fd to
ff49bde
Compare
|
|
||
|
|
||
| ## List of Supported Models for Step-Execution | ||
|
|
There was a problem hiding this comment.
I find the Step-Execution section a bit odd here.
I have another PR for docs #1928. Can you check docs/user_guide/diffusion_features.md. I think it suits better to be in this doc, and works as a feature for diffusion models.
There was a problem hiding this comment.
Thanks, agreed. I removed the Step-Execution section from docs/models/supported_models.md. For now, the user-facing content is documented in docs/user_guide/diffusion/step_execution.md. After docs PR #1928 is merged, I can rebase and further align it with docs/user_guide/diffusion_features.md if needed.
|
@asukaqaq-s There is one bug refactor PR #1908 that will be merged in just a few days. After it's merged, I think you PR needs to be rebased and verify its functionality again. |
164f6dd to
7a16a57
Compare
Signed-off-by: asukaqaq-s <1311722138@qq.com>
Signed-off-by: asukaqaq-s <1311722138@qq.com>
Move the step-execution docs into the diffusion feature docs structure, add a user-facing step execution page, and remove the feature-specific section from supported models. Signed-off-by: asukaqaq-s <1311722138@qq.com>
Signed-off-by: asukaqaq-s <1311722138@qq.com>
|
I resolved the merge conflicts introduced by the rebase, and also fixed the earlier CI failure caused by the missing step_execution field in OmniDiffusionConfig. |
wtomin
left a comment
There was a problem hiding this comment.
Since all of my comments were addressed, I will approve this PR.
| if current_omni_platform.get_device_count() < world_size: | ||
| pytest.skip(f"Test requires {world_size} devices") | ||
|
|
||
| torch.multiprocessing.spawn( |
There was a problem hiding this comment.
@asukaqaq-s How long does it take to run this test script on your local machine?
There was a problem hiding this comment.
Around 5~10s for each parallel test on my local machine. The other ones finish almost immediately.
There was a problem hiding this comment.
https://docs.vllm.ai/projects/vllm-omni/en/latest/contributing/ci/tests_markers/#example-usage-for-markers
parallel means this is parallel feature related, for your test:
from tests.utils import hardware_test
@hardware_test(
res={"cuda": "L4"},
num_cards=2,
)
Thanks! I've replaced @pytest.mark.parallel with @hardware_test(res={"cuda": "L4"}, num_cards=2) on the three distributed tests. Test durations with --durations=0 (2x L4): test_execute_stepwise_with_ulysses_parallel: 10.84s |
Signed-off-by: asukaqaq-s <1311722138@qq.com>
Signed-off-by: asukaqaq-s <1311722138@qq.com>
PLEASE FILL IN THE PR DESCRIPTION HERE ENSURING ALL CHECKLIST ITEMS (AT THE BOTTOM) HAVE BEEN CONSIDERED.
Purpose
Introduce step-level execution capability at the runner/pipeline layer:
prepare_encode → denoise_step × N → step_scheduler × N → post_decode
QwenImagePipeline will be the first implementation.
Out of scope (no changes):Engine、Executor、Worker entrypoint、External APIs
This PR is strictly limited to the runner/pipeline layer and maintains full backward compatibility.
relativate RFC: #874
Test Plan
Test Result
"a cup of coffee on a wooden table, morning light, photorealistic"
"a red panda sitting on a tree branch in a bamboo forest, soft focus background"
"an astronaut riding a horse on the surface of Mars, cinematic lighting"
"a cozy cabin in the snowy mountains at sunset, warm glow from windows"
"a futuristic cityscape with flying cars and neon lights, cyberpunk style"
no preformance degradation
bit-for-bit identical output
All 15 image pairs (5 prompts x 3 resolutions) produce identical MD5 checksums between stepwise and non-stepwise modes, confirming that the stepwise refactoring does not alter the generation output in any way.
512x512/prompt_0: IDENTICAL
512x512/prompt_1: IDENTICAL
512x512/prompt_2: IDENTICAL
512x512/prompt_3: IDENTICAL
512x512/prompt_4: IDENTICAL
768x768/prompt_0: IDENTICAL
768x768/prompt_1: IDENTICAL
768x768/prompt_2: IDENTICAL
768x768/prompt_3: IDENTICAL
768x768/prompt_4: IDENTICAL
1024x1024/prompt_0: IDENTICAL
1024x1024/prompt_1: IDENTICAL
1024x1024/prompt_2: IDENTICAL
1024x1024/prompt_3: IDENTICAL
1024x1024/prompt_4: IDENTICAL
Essential Elements of an Effective PR Description Checklist
supported_models.mdandexamplesfor a new model. Please runmkdocs serveto sync the documentation editions to./docs.BEFORE SUBMITTING, PLEASE READ https://github.com/vllm-project/vllm-omni/blob/main/CONTRIBUTING.md (anything written below this line will be removed by GitHub Actions)