diff --git a/tests/unit_tests/kv_offload/offloading_connector/utils.py b/tests/unit_tests/kv_offload/offloading_connector/utils.py index 83e7e2c8f5..1f70098d78 100644 --- a/tests/unit_tests/kv_offload/offloading_connector/utils.py +++ b/tests/unit_tests/kv_offload/offloading_connector/utils.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import copy from collections.abc import Iterable, Iterator from dataclasses import dataclass from typing import Any @@ -11,31 +10,39 @@ from tests.unit_tests.kv_offload.utils import ( EOS_TOKEN_ID, - create_request_compatible_with_signature, create_model_runner_output, + create_request_compatible_with_signature, create_vllm_config, ) from vllm import SamplingParams from vllm.config import KVTransferConfig, VllmConfig, set_current_vllm_config from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorRole from vllm.distributed.kv_transfer.kv_connector.v1.offloading.common import ( - OffloadingConnectorMetadata, ) + OffloadingConnectorMetadata, + OffloadingWorkerMetadata, +) from vllm.distributed.kv_transfer.kv_connector.v1.offloading_connector import ( OffloadingConnector, ) from vllm.forward_context import ForwardContext from vllm.utils.hashing import sha256 from vllm.v1.attention.backends.flash_attn import FlashAttentionBackend from vllm.v1.core.kv_cache_utils import ( - BlockHash, get_request_block_hasher, init_none_hash, ) +from vllm.v1.core.sched.async_scheduler import AsyncScheduler from vllm.v1.core.sched.scheduler import Scheduler -from vllm.v1.kv_cache_interface import KVCacheConfig +from vllm.v1.kv_cache_interface import ( + FullAttentionSpec, + KVCacheConfig, + KVCacheGroupSpec, +) from vllm.v1.kv_offload.abstract import ( LoadStoreSpec, OffloadingManager, + OffloadKey, PrepareStoreOutput, + make_offload_key, ) from vllm.v1.kv_offload.mediums import GPULoadStoreSpec from vllm.v1.kv_offload.spec import OffloadingSpec @@ -44,26 +51,28 @@ TransferResult, TransferSpec, ) -from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, KVConnectorOutput -from vllm.v1.core.sched.async_scheduler import AsyncScheduler -from vllm.v1.kv_cache_interface import ( - FullAttentionSpec, - KVCacheGroupSpec, -) from vllm.v1.structured_output import StructuredOutputManager +def to_key(int_hash: int) -> OffloadKey: + return make_offload_key(str(int_hash).encode(), 0) + + +def to_keys(int_hashes: list[int]) -> list[OffloadKey]: + return [to_key(i) for i in int_hashes] + + class MockLoadStoreSpec(LoadStoreSpec): - def __init__(self, block_hashes: Iterable[BlockHash]): - self.block_hashes: list[BlockHash] = list(block_hashes) + def __init__(self, offload_keys: Iterable[OffloadKey]): + self.offload_keys: list[OffloadKey] = list(offload_keys) @staticmethod def medium() -> str: return "Mock" def __repr__(self) -> str: - return repr(self.block_hashes) + return repr(self.offload_keys) class MockOffloadingHandler(OffloadingHandler): @@ -111,7 +120,8 @@ def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig): self.manager = MagicMock(spec=OffloadingManager) self.manager.lookup.return_value = 0 - self.manager.prepare_load = lambda block_hashes, req_context: (MockLoadStoreSpec(block_hashes)) + self.manager.prepare_load = lambda keys, req_context: MockLoadStoreSpec(keys) + self.manager.lookup.return_value = False self.handler = MockOffloadingHandler() def get_manager(self) -> OffloadingManager: @@ -143,14 +153,17 @@ class TransferSummary: class RequestRunner: - def __init__(self, - offloaded_block_size: int, - gpu_block_size: int, - num_gpu_blocks: int, - async_scheduling: bool = False): + def __init__( + self, + offloaded_block_size: int, + gpu_block_size: int, + num_gpu_blocks: int, + async_scheduling: bool = True, + ): self.offloaded_block_size: int = offloaded_block_size self.gpu_block_size: int = gpu_block_size self.num_gpu_blocks: int = num_gpu_blocks + self.async_scheduling: bool = async_scheduling self.req_id: int = -1 @@ -184,7 +197,8 @@ def __init__(self, ) vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks self.num_kv_groups = len(kv_cache_config.kv_cache_groups) - scheduler_cls = (AsyncScheduler if vllm_config.scheduler_config.async_scheduling else Scheduler) + + scheduler_cls = AsyncScheduler if async_scheduling else Scheduler self.scheduler = scheduler_cls( vllm_config=vllm_config, kv_cache_config=kv_cache_config, @@ -247,7 +261,11 @@ def __init__(self, slot_mapping={}, ) - def new_request(self, token_ids: list[int]): + def new_request( + self, + token_ids: list[int], + kv_transfer_params: dict | None = None, + ): self.req_id += 1 sampling_params = SamplingParams(max_tokens=1000) @@ -260,9 +278,9 @@ def new_request(self, token_ids: list[int]): "pooling_params": None, "block_hasher": self._block_hasher, } - req = create_request_compatible_with_signature(**request_kwargs) - + if kv_transfer_params is not None: + req.kv_transfer_params = kv_transfer_params self.scheduler.add_request(req) def _parse_transfers(self): @@ -294,11 +312,11 @@ def _parse_transfers(self): for block_id in gpu_spec.block_ids: gpu_block_indices.append(self.gpu_block_index[block_id.item()]) - # list of (block_hash, sub_block_offset) + # list of (offload_key, sub_block_offset) offload_addresses: list[Any] = [] - for block_hash in offload_spec.block_hashes: + for offload_key in offload_spec.offload_keys: for sub_block_idx in range(block_size_factor): - offload_addresses.append((block_hash, sub_block_idx)) + offload_addresses.append((offload_key, sub_block_idx)) if store: assert len(gpu_block_indices) == len(offload_addresses) @@ -329,8 +347,15 @@ def _run(self, decoded_tokens: list[int], complete_transfers: bool): tokens_iter = iter(decoded_tokens) token_id = next(tokens_iter, None) + prev_scheduler_output = None + prev_model_runner_output = None while True: - assert self.scheduler.requests + # Strict-always-False frees the request immediately on EOS, but + # the worker may still have a deferred store queued. In production + # the next request's step drains it; in single-request tests we + # must keep stepping until the scheduler sees no in-flight jobs. + if not self.scheduler.requests and not self.connector_scheduler._jobs: + break scheduler_output = self.scheduler.schedule() self._update_gpu_block_idx() @@ -351,6 +376,7 @@ def _run(self, decoded_tokens: list[int], complete_transfers: bool): self.offloading_spec.complete_transfers() finished_sending, finished_recving = self.worker_connector.get_finished(scheduler_output.finished_req_ids) + worker_meta = self.worker_connector.build_connector_worker_meta() or OffloadingWorkerMetadata() self.worker_connector.clear_connector_metadata() @@ -359,40 +385,38 @@ def _run(self, decoded_tokens: list[int], complete_transfers: bool): finished_sending=finished_sending, finished_recving=finished_recving, token_id=token_id or 0, + kv_connector_worker_meta=worker_meta, ) prev_token_id = token_id if self.scheduler.running: token_id = next(tokens_iter, None) - self.scheduler.update_from_output(scheduler_output, model_runner_output) + if self.async_scheduling: + # in async scheduling we update the output of the previous step + if prev_model_runner_output is not None: + self.scheduler.update_from_output(prev_scheduler_output, prev_model_runner_output) + prev_scheduler_output = scheduler_output + prev_model_runner_output = model_runner_output + else: + self.scheduler.update_from_output(scheduler_output, model_runner_output) - if (prev_token_id is EOS_TOKEN_ID and prev_token_id != token_id and self.scheduler.requests): + if (prev_token_id == EOS_TOKEN_ID and prev_token_id != token_id + and (self.scheduler.requests or self.connector_scheduler._jobs)): # continue for one more step to allow offloading to kick off continue if token_id is None: + if self.async_scheduling: + # sample last token + self.scheduler.update_from_output(prev_scheduler_output, prev_model_runner_output) break self._parse_transfers() - # run one more step to update finished stored if EOS_TOKEN_ID in decoded_tokens: assert not self.scheduler.running - while self.scheduler.requests: - scheduler_output = self.scheduler.schedule() - - finished_sending, finished_recving = self.worker_connector.get_finished( - scheduler_output.finished_req_ids) - - assert not finished_recving - - model_runner_output = copy.deepcopy(EMPTY_MODEL_RUNNER_OUTPUT) - model_runner_output.kv_connector_output = KVConnectorOutput(finished_sending=finished_sending) - - self.scheduler.update_from_output(scheduler_output, model_runner_output) - def run( self, decoded_tokens: list[int], @@ -445,7 +469,7 @@ def run( def request_runner(): runners = [] - def runner_factory(offloaded_block_size, gpu_block_size, num_gpu_blocks, async_scheduling=False): + def runner_factory(offloaded_block_size, gpu_block_size, num_gpu_blocks, async_scheduling): runner = RequestRunner( offloaded_block_size=offloaded_block_size, gpu_block_size=gpu_block_size, @@ -458,10 +482,10 @@ def runner_factory(offloaded_block_size, gpu_block_size, num_gpu_blocks, async_s yield runner_factory # pass factory to the test -def generate_store_output(block_hashes: Iterable[BlockHash]): - block_hashes = list(block_hashes) +def generate_store_output(keys: Iterable[OffloadKey]): + keys = list(keys) return PrepareStoreOutput( - keys_to_store=list(block_hashes), - store_spec=MockLoadStoreSpec(block_hashes), + keys_to_store=list(keys), + store_spec=MockLoadStoreSpec(keys), evicted_keys=[], ) diff --git a/tests/unit_tests/kv_offload/utils.py b/tests/unit_tests/kv_offload/utils.py index b88d629145..1d8575972d 100644 --- a/tests/unit_tests/kv_offload/utils.py +++ b/tests/unit_tests/kv_offload/utils.py @@ -25,6 +25,7 @@ KVConnectorBase_V1, KVConnectorMetadata, KVConnectorRole, + KVConnectorWorkerMetadata, ) from vllm.distributed.kv_transfer.kv_connector.v1.example_connector import ( # noqa ExampleConnector, ) @@ -64,9 +65,9 @@ def assert_scheduler_empty(scheduler: Scheduler): assert len(scheduler.encoder_cache_manager.cached) == 0 # KVCache Manager. - assert (len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].req_to_blocks) == 0) - assert (len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].num_cached_block) == 0) - num_free_blocks = (scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks) + assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].req_to_blocks) == 0 + assert len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].num_cached_block) == 0 + num_free_blocks = scheduler.kv_cache_manager.block_pool.free_block_queue.num_free_blocks assert num_free_blocks == (scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1) # NOTE(rob): just the ref count on blocks will be 0. The hash @@ -232,6 +233,7 @@ def create_model_runner_output( invalid_block_ids: set[int] | None = None, use_eos: bool = False, token_id: int = 0, + kv_connector_worker_meta: KVConnectorWorkerMetadata | None = None, ) -> ModelRunnerOutput: """Make dummy model runner output for testing.""" @@ -243,12 +245,13 @@ def create_model_runner_output( sampled_token = EOS_TOKEN_ID if use_eos else token_id sampled_token_ids = [[sampled_token] for _ in req_ids] - kv_connector_output = (None if (finished_sending is None and finished_recving is None and invalid_block_ids is None) - else KVConnectorOutput( - finished_sending=finished_sending, - finished_recving=finished_recving, - invalid_block_ids=invalid_block_ids or set(), - )) + kv_connector_output = (None if (finished_sending is None and finished_recving is None and invalid_block_ids is None + and kv_connector_worker_meta is None) else KVConnectorOutput( + finished_sending=finished_sending, + finished_recving=finished_recving, + invalid_block_ids=invalid_block_ids or set(), + kv_connector_worker_meta=kv_connector_worker_meta, + )) # Make output data structure. return ModelRunnerOutput( @@ -269,7 +272,7 @@ def __init__(self, config: VllmConfig, role, kv_cache_config): self._connector = ExampleConnector(config, role) self.call_record: dict[str, int] = defaultdict(int) # Use a unique temp file per connector - self._event_file = (tempfile.gettempdir() + f"/connector_{self.name}-{self.role.name}_events.log") + self._event_file = tempfile.gettempdir() + f"/connector_{self.name}-{self.role.name}_events.log" # Start with an empty file with open(self._event_file, "w") as _: pass diff --git a/tests/unit_tests/ops/test_hpu_compressed_tensors.py b/tests/unit_tests/ops/test_hpu_compressed_tensors.py index d6d71584f9..e72b990b07 100644 --- a/tests/unit_tests/ops/test_hpu_compressed_tensors.py +++ b/tests/unit_tests/ops/test_hpu_compressed_tensors.py @@ -11,7 +11,7 @@ HPUCompressedTensorsW8A8Int8_BF16Fallback, HPUCompressedTensorsW8A8Fp8MoEMethod) from vllm_gaudi.utils import HPUCompileConfig -from vllm.forward_context import override_forward_context +from vllm.forward_context import ForwardContext, override_forward_context from safetensors import safe_open @@ -387,10 +387,13 @@ def test_compressed_tensors_wna16_moe_method(default_vllm_config: None, dist_ini ref_output = f.get_tensor("ref_output") # Execute layer - mock_ctx = MagicMock(spec=["dp_metadata"]) - mock_ctx.dp_metadata = None - with override_forward_context(mock_ctx): - out = oot_op.runner._forward_dispatch(oot_op, hidden_states, router_logits, hidden_states) + ctx = ForwardContext( + no_compile_layers={oot_op.runner.layer_name: oot_op}, + attn_metadata={}, + slot_mapping={}, + ) + with override_forward_context(ctx): + out = oot_op.runner.forward(hidden_states, router_logits) # Check correctness torch.testing.assert_close(ref_output, out, atol=1e-4, rtol=1e-4) diff --git a/tests/unit_tests/ops/test_hpu_fused_moe.py b/tests/unit_tests/ops/test_hpu_fused_moe.py index 6aab174f02..da804f9d9f 100644 --- a/tests/unit_tests/ops/test_hpu_fused_moe.py +++ b/tests/unit_tests/ops/test_hpu_fused_moe.py @@ -4,10 +4,9 @@ import torch import habana_frameworks.torch as htorch from utils import get_data_path, create_fused_moe -from unittest.mock import MagicMock from vllm_gaudi.ops.hpu_fused_moe import HPUUnquantizedFusedMoEMethod from vllm_gaudi.utils import HPUCompileConfig -from vllm.forward_context import override_forward_context +from vllm.forward_context import ForwardContext, override_forward_context from safetensors import safe_open @@ -38,10 +37,13 @@ def test_unquantized_fused_moe_method(default_vllm_config: None, dist_init): ref_output = f.get_tensor("ref_output") # Execute layer - mock_ctx = MagicMock(spec=["dp_metadata"]) - mock_ctx.dp_metadata = None - with override_forward_context(mock_ctx): - out = oot_op.runner._forward_dispatch(oot_op, hidden_states, router_logits, hidden_states) + ctx = ForwardContext( + no_compile_layers={oot_op.runner.layer_name: oot_op}, + attn_metadata={}, + slot_mapping={}, + ) + with override_forward_context(ctx): + out = oot_op.runner.forward(hidden_states, router_logits) # Check correctness torch.testing.assert_close(ref_output, out, atol=1e-4, rtol=1e-4) diff --git a/tests/unit_tests/ops/utils.py b/tests/unit_tests/ops/utils.py index 3d252c64aa..bc19aa05c8 100644 --- a/tests/unit_tests/ops/utils.py +++ b/tests/unit_tests/ops/utils.py @@ -70,7 +70,9 @@ def create_fused_moe(quant_config=None): enable_eplb=False, num_redundant_experts=0, has_bias=False, - is_sequence_parallel=False) + is_sequence_parallel=False, + zero_expert_type=None, + hash_indices_table=None) def get_data_path(filename): diff --git a/vllm_gaudi/lora/punica_wrapper/punica_hpu.py b/vllm_gaudi/lora/punica_wrapper/punica_hpu.py index a3c3a38cc5..ca031a3b42 100644 --- a/vllm_gaudi/lora/punica_wrapper/punica_hpu.py +++ b/vllm_gaudi/lora/punica_wrapper/punica_hpu.py @@ -4,7 +4,7 @@ from typing import Optional, Union, final import torch -from vllm_gaudi.extension.ops import (dispatch_bgmv_embedding, dispatch_bgmv_linear) +from vllm_gaudi.extension.ops import LoraMask, dispatch_bgmv_embedding, dispatch_bgmv_linear from vllm.lora.punica_wrapper.punica_base import PunicaWrapperBase @@ -26,33 +26,43 @@ def add_lora_embedding(self, **kwargs) -> None: dispatch_bgmv_embedding(y, x, lora_b_stacked, 0) - def add_lora_linear(self, - y: torch.Tensor, - x: torch.Tensor, - lora_a_stacked: tuple[torch.Tensor, ...], - lora_b_stacked: tuple[torch.Tensor, ...], - scale: float, - output_slices: tuple[int, ...], - *, - buffer: Optional[tuple[torch.Tensor, ...]] = None, - **kwargs) -> None: + def add_lora_linear( + self, + y: torch.Tensor, + x: torch.Tensor, + lora_a_stacked: tuple[torch.Tensor, ...], + lora_b_stacked: tuple[torch.Tensor, ...], + scale: float, + output_slices: tuple[int, ...], + *, + buffer: Optional[tuple[torch.Tensor, ...]] = None, + **kwargs, + ) -> None: x = x.view(-1, x.shape[-1]) offset_left = 0 for slice_idx in range(len(output_slices)): - dispatch_bgmv_linear(y[:, offset_left:offset_left + output_slices[slice_idx]], x, lora_a_stacked[slice_idx], - lora_b_stacked[slice_idx], 0, scale) + dispatch_bgmv_linear( + y[:, offset_left:offset_left + output_slices[slice_idx]], + x, + lora_a_stacked[slice_idx], + lora_b_stacked[slice_idx], + 0, + scale, + ) offset_left += output_slices[slice_idx] - def add_lora_logits(self, - y: torch.Tensor, - x: torch.Tensor, - lora_a_stacked: torch.Tensor, - lora_b_stacked: torch.Tensor, - scale, - *, - buffer: Optional[torch.Tensor] = None, - **kwargs) -> None: + def add_lora_logits( + self, + y: torch.Tensor, + x: torch.Tensor, + lora_a_stacked: torch.Tensor, + lora_b_stacked: torch.Tensor, + scale, + *, + buffer: Optional[torch.Tensor] = None, + **kwargs, + ) -> None: y_org = y y = y.view(-1, y.shape[-1]) x = x.view(-1, x.shape[-1]) @@ -67,7 +77,18 @@ def add_shrink( scale: float, **kwargs, ) -> None: - raise NotImplementedError + x = x.view(-1, x.shape[-1]) + mask = LoraMask.getLoraMask() + for slice_idx in range(len(lora_a_stacked)): + wa = lora_a_stacked[slice_idx][:, 0, :, :] + num_loras = wa.shape[0] + lora_rank = wa.shape[1] + wa = wa.reshape(num_loras * lora_rank, wa.shape[2]).transpose(0, 1) + wa = wa.to(x.dtype) + out = x @ wa + out = out * mask + out = out.reshape(out.shape[0], num_loras, lora_rank).sum(dim=1) + y[slice_idx] += out * scale def add_expand( self, @@ -79,4 +100,22 @@ def add_expand( add_inputs=True, **kwargs, ) -> None: - raise NotImplementedError + y = y.view(-1, y.shape[-1]) + mask = LoraMask.getLoraMask() + offset_left = offset_start + for slice_idx in range(len(lora_b_stacked)): + wb = lora_b_stacked[slice_idx][:, 0, :, :] + num_loras = wb.shape[0] + lora_rank = wb.shape[2] + wb = wb.transpose(1, 2).reshape(num_loras * lora_rank, wb.shape[1]) + x_i = x[slice_idx] + wb = wb.to(x_i.dtype) + x_expanded = x_i.repeat(1, num_loras) + x_expanded = x_expanded * mask + out = x_expanded @ wb + out = out.to(y.dtype) + if add_inputs: + y[:, offset_left:offset_left + output_slices[slice_idx]] += out + else: + y[:, offset_left:offset_left + output_slices[slice_idx]] = out + offset_left += output_slices[slice_idx] diff --git a/vllm_gaudi/ops/hpu_compressed_tensors.py b/vllm_gaudi/ops/hpu_compressed_tensors.py index aed2e179e8..af7916020c 100644 --- a/vllm_gaudi/ops/hpu_compressed_tensors.py +++ b/vllm_gaudi/ops/hpu_compressed_tensors.py @@ -938,6 +938,7 @@ def apply_monolithic( layer: FusedMoE, x: torch.Tensor, router_logits: torch.Tensor, + **kwargs, ) -> Union[torch.Tensor, tuple[torch.Tensor, torch.Tensor]]: input_shape = x.shape x = x.view(-1, x.shape[-1]) diff --git a/vllm_gaudi/ops/hpu_fused_moe.py b/vllm_gaudi/ops/hpu_fused_moe.py index 491a3a95d2..17d242f113 100644 --- a/vllm_gaudi/ops/hpu_fused_moe.py +++ b/vllm_gaudi/ops/hpu_fused_moe.py @@ -4,14 +4,14 @@ import os from typing import Union -from vllm.model_executor.layers.fused_moe.runner.moe_runner_base import ( - MoERunnerBase, ) +from vllm.model_executor.layers.fused_moe.runner.moe_runner import ( + MoERunner as MoERunnerBase, ) import torch import vllm import vllm.envs as envs from vllm.config import get_current_vllm_config from vllm.distributed.eplb.eplb_state import EplbLayerState -from vllm.model_executor.layers.fused_moe.layer import (FusedMoE, UnquantizedFusedMoEMethod) +from vllm.model_executor.layers.fused_moe.layer import FusedMoE, UnquantizedFusedMoEMethod from vllm.model_executor.layers.fused_moe.router.custom_routing_router import ( CustomRoutingRouter, ) from vllm.model_executor.layers.fused_moe.router.fused_topk_bias_router import ( @@ -22,14 +22,13 @@ FusedTopKRouter, ) from vllm.model_executor.layers.fused_moe.router.grouped_topk_router import ( GroupedTopKRouter, ) - from vllm.model_executor.layers.fused_moe.router.router_factory import ( EMPTY_EPLB_STATE, ) from vllm.model_executor.layers.fused_moe.router.routing_simulator_router import ( RoutingSimulatorRouter, ) from vllm.model_executor.layers.fused_moe.router.zero_expert_router import ( ZeroExpertRouter, ) -from vllm_gaudi.extension.ops import (VllmMixtureOfExpertsOp) +from vllm_gaudi.extension.ops import VllmMixtureOfExpertsOp from vllm_gaudi.extension.runtime import get_config from vllm.model_executor.utils import set_weight_attrs from vllm_gaudi.utils import has_quant_config @@ -51,11 +50,11 @@ def __init__(self, *args, **kwargs): vllm_config = get_current_vllm_config() self.model_type = None self.is_mxfp4 = False - if vllm_config is not None and vllm_config.model_config is not None \ - and vllm_config.model_config.hf_config is not None: + if (vllm_config is not None and vllm_config.model_config is not None + and vllm_config.model_config.hf_config is not None): self.model_type = vllm_config.model_config.hf_config.model_type - if hasattr(vllm_config.model_config.hf_config, "quantization_config") and \ - vllm_config.model_config.hf_config.quantization_config is not None: + if (hasattr(vllm_config.model_config.hf_config, "quantization_config") + and vllm_config.model_config.hf_config.quantization_config is not None): self.is_mxfp4 = vllm_config.model_config.hf_config.quantization_config.get("quant_method") == "mxfp4" def _select_monolithic(self) -> Callable: @@ -71,7 +70,7 @@ def process_weights_after_loading(self, layer: torch.nn.Module) -> None: # custom handling for HPU num_experts = layer.local_num_experts ep_shift = layer.ep_rank * num_experts - has_bias = hasattr(layer, 'w13_bias') and hasattr(layer, 'w2_bias') + has_bias = hasattr(layer, "w13_bias") and hasattr(layer, "w2_bias") experts_min, experts_max = ep_shift, num_experts + ep_shift - 1 @@ -82,7 +81,7 @@ def process_weights_after_loading(self, layer: torch.nn.Module) -> None: bias = has_bias if has_bias is True else None - is_bf16 = getattr(layer, 'w13_weight', None) is not None and layer.w13_weight.dtype == torch.bfloat16 + is_bf16 = getattr(layer, "w13_weight", None) is not None and layer.w13_weight.dtype == torch.bfloat16 model_config = None if getattr(layer, "vllm_config", None) is not None: @@ -107,33 +106,43 @@ def process_weights_after_loading(self, layer: torch.nn.Module) -> None: if cache_weight_lists and hasattr(layer.moe_op, "_cache_weight_lists"): layer.moe_op._cache_weight_lists() - def create_weights(self, layer: torch.nn.Module, num_experts: int, hidden_size: int, - intermediate_size_per_partition: int, params_dtype: torch.dtype, **extra_weight_attrs): + def create_weights( + self, + layer: torch.nn.Module, + num_experts: int, + hidden_size: int, + intermediate_size_per_partition: int, + params_dtype: torch.dtype, + **extra_weight_attrs, + ): if self.model_type in ["gpt_oss"] and self.is_mxfp4: from vllm.utils.math_utils import round_up + # Fused gate_up_proj (column parallel) - w13_weight = torch.nn.Parameter(torch.zeros(num_experts, - 2 * round_up(intermediate_size_per_partition, 32), - hidden_size, - dtype=params_dtype), - requires_grad=False) + w13_weight = torch.nn.Parameter( + torch.zeros(num_experts, + 2 * round_up(intermediate_size_per_partition, 32), + hidden_size, + dtype=params_dtype), + requires_grad=False, + ) layer.register_parameter("w13_weight", w13_weight) set_weight_attrs(w13_weight, extra_weight_attrs) - w13_bias = torch.nn.Parameter(torch.zeros(num_experts, - 2 * round_up(intermediate_size_per_partition, 32), - dtype=params_dtype), - requires_grad=False) + w13_bias = torch.nn.Parameter( + torch.zeros(num_experts, 2 * round_up(intermediate_size_per_partition, 32), dtype=params_dtype), + requires_grad=False, + ) layer.register_parameter("w13_bias", w13_bias) set_weight_attrs(w13_bias, extra_weight_attrs) # down_proj (row parallel) - w2_weight = torch.nn.Parameter(torch.zeros(num_experts, - hidden_size, - round_up(intermediate_size_per_partition, 32), - dtype=params_dtype), - requires_grad=False) + w2_weight = torch.nn.Parameter( + torch.zeros(num_experts, hidden_size, round_up(intermediate_size_per_partition, 32), + dtype=params_dtype), + requires_grad=False, + ) layer.register_parameter("w2_weight", w2_weight) set_weight_attrs(w2_weight, extra_weight_attrs) @@ -157,6 +166,7 @@ def apply_monolithic( topk_weights, topk_ids = layer.router.select_experts(hidden_states=x, router_logits=router_logits) else: import torch.nn.functional as F + if self.model_type == "gpt_oss": topk_weights, topk_ids = torch.topk(router_logits, layer.top_k, dim=-1) topk_weights = F.softmax(topk_weights, dim=-1, dtype=torch.float32) @@ -210,6 +220,7 @@ def forward_oot( topk_weights, topk_ids = layer.router.select_experts(hidden_states=x, router_logits=router_logits) else: import torch.nn.functional as F + if self.model_type is not None and self.model_type in ["gpt_oss"]: topk_weights, topk_ids = torch.topk(router_logits, layer.top_k, dim=-1) topk_weights = F.softmax(topk_weights, dim=-1, dtype=torch.float32) @@ -264,6 +275,7 @@ def patched_fused_moe_forward( self, hidden_states: torch.Tensor, router_logits: torch.Tensor, + input_ids: torch.Tensor | None = None, ) -> Union[torch.Tensor, tuple[torch.Tensor, torch.Tensor]]: """Patched forward that avoids graph breaks from ForwardContext lookups and dynamo per-layer string guards. @@ -277,8 +289,8 @@ def patched_fused_moe_forward( emits per-layer string guards that trigger recompilation. The post-forward reduction sequence mirrors upstream - MoERunnerBase.forward (vllm/model_executor/layers/fused_moe/runner/ - moe_runner_base.py) so we stay in sync with the new shared/fused + MoERunner.forward (vllm/model_executor/layers/fused_moe/runner/ + moe_runner.py) so we stay in sync with the new shared/fused output combination logic introduced by upstream PR #35949. """ hidden_states, shared_experts_input = self.apply_routed_input_transform(hidden_states) @@ -295,11 +307,12 @@ def patched_fused_moe_forward( if self.gate is not None: router_logits, _ = self.gate(hidden_states) - result = self._forward_impl(self._hpu_layer_ref, hidden_states, router_logits, shared_experts_input) + result = self._forward_impl(self._hpu_layer_ref, hidden_states, router_logits, shared_experts_input, input_ids) else: - result = self._forward_entry(hidden_states, router_logits, shared_experts_input, self._encode_layer_name()) + result = self._forward_entry(hidden_states, router_logits, shared_experts_input, input_ids, + self._encode_layer_name()) - # Mirror upstream MoERunnerBase.forward post-_forward_entry pipeline. + # Mirror upstream MoERunner.forward post-_forward_entry pipeline. if isinstance(result, tuple): shared_output, fused_output = result else: @@ -329,7 +342,7 @@ def get_compressed_expert_map(expert_map: torch.Tensor) -> str: experts that are not assigned to the current rank. Returns: - str: A string mapping from local to global index, + str: A string mapping from local to global index, ordered by global index. (e.g., "0->5, 1->12, 2->23") """ @@ -369,6 +382,7 @@ def create_fused_moe_router( # zero expert parameters zero_expert_type: str | None = None, num_logical_experts: int | None = None, + hash_indices_table: torch.Tensor | None = None, ) -> FusedMoERouter: """ Factory function to create the appropriate FusedMoERouter subclass based on @@ -412,6 +426,10 @@ def create_fused_moe_router( num_logical_experts: Number of real (non-zero) experts. Required when zero_expert_type is not None. + Hash Indices Table: + hash_indices_table: Used to map input_ids to experts, needed for + Deepseek V4 + Returns: An instance of the appropriate FusedMoERouter subclass """ @@ -427,8 +445,8 @@ def create_fused_moe_router( ) if zero_expert_type is not None: - assert num_logical_experts is not None, ("num_logical_experts is required when zero_expert_type is set") - assert e_score_correction_bias is not None, ("e_score_correction_bias is required when zero_expert_type is set") + assert num_logical_experts is not None, "num_logical_experts is required when zero_expert_type is set" + assert e_score_correction_bias is not None, "e_score_correction_bias is required when zero_expert_type is set" return ZeroExpertRouter( top_k=top_k, global_num_experts=global_num_experts, @@ -446,8 +464,7 @@ def create_fused_moe_router( if use_grouped_topk: assert custom_routing_function is None if num_expert_group is None or topk_group is None: - raise ValueError("num_expert_group and topk_group must be provided when " - "use_grouped_topk is True") + raise ValueError("num_expert_group and topk_group must be provided when use_grouped_topk is True") grouped_topk_router = GroupedTopKRouter( top_k=top_k, global_num_experts=global_num_experts, @@ -475,7 +492,9 @@ def create_fused_moe_router( indices_type_getter=indices_type_getter, ) - if e_score_correction_bias is not None: + assert scoring_func in ["sigmoid", "softmax", "sqrtsoftplus"] + + if e_score_correction_bias is not None or hash_indices_table is not None: return FusedTopKBiasRouter( top_k=top_k, global_num_experts=global_num_experts, @@ -486,6 +505,7 @@ def create_fused_moe_router( routed_scaling_factor=routed_scaling_factor, enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, + hash_indices_table=hash_indices_table, ) return FusedTopKRouter( diff --git a/vllm_gaudi/ops/hpu_lora.py b/vllm_gaudi/ops/hpu_lora.py index e32459023e..76b47e34ab 100644 --- a/vllm_gaudi/ops/hpu_lora.py +++ b/vllm_gaudi/ops/hpu_lora.py @@ -54,7 +54,8 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: # Patch _all_lora_classes so from_layer() creates HPU-specific instances. # The module-level patching above is not sufficient because vllm.lora.utils # captures class references at import time via `from ... import`. +import vllm.lora.utils # noqa: E402 from vllm.lora.utils import _all_lora_classes # noqa: E402 -_all_lora_classes.discard(VocabParallelEmbeddingWithLoRA) -_all_lora_classes.add(HPUVocabParallelEmbeddingWithLoRA) +vllm.lora.utils._all_lora_classes = tuple( + HPUVocabParallelEmbeddingWithLoRA if cls is VocabParallelEmbeddingWithLoRA else cls for cls in _all_lora_classes) diff --git a/vllm_gaudi/v1/sample/hpu_rejection_sampler.py b/vllm_gaudi/v1/sample/hpu_rejection_sampler.py index 8e0ba917df..7dcaa2395a 100644 --- a/vllm_gaudi/v1/sample/hpu_rejection_sampler.py +++ b/vllm_gaudi/v1/sample/hpu_rejection_sampler.py @@ -139,6 +139,8 @@ def rejection_sample( # [batch_size, 1] bonus_token_ids: torch.Tensor, sampling_metadata: SamplingMetadata, + synthetic_mode: bool = False, + synthetic_conditional_rates: Optional[torch.Tensor] = None, ) -> torch.Tensor: assert sampling_metadata.all_greedy, "Only greedy sampling is supported." diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index 3574122b01..41f0eff499 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -1988,12 +1988,13 @@ def _get_prompts_and_decodes( requests = scheduler_output.kv_connector_metadata.reqs_to_save | \ scheduler_output.kv_connector_metadata.reqs_to_recv elif isinstance(scheduler_output.kv_connector_metadata, OffloadingConnectorMetadata): - for req in scheduler_output.kv_connector_metadata.reqs_to_store: - requests_type[req] = 'prefill' - for req in scheduler_output.kv_connector_metadata.reqs_to_load: - requests_type[req] = 'decode' - requests = scheduler_output.kv_connector_metadata.reqs_to_store | \ - scheduler_output.kv_connector_metadata.reqs_to_load + store_reqs = {job.req_id for job in scheduler_output.kv_connector_metadata.store_jobs.values()} + load_reqs = {job.req_id for job in scheduler_output.kv_connector_metadata.load_jobs.values()} + for req in store_reqs: + requests_type[req] = "prefill" + for req in load_reqs: + requests_type[req] = "decode" + requests = store_reqs | load_reqs elif isinstance(scheduler_output.kv_connector_metadata, MultiKVConnectorMetadata): for i, metadata in enumerate(scheduler_output.kv_connector_metadata.metadata): if isinstance(metadata, NixlConnectorMetadata) and (metadata.reqs_to_save or metadata.reqs_to_recv): @@ -2002,13 +2003,15 @@ def _get_prompts_and_decodes( for req in metadata.reqs_to_recv: requests_type[req] = 'decode' requests = metadata.reqs_to_save | metadata.reqs_to_recv - elif isinstance(metadata, OffloadingConnectorMetadata) and (metadata.reqs_to_store - or metadata.reqs_to_load): - for req in metadata.reqs_to_store: - requests_type[req] = 'prefill' - for req in metadata.reqs_to_load: - requests_type[req] = 'decode' - requests = metadata.reqs_to_store | metadata.reqs_to_load + elif isinstance(metadata, OffloadingConnectorMetadata) and (metadata.store_jobs + or metadata.load_jobs): + store_reqs = {job.req_id for job in metadata.store_jobs.values()} + load_reqs = {job.req_id for job in metadata.load_jobs.values()} + for req in store_reqs: + requests_type[req] = "prefill" + for req in load_reqs: + requests_type[req] = "decode" + requests = store_reqs | load_reqs else: requests = scheduler_output.kv_connector_metadata.requests # Traverse decodes first @@ -4659,9 +4662,14 @@ def _sync_moe_kernel_flags(module: torch.nn.Module): # by _gate; setting _gate=None makes it return False. experts._gate = None else: - # INC wrappers (e.g. PatchedMixtralMoE) don't inherit - # the property — set a plain attribute instead. - experts.is_internal_router = False + # INC wrappers (e.g. PatchedMixtralMoE) may inherit + # is_internal_router as a read-only @property; + # runner.gate = None below handles that case. + if not isinstance( + getattr(type(experts), "is_internal_router", None), + property, + ): + experts.is_internal_router = False runner = getattr(experts, "runner", None) if runner is not None and hasattr(runner, "gate"): runner.gate = None