diff --git a/.github/workflows/_e2e_test.yaml b/.github/workflows/_e2e_test.yaml
index 9ef5650e24b..ef7c621acf2 100644
--- a/.github/workflows/_e2e_test.yaml
+++ b/.github/workflows/_e2e_test.yaml
@@ -27,7 +27,7 @@ on:
       continue_on_error:
         required: false
         type: boolean
-        default: false
+        default: true
       # The following inputs are used by comment-triggered E2E tests (/e2e ).
      # They carry space-separated pytest paths, categorized by runner type.
       # Leave empty (default) when running label-triggered full/light suites.
diff --git a/.github/workflows/dockerfiles/Dockerfile.lint b/.github/workflows/dockerfiles/Dockerfile.lint
index af664c1dc32..573c2530c9e 100644
--- a/.github/workflows/dockerfiles/Dockerfile.lint
+++ b/.github/workflows/dockerfiles/Dockerfile.lint
@@ -27,7 +27,7 @@ RUN apt-get update -y && \
 
 ARG VLLM_REPO=https://github.com/vllm-project/vllm.git
 # For lint purpose, actually we need make a main2main matching.
-ARG VLLM_COMMIT=d886c26d4d4fef7d079696beb4ece1cfb4b008a8
+ARG VLLM_COMMIT=856b15c62c8a574a1a0a289444d5b9a8120433e3
 RUN git init /vllm-workspace/vllm && \
     git -C /vllm-workspace/vllm fetch --depth 1 $VLLM_REPO $VLLM_COMMIT && \
     git -C /vllm-workspace/vllm checkout FETCH_HEAD
diff --git a/.github/workflows/pr_test_full.yaml b/.github/workflows/pr_test_full.yaml
index c3f15a90f13..8e8b0eaeeee 100644
--- a/.github/workflows/pr_test_full.yaml
+++ b/.github/workflows/pr_test_full.yaml
@@ -80,7 +80,7 @@ jobs:
     name: e2e-full
     strategy:
       matrix:
-        vllm_version: [d886c26d4d4fef7d079696beb4ece1cfb4b008a8, v0.19.1]
+        vllm_version: [856b15c62c8a574a1a0a289444d5b9a8120433e3, v0.19.1]
     needs: [changes]
     if: ${{ needs.changes.outputs.e2e_tracker == 'true' || needs.changes.outputs.e2e_tracker == true }}
     uses: ./.github/workflows/_e2e_test.yaml
diff --git a/.github/workflows/pr_test_light.yaml b/.github/workflows/pr_test_light.yaml
index bc51592122f..dde52b83b88 100644
--- a/.github/workflows/pr_test_light.yaml
+++ b/.github/workflows/pr_test_light.yaml
@@ -41,7 +41,7 @@ jobs:
   lint:
     uses: ./.github/workflows/_pre_commit.yml
     with:
-      vllm: d886c26d4d4fef7d079696beb4ece1cfb4b008a8
+      vllm: 856b15c62c8a574a1a0a289444d5b9a8120433e3
   changes:
     runs-on: linux-aarch64-a2b3-0
     container:
@@ -154,7 +154,7 @@ jobs:
     if: ${{ needs.lint.result == 'success' && needs.changes.outputs.has_tests == 'true' }}
     strategy:
       matrix:
-        vllm_version: [d886c26d4d4fef7d079696beb4ece1cfb4b008a8, v0.19.1]
+        vllm_version: [856b15c62c8a574a1a0a289444d5b9a8120433e3, v0.19.1]
     uses: ./.github/workflows/_optional_smart_e2e.yaml
     with:
       vllm: ${{ matrix.vllm_version }}
@@ -164,7 +164,7 @@ jobs:
     name: e2e-light
     strategy:
       matrix:
-        vllm_version: [d886c26d4d4fef7d079696beb4ece1cfb4b008a8, v0.19.1]
+        vllm_version: [856b15c62c8a574a1a0a289444d5b9a8120433e3, v0.19.1]
     # Note (yikun): If CI resource are limited we can split job into two chain jobs
     needs: [lint, changes]
     # only trigger e2e test after lint passed and the change is e2e related with pull request.
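The new vLLM pin (856b15c62c8a574a1a0a289444d5b9a8120433e3) must be bumped in lockstep across Dockerfile.lint, both PR-test workflows, and docs/source/conf.py (below). A minimal sketch of a consistency check that could catch a partial bump; the script name, file list, and bare-40-hex-hash heuristic are illustrative assumptions, not part of this PR:

# check_vllm_pin.py -- hypothetical helper, not part of this PR.
# Collects every 40-character hex hash from the files that carry the
# vLLM main-branch pin and fails if they disagree.
import re
import sys
from pathlib import Path

# Files that embed the pin in this repository (illustrative list).
PINNED_FILES = [
    ".github/workflows/dockerfiles/Dockerfile.lint",
    ".github/workflows/pr_test_full.yaml",
    ".github/workflows/pr_test_light.yaml",
    "docs/source/conf.py",
]
HASH_RE = re.compile(r"\b[0-9a-f]{40}\b")

def main() -> int:
    hashes = {
        match
        for path in PINNED_FILES
        for match in HASH_RE.findall(Path(path).read_text())
    }
    if len(hashes) > 1:
        print(f"vLLM pin mismatch across files: {sorted(hashes)}")
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main())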
diff --git a/csrc/third_party/catlass b/csrc/third_party/catlass
index b50cad688e0..716fd7baa7f 160000
--- a/csrc/third_party/catlass
+++ b/csrc/third_party/catlass
@@ -1 +1 @@
-Subproject commit b50cad688e03281c9281e421edd01369b6dd2144
+Subproject commit 716fd7baa7fb7f6cac0488bb628fd1dd0e875641
diff --git a/docs/source/conf.py b/docs/source/conf.py
index fc6a857335f..e6bb9799dcf 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -81,7 +81,7 @@
     # CANN image tag
     "cann_image_tag": "8.5.1-910b-ubuntu22.04-py3.11",
     # vLLM commit hash for main branch
-    "main_vllm_commit": "d886c26d4d4fef7d079696beb4ece1cfb4b008a8",
+    "main_vllm_commit": "856b15c62c8a574a1a0a289444d5b9a8120433e3",
     # vLLM tag for main branch
     "main_vllm_tag": "v0.19.1",
     # Python version for main branch
diff --git a/vllm_ascend/_310p/fused_moe/fused_moe.py b/vllm_ascend/_310p/fused_moe/fused_moe.py
index 0b515480d43..cd949c82a44 100644
--- a/vllm_ascend/_310p/fused_moe/fused_moe.py
+++ b/vllm_ascend/_310p/fused_moe/fused_moe.py
@@ -20,14 +20,11 @@
 from vllm.distributed import get_dp_group, get_ep_group, get_tp_group
 from vllm.model_executor.layers.fused_moe.config import FusedMoEConfig
 from vllm.model_executor.layers.fused_moe.layer import FusedMoE, UnquantizedFusedMoEMethod
-from vllm.model_executor.layers.fused_moe.shared_fused_moe import SharedFusedMoE
-
 from vllm_ascend.ascend_forward_context import _EXTRA_CTX, MoECommType
 from vllm_ascend.ops.fused_moe.experts_selector import zero_experts_compute
 from vllm_ascend.ops.fused_moe.moe_comm_method import FusedExpertsResult, _MoECommMethods
 from vllm_ascend.ops.fused_moe.moe_runtime_args import build_fused_experts_input
 from vllm_ascend.quantization.quant_type import QuantType
-from vllm_ascend.utils import vllm_version_is
 
 from .experts_selector import select_experts
 from .moe_comm_method import AllGatherCommImpl310
@@ -164,16 +161,14 @@ def __init__(self, *args, **kwargs):
 
         from vllm_ascend.ops.fused_moe.fused_moe import AscendMoERunner
 
-        is_legacy = vllm_version_is("0.19.1")
         self.runner = AscendMoERunner(
-            self if is_legacy else self.layer_name,
+            self.layer_name,
             self.moe_config,
             self.router,
-            self._routed_input_transform,
-            self.gate if is_legacy else kwargs.pop("gate", None),
-            self.shared_experts if is_legacy else kwargs.pop("shared_experts", None),
+            self.runner.routed_input_transform,
+            kwargs.pop("gate", None),
+            kwargs.pop("shared_experts", None),
             self.quant_method,
-            self.reduce_results,
             self.vllm_config.parallel_config.enable_dbo,
         )
 
@@ -263,7 +258,7 @@ def forward_impl(  # type: ignore[override]
         return routed_out
 
 
-class AscendSharedFusedMoE310(SharedFusedMoE, AscendFusedMoE310):
+class AscendSharedFusedMoE310(AscendFusedMoE310):
     def __init__(
         self,
         shared_experts: torch.nn.Module,
@@ -285,16 +280,14 @@ def __init__(
         # which at this point is still the stale runner built with shared_experts=None.
 
         from vllm_ascend.ops.fused_moe.fused_moe import AscendMoERunner
 
-        is_legacy = vllm_version_is("0.19.1")
         self.runner = AscendMoERunner(
-            self if is_legacy else self.layer_name,
+            self.layer_name,
             self.moe_config,
             self.router,
             self._routed_input_transform,
             self._gate,
             self._shared_experts,
             self.quant_method,
-            self.reduce_results,
             self.vllm_config.parallel_config.enable_dbo,
         )
diff --git a/vllm_ascend/ops/fused_moe/fused_moe.py b/vllm_ascend/ops/fused_moe/fused_moe.py
index b793bff0701..11c1fb68df0 100644
--- a/vllm_ascend/ops/fused_moe/fused_moe.py
+++ b/vllm_ascend/ops/fused_moe/fused_moe.py
@@ -29,8 +29,8 @@
 from vllm.model_executor.layers.fused_moe.config import FusedMoEConfig
 from vllm.model_executor.layers.fused_moe.layer import FusedMoE, UnquantizedFusedMoEMethod, get_compressed_expert_map
 from vllm.model_executor.layers.fused_moe.routed_experts_capturer import RoutedExpertsCapturer
-from vllm.model_executor.layers.fused_moe.runner.default_moe_runner import DefaultMoERunner  # type: ignore
-from vllm.model_executor.layers.fused_moe.shared_fused_moe import SharedFusedMoE
+from vllm.model_executor.layers.fused_moe.runner.moe_runner import MoERunner
+
 import vllm_ascend.envs as envs_ascend
 from vllm_ascend.ascend_config import get_ascend_config
 
@@ -49,7 +49,6 @@
     npu_stream_switch,
     shared_expert_dp_enabled,
     shared_experts_calculation_stream,
-    vllm_version_is,
 )
 
 
@@ -220,47 +219,17 @@ def apply(
         return final_hidden_states
 
 
-# Please remove this inheritance after extending vllm, todo(wxs)
-class AscendMoERunner(DefaultMoERunner):
-    @property
-    def use_dp_chunking(self) -> bool:
-        """Ascend uses its own forward_impl path, not the FlashInfer Cutlass
-        chunked path. Always return False to stay on forward_impl."""
-        return False
-
-    # TODO: Remove this after drop v0.19.1 support
-    def forward_impl(
-        self,
-        layer: torch.nn.Module,
-        hidden_states: torch.Tensor,
-        router_logits: torch.Tensor,
-        shared_input: torch.Tensor | None,
-    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
-        """
-        Override the default forward_impl to use Ascend-specific implementation.
-        This delegates to the layer's forward_impl method which contains the
-        Ascend-specific MoE computation logic.
-        """
-        result = layer.forward_impl(hidden_states, router_logits)
-        # If the layer has shared experts, forward_impl returns a tuple (shared_out, routed_out)
-        # Otherwise, it returns just routed_out
-        # The torch op expects the same return type based on whether it's moe_forward or moe_forward_shared
-        return result
-
-    def forward_dispatch(
+class AscendMoERunner(MoERunner):
+    def _forward_impl(
         self,
         layer: torch.nn.Module,
         hidden_states: torch.Tensor,
         router_logits: torch.Tensor,
         shared_experts_input: torch.Tensor | None,
+        input_ids: torch.Tensor | None = None,
     ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
         with self._sequence_parallel_context():
-            return self.forward_impl(
-                layer,
-                hidden_states,
-                router_logits,
-                shared_experts_input,
-            )
+            return layer.forward_impl(hidden_states, router_logits)
 
 
 class AscendFusedMoE(FusedMoE):
@@ -274,6 +243,15 @@ def __init__(self, *args, **kwargs):
         intermediate_size = kwargs["intermediate_size"]
         num_shared_experts = kwargs.get("n_shared_experts", 0)
 
+        # Extract shared_experts and gate early for attribute setup below.
+        # kwargs.pop() is called later at runner creation time; use .get() here
+        # so the values are still available for both attribute assignment and
+        # the pop-based runner construction.
+        _shared_experts_raw = kwargs.get("shared_experts")
+        _gate_raw = kwargs.get("gate")
+        _use_overlapped = kwargs.get("use_overlapped", True)
+        has_shared_experts = _shared_experts_raw is not None
+
         AscendFusedMoE.moe_counter += 1
         self.moe_instance_id = AscendFusedMoE.moe_counter
 
@@ -293,8 +271,8 @@ def __init__(self, *args, **kwargs):
         self.moe_config.mc2_group = get_mc2_group()
         self.moe_config.supports_eplb = self.quant_method.supports_eplb
         ascend_config = get_ascend_config()
-        # flashcommon3 gate stream
-        self.multistream_overlap_gate = ascend_config.multistream_overlap_gate
+        # flashcommon3 gate stream – only meaningful when shared experts exist
+        self.multistream_overlap_gate = ascend_config.multistream_overlap_gate and has_shared_experts
         if self.multistream_overlap_gate and AscendFusedMoE.gate_stream is None:
             AscendFusedMoE.gate_stream = torch.npu.Stream()
         if self.custom_routing_function is None and self.e_score_correction_bias is not None:
@@ -354,19 +332,33 @@ def __init__(self, *args, **kwargs):
         self.enable_shared_expert_dp = ascend_config.enable_shared_expert_dp
         self.enable_npugraph_ex_static_kernel = ascend_config.ascend_compilation_config.enable_static_kernel
 
+        # Shared-expert / gate attributes – used in forward_impl and by
+        # AscendSharedFusedMoE (which can still be created from old call-sites).
+        self._gate = _gate_raw
+        self.use_overlapped = _use_overlapped
+        self._shared_experts = _shared_experts_raw
+        self.shared_expert_stream = None
+        self.multistream_overlap_shared_expert = (
+            ascend_config.multistream_overlap_shared_expert and has_shared_experts
+        )
+        # Upstream FusedMoE removed reduce_results; keep False for finalize().
+        self.reduce_results = False
+
         setup_moe_comm_method(self.moe_config)
 
         self.quant_type = self._get_quant_type()
 
-        is_legacy = vllm_version_is("0.19.1")
+        # Gate passed to the runner respects use_overlapped.
+        gate_for_runner = self._gate if self.use_overlapped else None
+        _ = kwargs.pop("gate", None)
+        _ = kwargs.pop("shared_experts", None)
         self.runner = AscendMoERunner(
-            self if is_legacy else self.layer_name,
+            self.layer_name,
             self.moe_config,
             self.router,
-            self._routed_input_transform,
-            self.gate if is_legacy else kwargs.pop("gate", None),
-            self.shared_experts if is_legacy else kwargs.pop("shared_experts", None),
+            self.runner.routed_input_transform,
+            gate_for_runner,
+            self._shared_experts,
             self.quant_method,
-            self.reduce_results,
             self.vllm_config.parallel_config.enable_dbo,
         )
@@ -379,6 +371,57 @@ def _get_quant_type(self) -> QuantType:
 
         return quant_type
 
+    @property
+    def is_internal_router(self) -> bool:
+        # The Ascend MoE always expects the model to compute router logits
+        # externally (the gate in the runner is used only for multistream
+        # overlap, not for the internal-router dispatch path).
+        return False
+
+    @property
+    def gate(self) -> torch.nn.Module | None:
+        return self._gate if self.use_overlapped else None
+
+    def _shared_experts_part1(self, hidden_states: torch.Tensor):
+        shared_gate_up, _ = self._shared_experts.gate_up_proj(hidden_states)  # type: ignore
+        return shared_gate_up
+
+    def _shared_experts_part2(self, hidden_states: torch.Tensor, shared_gate_up: torch.Tensor):
+        shared_act = self._shared_experts.act_fn(shared_gate_up)  # type: ignore
+        shared_out, _ = self._shared_experts.down_proj(shared_act)  # type: ignore
+
+        # Qwen3-Next specific gating mechanism
+        if hasattr(self._shared_experts, "expert_gate") and self._shared_experts.expert_gate is not None:
+            gate_out, _ = self._shared_experts.expert_gate(hidden_states)  # type: ignore
+            shared_out = F.sigmoid(gate_out) * shared_out
+        return shared_out
+
+    def _forward_shared_experts(self, hidden_states: torch.Tensor, fused_moe_evts: FusedMoEEvents):
+        if self._shared_experts is None:
+            return None
+
+        def maybe_wait_event(evt: torch.npu.Event | None):
+            if evt is not None:
+                torch.npu.current_stream().wait_event(evt)
+
+        with npu_stream_switch(shared_experts_calculation_stream(), enabled=self.multistream_overlap_shared_expert):
+            torch.npu.current_stream().wait_event(fused_moe_evts.before_routed_experts)
+            maybe_wait_event(fused_moe_evts.before_dispatch)
+            part1_out = self._shared_experts_part1(hidden_states)
+            maybe_wait_event(fused_moe_evts.before_combine)
+            shared_out = self._shared_experts_part2(hidden_states, part1_out)
+
+        if self.multistream_overlap_shared_expert:
+            torch.npu.current_stream().wait_stream(shared_experts_calculation_stream())
+
+        moe_comm_type = _EXTRA_CTX.moe_comm_type
+        if (
+            moe_comm_type in {MoECommType.ALLTOALL, MoECommType.MC2, MoECommType.FUSED_MC2}
+            and not shared_expert_dp_enabled()
+        ):
+            shared_out = tensor_model_parallel_all_reduce(shared_out)
+        return shared_out
+
     def update_expert_map(self, new_expert_map):
         self._expert_map = new_expert_map
 
@@ -428,7 +471,17 @@ def forward_impl(  # type: ignore[override]
         # This approach may overlook some extreme scenarios.
         enable_force_load_balance = _EXTRA_CTX.in_profile_run
 
+        has_shared = self._shared_experts is not None
+
         forward_context = get_forward_context()
+        # For the direct (non-event-return) shared-expert path, set up context and
+        # events before the gate-stream block so they are available later.
+        if has_shared and not return_with_event:
+            if self.multistream_overlap_gate:
+                set_flash_common3_context(shared_experts=self._shared_experts)
+            else:
+                before_routed_experts = torch.npu.current_stream().record_event()
+
         if self.multistream_overlap_gate:
             assert AscendFusedMoE.gate_stream is not None
             fc3_context = get_flash_common3_context()
@@ -468,6 +521,10 @@ def forward_impl(  # type: ignore[override]
 
             set_flash_common3_context(topk_weights=topk_weights, topk_ids=topk_ids)
 
+        # Save original hidden_states before prepare_output may replace it,
+        # so shared experts always receive the pre-dispatch activations.
+        original_hidden_states = hidden_states
+
         prepare_output = _EXTRA_CTX.moe_comm_method.prepare(
             hidden_states=hidden_states,
             router_logits=router_logits,
@@ -541,12 +598,28 @@ def forward_impl(  # type: ignore[override]
                 before_dispatch_evt=fused_experts_results.before_dispatch_evt,
                 before_combine_evt=fused_experts_results.before_combine_evt,
             )
-        else:
-            # The vLLM FusedMoE forward_impl does not return events.
-            return routed_out
+
+        if has_shared:
+            if self.multistream_overlap_gate:
+                fc3_context = get_flash_common3_context()
+                assert fc3_context is not None
+                shared_out = fc3_context.shared_out
+            else:
+                shared_out = self._forward_shared_experts(
+                    original_hidden_states,
+                    FusedMoEEvents(
+                        before_routed_experts=before_routed_experts,
+                        before_dispatch=fused_experts_results.before_dispatch_evt,
+                        before_combine=fused_experts_results.before_combine_evt,
+                    ),
+                )
+            return shared_out, routed_out
+
+        # The vLLM FusedMoE forward_impl does not return events.
+        return routed_out
 
 
-class AscendSharedFusedMoE(SharedFusedMoE, AscendFusedMoE):
+class AscendSharedFusedMoE(AscendFusedMoE):
     def __init__(
         self,
         shared_experts: torch.nn.Module,
@@ -583,16 +656,14 @@ def __init__(
         # NOTE: must use self._shared_experts here, not self.shared_experts —
         # FusedMoE.shared_experts is a property that reads self.runner.shared_experts,
         # which at this point is still the stale runner built with shared_experts=None.
-        is_legacy = vllm_version_is("0.19.1")
         self.runner = AscendMoERunner(
-            self if is_legacy else self.layer_name,
+            self.layer_name,
             self.moe_config,
             self.router,
             self._routed_input_transform,
             self.gate,
             self._shared_experts,
             self.quant_method,
-            self.reduce_results,
             self.vllm_config.parallel_config.enable_dbo,
         )
 
diff --git a/vllm_ascend/sample/rejection_sampler.py b/vllm_ascend/sample/rejection_sampler.py
index 358c294ebe0..26c0bf395ac 100644
--- a/vllm_ascend/sample/rejection_sampler.py
+++ b/vllm_ascend/sample/rejection_sampler.py
@@ -94,6 +94,8 @@ def rejection_sample(
     # [batch_size, 1]
     bonus_token_ids: torch.Tensor,
     sampling_metadata: SamplingMetadata,
+    synthetic_mode: bool = False,
+    synthetic_conditional_rates: torch.Tensor | None = None,
 ) -> torch.Tensor:
     assert draft_token_ids.ndim == 1
     assert draft_probs is None or draft_probs.ndim == 2
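The NOTE repeated in both AscendSharedFusedMoE variants is the subtle part of this refactor: FusedMoE.shared_experts now delegates to the runner, so a subclass that attaches shared experts after the base __init__ must rebuild the runner and read self._shared_experts directly. A minimal, dependency-free sketch of that pitfall; Runner and Layer here are illustrative stand-ins, not the vLLM types:

# Sketch of the stale-runner pitfall (hypothetical classes, plain Python).
class Runner:
    def __init__(self, shared_experts):
        self.shared_experts = shared_experts

class Layer:
    def __init__(self):
        # Base-class __init__ builds a default runner with no shared experts.
        self.runner = Runner(shared_experts=None)
        self._shared_experts = None

    @property
    def shared_experts(self):
        # The property reflects whichever runner is currently installed,
        # which may be the stale one built before the subclass ran.
        return self.runner.shared_experts

layer = Layer()
layer._shared_experts = object()               # subclass attaches shared experts
assert layer.shared_experts is None            # property still reads the stale runner
layer.runner = Runner(layer._shared_experts)   # rebuild, as both __init__ overrides do
assert layer.shared_experts is layer._shared_experts

Rebuilding the runner with the real shared experts, as both AscendSharedFusedMoE.__init__ overrides in this diff do, is what brings the property back in sync.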