[Feature] Add support for Pipeline Parallel and integrate it into Wan 2.2 #2322
hadipash wants to merge 28 commits
Conversation
@zzhang-fr added async latents transfer to the first rank.
lishunyang12
left a comment
Left a couple of comments, mostly around the async send lifetime.
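For context, here is a minimal sketch of that lifetime concern (a hypothetical helper, not this PR's code): with `torch.distributed.isend`, both the send buffer and the returned work handle must stay alive until the transfer completes, or the send can read freed/reused memory.

```python
import torch
import torch.distributed as dist

class AsyncSender:
    """Sketch: pin async-send buffers until their transfers complete."""

    def __init__(self) -> None:
        self._inflight = []  # (work_handle, tensor) pairs kept alive on purpose

    def send(self, tensor: torch.Tensor, dst: int) -> None:
        # isend returns immediately; the tensor must not be freed or reused
        # until the returned work handle reports completion.
        work = dist.isend(tensor, dst=dst)
        self._inflight.append((work, tensor))

    def flush(self) -> None:
        # Wait for all outstanding sends, then release the pinned buffers.
        for work, _tensor in self._inflight:
            work.wait()
        self._inflight.clear()
```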
Force-pushed from cce7bff to eb9d9ac.

Rebased onto the main branch.
wtomin
left a comment
All of my comments were addressed. LGTM.
I'd also like to hear thoughts from @lishunyang12, @RuixiangMa, @SamitHuang, @david6666666, @yuanheng-zhao, and @hsliuustc0106.
hsliuustc0106
left a comment
Review Summary
Well-structured PR with clean mixin design and thorough test coverage. The PP communication pattern (async isend/irecv with Gloo metadata + NCCL tensors) follows existing vLLM-Omni conventions. A few items to address — see inline comments.
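As a rough illustration of that communication pattern (assumed helper names; the repo's actual `isend_tensor_dict`/`irecv_tensor_dict` may differ), metadata travels over a Gloo group so the receiver can pre-allocate, while tensor payloads go over NCCL:

```python
import torch
import torch.distributed as dist

def send_tensor_dict(tensors, dst, gloo_group, nccl_group):
    # Shapes/dtypes over Gloo (CPU) so the peer can allocate receive buffers.
    meta = {k: (tuple(v.shape), v.dtype) for k, v in tensors.items()}
    dist.send_object_list([meta], dst=dst, group=gloo_group)
    # Tensor payloads asynchronously over NCCL; the caller must keep the
    # tensors and the returned work handles alive until completion.
    return [dist.isend(v.contiguous(), dst=dst, group=nccl_group)
            for v in tensors.values()]

def recv_tensor_dict(src, gloo_group, nccl_group, device):
    meta = [None]
    dist.recv_object_list(meta, src=src, group=gloo_group)
    out = {}
    for key, (shape, dtype) in meta[0].items():
        buf = torch.empty(shape, dtype=dtype, device=device)
        dist.recv(buf, src=src, group=nccl_group)  # blocking per-tensor recv
        out[key] = buf
    return out
```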
```diff
-    if current_omni_platform.is_npu():
-        assert pipeline_parallel_size == 1, "Current pipefusion is not ready for NPU"
     dit_parallel_size = (
```
The NPU guard was removed here. The original FIXME specifically mentions that NPU async P2P differs from CUDA in torch. Since this PR replaces the old pipefusion with a new implementation using isend_tensor_dict/irecv_tensor_dict, was this validated on NPU? If not, it would be safer to keep the guard (possibly updated for the new PP implementation) to avoid silent hangs on NPU hardware.
```python
# Previously:
# FIXME: Since the async p2p communication operation of NPU is not same as cuda in torch,
# the pipefusion is not ready for npu yet
if current_omni_platform.is_npu():
    assert pipeline_parallel_size == 1, "Current pipefusion is not ready for NPU"
```

```diff
         Non-last Pipeline Parallel stages return IntermediateTensors instead of final noise tensors.
         """
-        return self.transformer(*args, **kwargs)[0]
+        result = self.transformer(*args, **kwargs)
```
This changes the return contract of CFGParallelMixin.predict_noise globally — previously it always did result[0], now it passes through non-tuple results. This is necessary for PP (where IntermediateTensors is returned on non-last stages), but it subtly changes behavior for all pipelines inheriting CFGParallelMixin.
Consider adding a brief comment here explaining the PP motivation, e.g.:

```python
# Support Pipeline Parallel: non-last PP stages return IntermediateTensors
# instead of a tuple, so we must handle both cases.
```

This makes the intent clear to future maintainers who might wonder why the simple `result[0]` was changed.
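Putting the diff and the suggested comment together, the resulting method would look roughly like this (a sketch based on the lines above, not the PR's exact code):

```python
def predict_noise(self, *args, **kwargs):
    result = self.transformer(*args, **kwargs)
    # Support Pipeline Parallel: non-last PP stages return IntermediateTensors
    # instead of a tuple, so we must handle both cases.
    if isinstance(result, tuple):
        return result[0]
    return result
```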
```python
def _wrapped_vae_decode(self) -> None:
    orig_decode = self.vae.decode
    vae_distributed = hasattr(self.vae, "is_distributed_enabled") and self.vae.is_distributed_enabled()
```
vae_distributed is captured once at __init__ time and baked into the closure. If the VAE's distributed state changes after construction (e.g. dynamic enable/disable), the wrapper would use stale state.
This is likely fine given current usage, but worth noting with a comment or assertion. Alternatively, you could check self.vae.is_distributed_enabled() on each decode call if the state could change.
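A sketch of the per-call alternative suggested above (method names follow the diff; the distributed decode branch is elided):

```python
def _wrapped_vae_decode(self) -> None:
    orig_decode = self.vae.decode

    def decode(*args, **kwargs):
        # Re-evaluated on every call, so a post-construction enable/disable
        # of the distributed VAE is respected instead of baking the flag
        # into the closure at __init__ time.
        vae_distributed = (hasattr(self.vae, "is_distributed_enabled")
                           and self.vae.is_distributed_enabled())
        if not vae_distributed:
            return orig_decode(*args, **kwargs)
        ...  # distributed decode path elided

    self.vae.decode = decode
```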
```python
    all_kwargs = [positive_kwargs if get_classifier_free_guidance_rank() == 0 else negative_kwargs]
else:
    # Sequential CFG (or no CFG): this PP pipeline handles all branches.
    all_kwargs = [positive_kwargs] + ([negative_kwargs] if do_true_cfg else [])
```
For sequential CFG (cfg_parallel_size=1) with PP, this doubles the communication volume per denoising step — each step now runs two full forward chains through the PP pipeline (positive + negative branch).
This is correct, but it would be worth adding a note in the design doc about this perf characteristic so users are aware that PP + sequential CFG has higher communication cost than PP + CFG-Parallel.
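For example, with pipeline_parallel_size=4, each denoising step crosses 3 stage boundaries per CFG branch, so sequential CFG pays roughly 6 activation transfers per step where CFG-Parallel pays 3.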
| ) | ||
| # Patch embedding only on first PP stage; other stages receive hidden_states via P2P | ||
| if is_pipeline_first_stage(): | ||
| self.patch_embedding = Conv3dLayer( |
is_pipeline_first_stage() is called here at __init__ time. This requires initialize_model_parallel() to have been called before the transformer is constructed. The ordering is presumably guaranteed by the pipeline initialization flow, but a defensive assertion would help catch misordering:
```python
assert get_pipeline_parallel_world_size() > 0 or is_pipeline_first_stage(), \
    "initialize_model_parallel must be called before WanTransformer3DModel construction"
```

Alternatively, consider documenting this prerequisite in the transformer's docstring.
```python
self.proj_out = nn.Linear(inner_dim, out_channels * math.prod(patch_size))
# 4. Output norm & projection — only on the last PP stage
if is_pipeline_last_stage():
    self.norm_out = AdaLayerNorm(inner_dim, elementwise_affine=False, eps=eps)
```
Same note as above — is_pipeline_last_stage() is called at init time. If pipeline_parallel_size > num_layers, some PP ranks would get zero layers (empty [start_layer, end_layer) range) and would be neither first nor last stage for output projection. This edge case should either be validated or documented.
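A hypothetical validation for that edge case (names assumed, not the repo's API): with an even split of layers across stages, requiring pp_size <= num_layers guarantees every rank owns at least one layer.

```python
def get_pp_layer_range(num_layers: int, pp_rank: int, pp_size: int) -> tuple[int, int]:
    # Reject configurations where some PP ranks would get an empty
    # [start_layer, end_layer) range and be neither first nor last stage.
    assert 0 < pp_size <= num_layers, (
        f"pipeline_parallel_size ({pp_size}) must not exceed num_layers ({num_layers})"
    )
    per_stage, remainder = divmod(num_layers, pp_size)
    # Earlier stages absorb one extra layer each until the remainder is used up.
    start = pp_rank * per_stage + min(pp_rank, remainder)
    end = start + per_stage + (1 if pp_rank < remainder else 0)
    return start, end
```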
```diff
@@ -50,9 +45,6 @@
     "Wan22S2VPipeline",
```
The removal of Wan22TI2VPipeline is a breaking change for anyone using it. The PR description says it's "unused" — confirming there are no external consumers (and no references in docs or examples beyond this module) would be good.
Also, the deleted test_wan22_ti2v_pipeline.py had real test coverage (preprocess validation, timestep expansion, I2V latent preparation). Were those test cases verified to be covered elsewhere, or intentionally dropped?
Purpose

- Add a `PipelineParallelMixin` class.
- Remove the unused `Wan22TI2VPipeline` to avoid confusion.

Test Plan
Test Result
t2v_5B_single.mp4
t2v_5B_pp2.mp4
t2v_5B_pp4.mp4
t2v_14B_single.mp4
t2v_14B_pp2.mp4
t2v_14B_pp4.mp4
i2v_14B_single.mp4
i2v_14B_pp2.mp4
i2v_14B_pp4.mp4
Hybrid Parallel with TP and SP
t2v_5B_pp1_cfg1.mp4
t2v_5B_pp2_cfg1_sp2.mp4
t2v_5B_pp2_cfg1_tp2.mp4
Ascend NPU
Tested on Atlas A2 with Wan2.2-TI2V-5B-Diffusers.
t2v_5B_single.mp4
t2v_5B_pp4.mp4
t2v_5B_pp2_sp2.mp4
t2v_5B_pp2_tp2.mp4
t2v_5B_pp4_cfg2.mp4
t2v_5B_pp8.mp4