diff --git a/tests/v1/core/test_kv_cache_utils.py b/tests/v1/core/test_kv_cache_utils.py index 8153fed699fe..a1895af0c415 100644 --- a/tests/v1/core/test_kv_cache_utils.py +++ b/tests/v1/core/test_kv_cache_utils.py @@ -9,7 +9,7 @@ import torch import vllm.v1.core.kv_cache_utils as kv_cache_utils -from vllm.config import ModelConfig, SchedulerConfig, VllmConfig +from vllm.config import CacheConfig, ModelConfig, SchedulerConfig, VllmConfig from vllm.lora.request import LoRARequest from vllm.multimodal.inputs import ( MultiModalFeatureSpec, @@ -18,7 +18,9 @@ ) from vllm.sampling_params import SamplingParams from vllm.utils.hashing import sha256, sha256_cbor +from vllm.utils.math_utils import cdiv from vllm.utils.mem_constants import GiB_bytes +from vllm.v1.core.block_pool import BlockPool from vllm.v1.core.kv_cache_manager import KVCacheManager from vllm.v1.core.kv_cache_utils import ( BlockHash, @@ -2135,3 +2137,1440 @@ def test_unify_hybrid_kv_cache_specs(): with pytest.raises(ValueError): kv_cache_utils.unify_hybrid_kv_cache_specs(kv_cache_spec) + + +def _make_qwen35_specs( + kv_dtype: torch.dtype = torch.bfloat16, + mamba_dtype: torch.dtype = torch.bfloat16, + block_size: int = 16, + mamba_cache_mode: str = "none", + num_speculative_blocks: int = 0, +): + """Build KV cache specs matching real Qwen3.5 architecture. + + Both Qwen3.5-4B and 9B share identical KV cache dimensions: + - Attention: 4 KV heads, 256 head_dim + - GatedDeltaNet: conv(3, 8192) + temporal(32, 128, 128) + - 32 layers: 24 GatedDeltaNet + 8 full attention (3:1 ratio) + The models differ only in hidden_size (2560 vs 4096) which does not + affect KV cache or recurrent state sizes. + """ + attention_spec = FullAttentionSpec( + block_size=block_size, + num_kv_heads=4, + head_size=256, + dtype=kv_dtype, + ) + mamba_spec = MambaSpec( + block_size=block_size, + shapes=((3, 8192), (32, 128, 128)), + dtypes=(mamba_dtype, mamba_dtype), + mamba_cache_mode=mamba_cache_mode, + num_speculative_blocks=num_speculative_blocks, + ) + # Qwen3.5 layer pattern: every 4th layer is full attention + kv_cache_specs: dict[str, KVCacheSpec] = {} + for i in range(32): + if (i + 1) % 4 == 0: + kv_cache_specs[f"layer_{i}"] = attention_spec + else: + kv_cache_specs[f"layer_{i}"] = mamba_spec + return kv_cache_specs, attention_spec, mamba_spec + + +# --------------------------------------------------------------------------- +# Qwen3.5 hybrid Mamba+attention tests +# --------------------------------------------------------------------------- + + +def test_has_mixed_mamba_attention(): + """_has_mixed_mamba_attention returns True only for mixed groups.""" + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + + # Pure attention -> False + assert not kv_cache_utils._has_mixed_mamba_attention( + [KVCacheGroupSpec([f"layer_{i}" for i in range(8)], attn_spec)] + ) + # Pure Mamba -> False + assert not kv_cache_utils._has_mixed_mamba_attention( + [KVCacheGroupSpec([f"layer_{i}" for i in range(24)], mamba_spec)] + ) + # Mixed (Qwen3.5 layout) -> True + assert kv_cache_utils._has_mixed_mamba_attention( + [ + KVCacheGroupSpec([f"layer_{i}" for i in range(24)], mamba_spec), + KVCacheGroupSpec([f"layer_{i}" for i in range(24, 32)], attn_spec), + ] + ) + + +@pytest.mark.parametrize( + "kv_dtype, mamba_dtype, model_tag", + [ + (torch.bfloat16, torch.bfloat16, "Qwen3.5-4B/9B bf16"), + (torch.float16, torch.float16, "Qwen3.5-4B/9B fp16"), + (torch.float8_e4m3fn, torch.bfloat16, "Qwen3.5-4B/9B fp8-kv"), + ], + ids=["bf16", "fp16", "fp8-kv"], +) +def 
test_qwen35_allocation_per_layer_tensors(kv_dtype, mamba_dtype, model_tag): + """Verify per-layer tensor allocation for real Qwen3.5 specs. + + Each of the 32 layers should get its own tensor at its natural page size. + Attention and GatedDeltaNet tensors must have different sizes. + Total allocation must be efficient (>90% of available memory used). + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs( + kv_dtype=kv_dtype, mamba_dtype=mamba_dtype + ) + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + + # Give enough memory for ~10 blocks + total_page_per_block = 8 * attn_page + 24 * mamba_page + available_memory = total_page_per_block * 10 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # 32 tensors, one per layer + assert len(kv_cache_config.kv_cache_tensors) == 32, ( + f"{model_tag}: expected 32 per-layer tensors, " + f"got {len(kv_cache_config.kv_cache_tensors)}" + ) + + # Each tensor serves exactly one layer + for t in kv_cache_config.kv_cache_tensors: + assert len(t.shared_by) == 1 + + # Separate attention vs Mamba tensors + attn_tensors = [ + t + for t in kv_cache_config.kv_cache_tensors + if kv_cache_specs[t.shared_by[0]] is attn_spec + ] + mamba_tensors = [ + t + for t in kv_cache_config.kv_cache_tensors + if kv_cache_specs[t.shared_by[0]] is mamba_spec + ] + assert len(attn_tensors) == 8, f"{model_tag}: expected 8 attention tensors" + assert len(mamba_tensors) == 24, f"{model_tag}: expected 24 Mamba tensors" + + # Tensor sizes match their spec's page_size * block count. + # With compact allocation, Mamba uses mamba_num_blocks (not num_blocks). + num_blocks = kv_cache_config.num_blocks + mamba_num_blocks = kv_cache_config.mamba_num_blocks or num_blocks + assert num_blocks > 0 + for t in attn_tensors: + assert t.size == attn_page * num_blocks + for t in mamba_tensors: + assert t.size == mamba_page * mamba_num_blocks + + # Attention and Mamba tensors have DIFFERENT sizes (not padded uniform) + assert attn_tensors[0].size != mamba_tensors[0].size, ( + f"{model_tag}: tensors should differ — " + f"attn={attn_tensors[0].size}, mamba={mamba_tensors[0].size}" + ) + + # Allocation is efficient: >90% of available memory used + total_allocated = sum(t.size for t in kv_cache_config.kv_cache_tensors) + efficiency = total_allocated / available_memory + assert efficiency > 0.90, ( + f"{model_tag}: allocation efficiency {efficiency:.1%} < 90%" + ) + + +@pytest.mark.parametrize( + "kv_dtype, mamba_dtype", + [ + (torch.bfloat16, torch.bfloat16), + (torch.float16, torch.float16), + (torch.float8_e4m3fn, torch.bfloat16), + ], + ids=["bf16", "fp16", "fp8-kv"], +) +def test_qwen35_concurrency_estimate(kv_dtype, mamba_dtype): + """Verify concurrency estimate correctly weights Mamba vs attention cost. + + For Qwen3.5, Mamba's 24 layers have O(1) state per request (~26 MiB total + at bf16) while attention's 8 layers have O(n) KV (~1 GiB at 32K context). + The concurrency estimate must reflect that attention dominates cost. 
+ """ + max_model_len = 32768 + model_config = ModelConfig(max_model_len=max_model_len) + scheduler_config = SchedulerConfig( + max_num_batched_tokens=1024, + enable_chunked_prefill=True, + max_model_len=max_model_len, + is_encoder_decoder=model_config.is_encoder_decoder, + ) + vllm_config = VllmConfig( + model_config=model_config, + scheduler_config=scheduler_config, + ) + + _, attn_spec, mamba_spec = _make_qwen35_specs( + kv_dtype=kv_dtype, mamba_dtype=mamba_dtype + ) + + # Compute expected values + attn_max_mem = attn_spec.max_memory_usage_bytes(vllm_config) # O(n) + mamba_max_mem = mamba_spec.max_memory_usage_bytes(vllm_config) # O(1) + + # Mamba per-request cost should be a small fraction of attention + total_attn_cost = 8 * attn_max_mem + total_mamba_cost = 24 * mamba_max_mem + mamba_fraction = total_mamba_cost / (total_attn_cost + total_mamba_cost) + assert mamba_fraction < 0.10, ( + f"Mamba should be <10% of per-request cost, got {mamba_fraction:.1%}" + ) + + # Compute blocks-per-request using same formula as our implementation: + # total_per_request = sum(layers_in_group * spec.max_memory) + # total_per_block = sum(layers_in_group * spec.page_size) + # blocks_per_request = ceil(total_per_request / total_per_block) + total_per_request = 8 * attn_max_mem + 24 * mamba_max_mem + total_per_block = 8 * attn_spec.page_size_bytes + 24 * mamba_spec.page_size_bytes + blocks_per_request = (total_per_request + total_per_block - 1) // total_per_block + + # Give enough blocks for ~3 concurrent requests + num_blocks = blocks_per_request * 3 + + kv_cache_config = KVCacheConfig( + num_blocks=num_blocks, + kv_cache_tensors=[], + kv_cache_groups=[ + KVCacheGroupSpec([f"layer_{i}" for i in range(24)], mamba_spec), + KVCacheGroupSpec([f"layer_{i}" for i in range(24, 32)], attn_spec), + ], + ) + concurrency = get_max_concurrency_for_kv_cache_config(vllm_config, kv_cache_config) + + # Concurrency should be exactly 3 (we gave exactly 3x blocks_per_request) + assert concurrency == 3.0, f"Expected 3.0 concurrency, got {concurrency:.2f}" + + +def test_qwen35_groups_skip_page_size_unification(): + """Page size unification is skipped for Qwen3.5 mixed Mamba+attention. + + Without this, unify_kv_cache_spec_page_size would pad one spec's page + size to match the other, wasting memory. + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + + groups = kv_cache_utils.get_kv_cache_groups(vllm_config, kv_cache_specs) + + # Must have both Mamba and attention groups + attn_groups = [g for g in groups if not isinstance(g.kv_cache_spec, MambaSpec)] + mamba_groups = [g for g in groups if isinstance(g.kv_cache_spec, MambaSpec)] + assert len(attn_groups) >= 1 + assert len(mamba_groups) >= 1 + + # Page sizes must be preserved (not padded to match each other) + for g in attn_groups: + assert g.kv_cache_spec.page_size_bytes == attn_page + for g in mamba_groups: + assert g.kv_cache_spec.page_size_bytes == mamba_page + assert attn_page != mamba_page + + +def test_qwen35_mamba_cache_mode_all_includes_mamba_in_token_count(): + """When mamba_cache_mode='all', Mamba states are cached per-token for + prefix caching. 
The token capacity report must include Mamba groups."""
+    model_config = ModelConfig(max_model_len=1024)
+    cache_config = CacheConfig(mamba_cache_mode="all")
+    vllm_config = VllmConfig(model_config=model_config, cache_config=cache_config)
+
+    _, attn_spec, mamba_spec = _make_qwen35_specs()
+
+    kv_cache_config = KVCacheConfig(
+        num_blocks=320,
+        kv_cache_tensors=[],
+        kv_cache_groups=[
+            KVCacheGroupSpec([f"layer_{i}" for i in range(24)], mamba_spec),
+            KVCacheGroupSpec([f"layer_{i}" for i in range(24, 32)], attn_spec),
+        ],
+    )
+
+    # In "all" mode, both groups (covering all 32 layers) count toward token
+    # capacity; in "none" mode, only the attention group (8 layers) would count.
+    # We verify by replicating the group filter used by _report_kv_cache_config
+    # and checking that it includes all groups.
+    mamba_cache_mode = vllm_config.cache_config.mamba_cache_mode
+    attention_groups = [
+        g
+        for g in kv_cache_config.kv_cache_groups
+        if not isinstance(g.kv_cache_spec, MambaSpec) or mamba_cache_mode == "all"
+    ]
+    assert len(attention_groups) == 2, (
+        "In 'all' mode, both Mamba and attention groups should be included"
+    )
+
+    # Contrast with "none" mode — only attention groups
+    vllm_config_none = VllmConfig(model_config=model_config)
+    mamba_cache_mode_none = vllm_config_none.cache_config.mamba_cache_mode
+    attention_groups_none = [
+        g
+        for g in kv_cache_config.kv_cache_groups
+        if not isinstance(g.kv_cache_spec, MambaSpec) or mamba_cache_mode_none == "all"
+    ]
+    assert len(attention_groups_none) == 1, (
+        "In 'none' mode, only attention groups should be included"
+    )
+
+
+def test_qwen35_pure_attention_and_pure_mamba_unaffected():
+    """Our changes must not affect pure-attention or pure-Mamba models."""
+    model_config = ModelConfig(max_model_len=1024)
+    vllm_config = VllmConfig(model_config=model_config)
+
+    _, attn_spec, mamba_spec = _make_qwen35_specs()
+
+    # Pure attention (e.g. Llama) — should NOT hit mixed path
+    attn_specs: dict[str, KVCacheSpec] = {f"layer_{i}": attn_spec for i in range(32)}
+    attn_groups = kv_cache_utils.get_kv_cache_groups(vllm_config, attn_specs)
+    assert not kv_cache_utils._has_mixed_mamba_attention(attn_groups)
+
+    # Pure Mamba (e.g. Mamba2) — should NOT hit mixed path
+    mamba_specs: dict[str, KVCacheSpec] = {f"layer_{i}": mamba_spec for i in range(32)}
+    mamba_groups = kv_cache_utils.get_kv_cache_groups(vllm_config, mamba_specs)
+    assert not kv_cache_utils._has_mixed_mamba_attention(mamba_groups)
+
+
+# ---------------------------------------------------------------------------
+# Compact Mamba allocation tests
+# ---------------------------------------------------------------------------
+
+# Qwen3.5 architecture: 32 layers, every 4th is attention (see _make_qwen35_specs)
+_QWEN35_NUM_MAMBA_LAYERS = 24
+_QWEN35_NUM_ATTN_LAYERS = 8
+
+
+def _total_page_per_block(kv_cache_specs: dict[str, KVCacheSpec]) -> int:
+    """Total page size across all layers for one block."""
+    return sum(spec.page_size_bytes for spec in kv_cache_specs.values())
+
+
+def test_estimate_consistent_with_allocation():
+    """The memory estimate must be consistent with the compact allocation.
+
+    If the estimate says max_model_len=M fits, the allocation MUST have
+    enough attention blocks for M tokens. This is the OOM-prevention invariant.
+ """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + block_size = attn_spec.block_size + + # Test at several memory levels + total_page_per_block = _total_page_per_block(kv_cache_specs) + for num_blocks_target in [5, 10, 50, 200]: + available_memory = total_page_per_block * num_blocks_target + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # Attention blocks must be enough for at least 1 full request + blocks_needed_for_max_model_len = cdiv( + vllm_config.model_config.max_model_len, block_size + ) + assert kv_cache_config.num_blocks >= blocks_needed_for_max_model_len, ( + f"OOM invariant violated: {kv_cache_config.num_blocks} attention " + f"blocks < {blocks_needed_for_max_model_len} needed for " + f"max_model_len={vllm_config.model_config.max_model_len}" + ) + + # Mamba blocks must exist (at least 1 concurrent request) + if kv_cache_config.mamba_num_blocks is not None: + assert kv_cache_config.mamba_num_blocks >= 1, ( + "Mamba must have at least 1 block for 1 concurrent request" + ) + + +def test_compact_mamba_allocation_sizes(): + """Compact allocation gives Mamba much fewer blocks than attention. + + Mamba tensors should be sized for the compact block count, not the + attention block count. + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + + # Give plenty of memory so the difference is stark + total_page_per_block = _total_page_per_block(kv_cache_specs) + available_memory = total_page_per_block * 200 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # Compact allocation should be active (mamba_cache_mode defaults to "none") + assert kv_cache_config.mamba_num_blocks is not None, ( + "Compact allocation should be active for default mamba_cache_mode" + ) + + # Separate tensors + attn_tensors = [ + t + for t in kv_cache_config.kv_cache_tensors + if kv_cache_specs[t.shared_by[0]] is attn_spec + ] + mamba_tensors = [ + t + for t in kv_cache_config.kv_cache_tensors + if kv_cache_specs[t.shared_by[0]] is mamba_spec + ] + + # Mamba tensors should be much smaller than attention tensors + mamba_blocks = mamba_tensors[0].size // mamba_page + attn_blocks = attn_tensors[0].size // attn_page + + assert mamba_blocks < attn_blocks, ( + f"Mamba blocks ({mamba_blocks}) should be << attention blocks ({attn_blocks})" + ) + assert mamba_blocks == kv_cache_config.mamba_num_blocks + assert attn_blocks == kv_cache_config.num_blocks + + # Attention gets more total memory than Mamba + total_attn_mem = sum(t.size for t in attn_tensors) + total_mamba_mem = sum(t.size for t in mamba_tensors) + assert total_attn_mem > total_mamba_mem, ( + f"Attention memory ({total_attn_mem}) should be > Mamba memory " + f"({total_mamba_mem})" + ) + + +def test_token_capacity_improvement(): + """Compact allocation should yield much higher token capacity than the + old shared-pool approach. + + The old approach gives all layers the same num_blocks. For Qwen3.5 with + 24 Mamba layers at ~1 MB page size, this wastes enormous amounts of + memory. The compact approach should yield at least 5x more tokens. 
+ """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + block_size = attn_spec.block_size + + # 10 GB available + available_memory = 10 * GiB_bytes + + # Old approach: all layers share num_blocks + total_page_per_block = _total_page_per_block(kv_cache_specs) + old_num_blocks = int(available_memory // total_page_per_block) + old_token_capacity = old_num_blocks * block_size + + # New approach: compact allocation + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + new_token_capacity = kv_cache_config.num_blocks * block_size + + # The improvement ratio is roughly total_page / attn_page_total because + # compact allocation lets attention use nearly all the memory. + # For Qwen3.5: total ≈ 26.9 MB, attn ≈ 0.5 MB → ~50x theoretical max. + # Use a conservative floor that still validates the optimization works. + attn_page_total = _QWEN35_NUM_ATTN_LAYERS * attn_page + expected_ratio = total_page_per_block / attn_page_total + conservative_floor = expected_ratio / 10 # 10% of theoretical max + assert new_token_capacity > old_token_capacity * conservative_floor, ( + f"New capacity ({new_token_capacity} tokens) should be " + f">{conservative_floor:.0f}x old ({old_token_capacity} tokens), " + f"got {new_token_capacity / old_token_capacity:.1f}x" + ) + + +def test_compact_mamba_not_used_for_mode_all(): + """When mamba_cache_mode='all', Mamba should share the block pool. + + Compact allocation is only for "none" and "align" modes where Mamba + state is O(1) per request. + """ + model_config = ModelConfig(max_model_len=1024) + cache_config = CacheConfig(mamba_cache_mode="all") + vllm_config = VllmConfig(model_config=model_config, cache_config=cache_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + + total_page_per_block = _total_page_per_block(kv_cache_specs) + # "all" mode needs cdiv(max_model_len, block_size) blocks minimum + # to serve one max-length request. Double it for headroom. 
+ block_size = attn_spec.block_size + min_blocks = cdiv(model_config.max_model_len, block_size) + available_memory = total_page_per_block * min_blocks * 2 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # mamba_num_blocks should be None (shared pool) + assert kv_cache_config.mamba_num_blocks is None, ( + "mamba_cache_mode='all' should not use compact allocation" + ) + + # All tensors should have the same num_blocks + num_blocks = kv_cache_config.num_blocks + for t in kv_cache_config.kv_cache_tensors: + layer_name = t.shared_by[0] + spec = kv_cache_specs[layer_name] + expected_size = spec.page_size_bytes * num_blocks + assert t.size == expected_size, ( + f"Layer {layer_name}: size {t.size} != expected {expected_size}" + ) + + +def test_concurrency_reflects_actual_capacity(): + """Concurrency for compact allocation should reflect both attention and + Mamba capacity, and should allow multiple concurrent requests.""" + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + + # Give enough memory for many requests + total_page_per_block = _total_page_per_block(kv_cache_specs) + available_memory = total_page_per_block * 200 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + concurrency = get_max_concurrency_for_kv_cache_config(vllm_config, kv_cache_config) + + # Should support multiple concurrent requests + assert concurrency > 1, f"Concurrency should be > 1, got {concurrency:.2f}" + + # Concurrency should be approximately the compact allocation's + # num_concurrent (Mamba is the tighter constraint by design) + if kv_cache_config.mamba_num_blocks is not None: + mamba_blocks_per_req = max( + ( + g.kv_cache_spec.max_memory_usage_bytes(vllm_config) + + g.kv_cache_spec.page_size_bytes + - 1 + ) + // g.kv_cache_spec.page_size_bytes + for g in kv_cache_config.kv_cache_groups + if isinstance(g.kv_cache_spec, MambaSpec) + ) + mamba_concurrency = kv_cache_config.mamba_num_blocks / mamba_blocks_per_req + # Concurrency = min(attn, mamba), so it must be <= mamba capacity + assert concurrency <= mamba_concurrency + 1e-9 # float tolerance + + +def test_pure_models_unaffected_by_compact_allocation(): + """Pure attention and pure Mamba models should not use compact allocation. + + This is a regression guard: the compact path is gated by + _has_mixed_mamba_attention(). + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + _, attn_spec, mamba_spec = _make_qwen35_specs() + + # Pure attention model + attn_specs: dict[str, KVCacheSpec] = {f"layer_{i}": attn_spec for i in range(32)} + attn_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, + [attn_specs], + [attn_spec.page_size_bytes * 32 * 100], + )[0] + assert attn_config.mamba_num_blocks is None, ( + "Pure attention model should not have mamba_num_blocks" + ) + + # Pure Mamba model + mamba_specs_dict: dict[str, KVCacheSpec] = { + f"layer_{i}": mamba_spec for i in range(32) + } + mamba_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, + [mamba_specs_dict], + [mamba_spec.page_size_bytes * 32 * 100], + )[0] + assert mamba_config.mamba_num_blocks is None, ( + "Pure Mamba model should not have mamba_num_blocks" + ) + + +def test_compact_allocation_low_memory_floor(): + """When memory barely fits 1 request, the max(1,...) 
floor on + num_concurrent must kick in. + + This exercises the edge case where optimal_C < 1. The floor guarantees + at least 1 concurrent request, and therefore: + - mamba_blocks >= mamba_blocks_per_req (enough for 1 request) + - attention_num_blocks >= blocks_per_attn_request (enough for max_model_len) + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + block_size = attn_spec.block_size + + blocks_for_max_model_len = cdiv(model_config.max_model_len, block_size) + + # Compute exact cost of 1 request: attention blocks * attn_page_total + # + mamba_blocks_per_req * mamba_page_cost. + attn_page_total = _QWEN35_NUM_ATTN_LAYERS * attn_page + mamba_page_cost = _QWEN35_NUM_MAMBA_LAYERS * mamba_page + # mamba_blocks_per_req = cdiv(max_memory_usage, page_size_bytes) + # For "none" mode: max_memory_usage = page_size_bytes (1 block) + mamba_blocks_per_req = 1 + + cost_of_one_request = ( + attn_page_total * blocks_for_max_model_len + + mamba_page_cost * mamba_blocks_per_req + ) + # Give exactly enough for ~1.1 requests (floor should cap to 1) + available_memory = int(cost_of_one_request * 1.1) + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # num_concurrent should be 1 (floor kicks in) + # Justification: mamba_blocks = num_concurrent * mamba_blocks_per_req + assert kv_cache_config.mamba_num_blocks is not None, ( + "Compact allocation should be active" + ) + assert kv_cache_config.mamba_num_blocks == mamba_blocks_per_req, ( + f"With 1 concurrent request, mamba_blocks should be " + f"{mamba_blocks_per_req}, got {kv_cache_config.mamba_num_blocks}" + ) + + # Attention blocks must still fit max_model_len (OOM invariant). + # Justification: if this fails, a single max-length request would OOM. + assert kv_cache_config.num_blocks >= blocks_for_max_model_len, ( + f"Attention blocks {kv_cache_config.num_blocks} < " + f"{blocks_for_max_model_len} needed for max_model_len" + ) + + +def test_compact_allocation_capped_by_max_num_seqs(): + """When max_num_seqs caps num_concurrent, the freed Mamba budget + should go to attention blocks. + + With huge memory and max_num_seqs=4, optimal_C would be >> 4 but + gets capped. The Mamba pool is sized for exactly 4 requests, and + the remaining memory goes to attention. + """ + max_num_seqs = 4 + model_config = ModelConfig(max_model_len=1024) + scheduler_config = SchedulerConfig( + max_model_len=model_config.max_model_len, + is_encoder_decoder=False, + max_num_seqs=max_num_seqs, + ) + vllm_config = VllmConfig( + model_config=model_config, scheduler_config=scheduler_config + ) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + + # Give huge memory (1000 blocks worth) + total_page_per_block = _total_page_per_block(kv_cache_specs) + available_memory = total_page_per_block * 1000 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + assert kv_cache_config.mamba_num_blocks is not None + + # Justification: mamba_blocks = min(optimal_C, max_num_seqs) * blocks_per_req. + # With "none" mode, blocks_per_req = 1, so mamba_blocks should be exactly 4. 
+ mamba_blocks_per_req = 1 + expected_mamba_blocks = max_num_seqs * mamba_blocks_per_req + assert kv_cache_config.mamba_num_blocks == expected_mamba_blocks, ( + f"Expected {expected_mamba_blocks} mamba blocks (capped by " + f"max_num_seqs={max_num_seqs}), got {kv_cache_config.mamba_num_blocks}" + ) + + # Justification: with only 4 Mamba blocks, nearly all memory goes to + # attention. Attention blocks should be much higher than the uncapped case + # would give per-concurrent-request. Specifically, nearly all available + # memory minus 4*mamba_cost should be in attention. + mamba_page_cost = _QWEN35_NUM_MAMBA_LAYERS * mamba_page + mamba_total = expected_mamba_blocks * mamba_page_cost + attn_page_total = _QWEN35_NUM_ATTN_LAYERS * attn_page + expected_attn_blocks = int((available_memory - mamba_total) // attn_page_total) + assert kv_cache_config.num_blocks == expected_attn_blocks, ( + f"Attention blocks {kv_cache_config.num_blocks} != expected " + f"{expected_attn_blocks} (available - mamba_cost)" + ) + + +def test_cross_worker_mamba_scaling(): + """Multi-worker configs with different available memory should be + synchronized to the minimum mamba_num_blocks and num_blocks. + + This exercises the cross-worker tensor scaling path that scales Mamba + and attention tensors independently using _is_mamba_layer(). + """ + model_config = ModelConfig(max_model_len=1024) + vllm_config = VllmConfig(model_config=model_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + attn_page = attn_spec.page_size_bytes + mamba_page = mamba_spec.page_size_bytes + + total_page_per_block = _total_page_per_block(kv_cache_specs) + # Worker 0 has more memory than worker 1 + mem_worker_0 = total_page_per_block * 200 + mem_worker_1 = total_page_per_block * 100 + + configs = kv_cache_utils.get_kv_cache_configs( + vllm_config, + [kv_cache_specs, kv_cache_specs], + [mem_worker_0, mem_worker_1], + ) + + # Justification: cross-worker sync sets all configs to the minimum. + # Both workers must have identical num_blocks and mamba_num_blocks. + assert configs[0].num_blocks == configs[1].num_blocks, ( + "num_blocks must be synchronized across workers" + ) + assert configs[0].mamba_num_blocks == configs[1].mamba_num_blocks, ( + "mamba_num_blocks must be synchronized across workers" + ) + + # The synced values should match the smaller worker's allocation + single_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [mem_worker_1] + )[0] + assert configs[0].num_blocks == single_config.num_blocks, ( + "Synced num_blocks should match the smaller worker's allocation" + ) + assert configs[0].mamba_num_blocks == single_config.mamba_num_blocks, ( + "Synced mamba_num_blocks should match the smaller worker's allocation" + ) + + # Justification: tensor sizes must be scaled to match the synced block counts. + # Mamba tensors use mamba_num_blocks, attention tensors use num_blocks. + for cfg in configs: + for tensor in cfg.kv_cache_tensors: + layer_name = tensor.shared_by[0] + spec = kv_cache_specs[layer_name] + if isinstance(spec, MambaSpec): + assert tensor.size == mamba_page * cfg.mamba_num_blocks, ( + f"Mamba tensor {layer_name}: size {tensor.size} != " + f"{mamba_page} * {cfg.mamba_num_blocks}" + ) + else: + assert tensor.size == attn_page * cfg.num_blocks, ( + f"Attn tensor {layer_name}: size {tensor.size} != " + f"{attn_page} * {cfg.num_blocks}" + ) + + # Justification: generate_scheduler_kv_cache_config must not raise + # because mamba_num_blocks is consistent across workers. 
+ scheduler_config_result = generate_scheduler_kv_cache_config(configs) + assert scheduler_config_result.mamba_num_blocks == configs[0].mamba_num_blocks + + +def test_compact_mamba_manager_allocate_and_free(): + """MambaManager in compact mode should allocate from and free to its + private compact pool, without touching the shared BlockPool. + + This validates the core lifecycle: allocate blocks for requests, + free them, and confirm they're reusable. + """ + from vllm.v1.core.single_type_kv_cache_manager import MambaManager + + _, _, mamba_spec = _make_qwen35_specs() + block_size = mamba_spec.block_size + + num_gpu_blocks = 100 + block_pool = BlockPool( + num_gpu_blocks=num_gpu_blocks, + enable_caching=False, + hash_block_size=block_size, + ) + initial_pool_free = block_pool.free_block_queue.num_free_blocks + + mamba_num_blocks = 5 + manager = MambaManager( + kv_cache_spec=mamba_spec, + block_pool=block_pool, + mamba_num_blocks=mamba_num_blocks, + enable_caching=False, + kv_cache_group_id=0, + ) + + # Justification: compact mode should be active when mamba_num_blocks is set + # and mamba_cache_mode != "all". + assert manager.compact_mode is True + assert len(manager._compact_free) == mamba_num_blocks + + # Allocate blocks for 3 requests (1 block each for 16 tokens) + for i in range(3): + req_id = f"req_{i}" + blocks = manager.allocate_new_blocks(req_id, block_size, block_size) + assert len(blocks) == 1, f"Expected 1 block for req_{i}" + # Justification: each block ID should be in [0, mamba_num_blocks) + assert 0 <= blocks[0].block_id < mamba_num_blocks + + # Justification: 3 blocks allocated from 5 total, so 2 should remain free. + assert len(manager._compact_free) == 2 + + # Justification: shared BlockPool must not be touched in compact mode. + assert block_pool.free_block_queue.num_free_blocks == initial_pool_free, ( + "BlockPool free count changed — compact blocks leaked into shared pool" + ) + + # Free one request and verify block returns to compact pool. + manager.free("req_1") + # Justification: freeing 1 request returns its 1 block to the compact pool. + assert len(manager._compact_free) == 3 + + # Allocate another request — should reuse the freed block. + blocks = manager.allocate_new_blocks("req_3", block_size, block_size) + assert len(blocks) == 1 + # Justification: the freed block from req_1 should be reused (LIFO stack). + assert len(manager._compact_free) == 2 + + # Justification: BlockPool must still be untouched after all operations. + assert block_pool.free_block_queue.num_free_blocks == initial_pool_free + + +def test_compact_mamba_manager_exhaustion_rejects(): + """When the compact pool is exhausted, get_num_blocks_to_allocate + must return a rejection signal (> num_gpu_blocks). + + This prevents over-allocation and is the signal to the scheduler + to not schedule this request in the current step. 
+ """ + from vllm.v1.core.single_type_kv_cache_manager import MambaManager + + _, _, mamba_spec = _make_qwen35_specs() + block_size = mamba_spec.block_size + + num_gpu_blocks = 100 + block_pool = BlockPool( + num_gpu_blocks=num_gpu_blocks, + enable_caching=False, + hash_block_size=block_size, + ) + + mamba_num_blocks = 2 + manager = MambaManager( + kv_cache_spec=mamba_spec, + block_pool=block_pool, + mamba_num_blocks=mamba_num_blocks, + enable_caching=False, + kv_cache_group_id=0, + ) + + # Fill the compact pool: 2 requests × 1 block each + manager.allocate_new_blocks("req_0", block_size, block_size) + manager.allocate_new_blocks("req_1", block_size, block_size) + assert len(manager._compact_free) == 0 + + # Justification: with 0 free compact blocks and a new request needing 1, + # get_num_blocks_to_allocate must return > num_gpu_blocks to signal rejection. + num_to_alloc = manager.get_num_blocks_to_allocate( + request_id="req_2", + num_tokens=block_size, + new_computed_blocks=[], + total_computed_tokens=0, + num_tokens_main_model=block_size, + ) + assert num_to_alloc > num_gpu_blocks, ( + f"Expected rejection signal (>{num_gpu_blocks}), got {num_to_alloc}" + ) + + # After freeing one request, the same call should succeed (return 0). + manager.free("req_0") + num_to_alloc = manager.get_num_blocks_to_allocate( + request_id="req_2", + num_tokens=block_size, + new_computed_blocks=[], + total_computed_tokens=0, + num_tokens_main_model=block_size, + ) + # Justification: 0 means "no shared pool blocks needed" — compact handles it. + assert num_to_alloc == 0, ( + f"Expected 0 (compact handles allocation), got {num_to_alloc}" + ) + + +def test_compact_mamba_cache_blocks_noop(): + """cache_blocks in compact mode must be a no-op to prevent compact + block IDs from entering the shared pool's cache hash table. + + If compact IDs leak into the cache, they could collide with attention + block IDs and cause incorrect cache hits or block corruption. + """ + from vllm.v1.core.single_type_kv_cache_manager import MambaManager + + _, _, mamba_spec = _make_qwen35_specs() + block_size = mamba_spec.block_size + + num_gpu_blocks = 100 + block_pool = BlockPool( + num_gpu_blocks=num_gpu_blocks, + enable_caching=True, # Caching enabled to verify no-op + hash_block_size=block_size, + ) + + manager = MambaManager( + kv_cache_spec=mamba_spec, + block_pool=block_pool, + mamba_num_blocks=5, + enable_caching=True, + kv_cache_group_id=0, + ) + + # Allocate a block + manager.allocate_new_blocks("req_0", block_size, block_size) + + # Count cached blocks in the pool before + cached_before = len(block_pool.cached_block_hash_to_block) + + # Create a minimal request-like object for cache_blocks + req = make_request("req_0", list(range(block_size)), block_size) + manager.cache_blocks(req, block_size) + + # Justification: cache_blocks is a no-op in compact mode, so no new + # entries should appear in the block pool's cache. + cached_after = len(block_pool.cached_block_hash_to_block) + assert cached_after == cached_before, ( + f"Block pool cache grew from {cached_before} to {cached_after} — " + "compact block IDs leaked into shared cache" + ) + + +def test_all_mode_concurrency(): + """Concurrency for mamba_cache_mode='all' should use the standard + mixed formula (num_blocks / blocks_per_request), not the compact path. + + This verifies the 'all' mode branch of get_max_concurrency_for_kv_cache_config + wasn't broken when we added the compact branch. 
+ """ + model_config = ModelConfig(max_model_len=1024) + cache_config = CacheConfig(mamba_cache_mode="all") + vllm_config = VllmConfig(model_config=model_config, cache_config=cache_config) + + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + block_size = attn_spec.block_size + + total_page_per_block = _total_page_per_block(kv_cache_specs) + min_blocks = cdiv(model_config.max_model_len, block_size) + available_memory = total_page_per_block * min_blocks * 2 + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + # mamba_num_blocks is None for "all" mode + assert kv_cache_config.mamba_num_blocks is None + + concurrency = get_max_concurrency_for_kv_cache_config(vllm_config, kv_cache_config) + + # Justification: "all" mode uses num_blocks / blocks_per_request where + # blocks_per_request is based on per-request memory usage across all groups. + # With ~2x headroom, concurrency should be approximately 2. + assert concurrency > 1.0, ( + f"'all' mode concurrency should be > 1 with 2x headroom, got {concurrency:.2f}" + ) + + # Justification: concurrency is calculated as num_blocks / blocks_per_request. + # Verify it matches the manual calculation to confirm the right formula is used. + max_memory_per_req = sum( + len(g.layer_names) * g.kv_cache_spec.max_memory_usage_bytes(vllm_config) + for g in kv_cache_config.kv_cache_groups + ) + total_page = sum( + len(g.layer_names) * g.kv_cache_spec.page_size_bytes + for g in kv_cache_config.kv_cache_groups + ) + blocks_per_req = (max_memory_per_req + total_page - 1) // total_page + expected_concurrency = kv_cache_config.num_blocks / blocks_per_req + assert abs(concurrency - expected_concurrency) < 1e-9, ( + f"Concurrency {concurrency:.4f} != expected {expected_concurrency:.4f}" + ) + + +# --------------------------------------------------------------------------- +# Compact "none" mode performance fix tests +# --------------------------------------------------------------------------- + + +def _create_compact_mamba_manager( + mamba_num_blocks: int, + spec: int = 0, + mamba_cache_mode: str = "none", + block_size: int = 16, +): + """Create a MambaManager in compact mode for testing.""" + from vllm.v1.core.single_type_kv_cache_manager import MambaManager + + mamba_spec = MambaSpec( + block_size=block_size, + shapes=((3, 8192), (32, 128, 128)), + dtypes=(torch.bfloat16, torch.bfloat16), + mamba_cache_mode=mamba_cache_mode, + num_speculative_blocks=spec, + ) + block_pool = BlockPool( + num_gpu_blocks=100, + enable_caching=False, + hash_block_size=block_size, + ) + return MambaManager( + kv_cache_spec=mamba_spec, + block_pool=block_pool, + mamba_num_blocks=mamba_num_blocks, + enable_caching=False, + kv_cache_group_id=0, + ) + + +def test_compact_none_constant_blocks_regardless_of_tokens(): + """Compact 'none' mode must allocate exactly 1+spec blocks per request, + regardless of how many tokens are scheduled. 
The Mamba kernel only uses + 1+spec block IDs, so allocating more wastes the compact pool.""" + manager = _create_compact_mamba_manager(mamba_num_blocks=50, spec=0) + block_size = manager.block_size + + for num_tokens in [block_size, 5 * block_size, 20 * block_size]: + req_id = f"req_{num_tokens}" + needed = manager.get_num_blocks_to_allocate( + req_id, num_tokens, [], 0, num_tokens + ) + assert needed == 0, ( + f"Expected 0 (compact handles it), got {needed} for {num_tokens} tokens" + ) + + blocks = manager.allocate_new_blocks(req_id, num_tokens, num_tokens) + assert len(blocks) == 1, ( + f"Expected 1 block (1+spec=1), got {len(blocks)} for {num_tokens} tokens" + ) + manager.free(req_id) + + # Same test with speculative blocks + manager_spec = _create_compact_mamba_manager(mamba_num_blocks=50, spec=2) + for num_tokens in [block_size, 5 * block_size, 20 * block_size]: + req_id = f"req_spec_{num_tokens}" + blocks = manager_spec.allocate_new_blocks(req_id, num_tokens, num_tokens) + assert len(blocks) == 3, ( + f"Expected 3 blocks (1+spec=3), got {len(blocks)} for {num_tokens} tokens" + ) + manager_spec.free(req_id) + + +def test_compact_none_no_block_churn(): + """In compact 'none' mode, remove_skipped_blocks must be a no-op. + Blocks are permanent for the request lifetime. If they get freed, + the kernel reads null block IDs and produces corrupt state.""" + manager = _create_compact_mamba_manager(mamba_num_blocks=10, spec=0) + + blocks = manager.allocate_new_blocks("req_0", 200, 200) + assert len(blocks) == 1 + original_block_id = blocks[0].block_id + free_before = len(manager._compact_free) + + # Simulate multiple decode steps at increasing token counts + for num_computed in [200, 232, 264, 500, 1000]: + manager.remove_skipped_blocks("req_0", num_computed) + assert len(manager._compact_free) == free_before, ( + f"Blocks were freed at num_computed={num_computed}" + ) + req_blocks = manager.req_to_blocks["req_0"] + assert len(req_blocks) == 1 + assert req_blocks[0].block_id == original_block_id + assert not req_blocks[0].is_null + + +def test_compact_none_full_concurrency(): + """Pool sized for N×(1+spec) must serve exactly N concurrent requests. 
+ With the O(n) bug, prefill with >block_size tokens causes premature + pool exhaustion, throttling concurrency.""" + max_concurrent = 32 + spec = 2 + blocks_per_req = 1 + spec + pool_size = max_concurrent * blocks_per_req + + manager = _create_compact_mamba_manager(mamba_num_blocks=pool_size, spec=spec) + + # Schedule all 32 requests with large prefill (220 tokens >> block_size=16) + large_prefill = 220 + for i in range(max_concurrent): + req_id = f"req_{i}" + needed = manager.get_num_blocks_to_allocate( + req_id, large_prefill, [], 0, large_prefill + ) + assert needed == 0, ( + f"Request {i} rejected — pool should handle all " + f"{max_concurrent} concurrent requests" + ) + blocks = manager.allocate_new_blocks(req_id, large_prefill, large_prefill) + assert len(blocks) == blocks_per_req + + assert len(manager._compact_free) == 0 + + # 33rd request must be rejected + needed = manager.get_num_blocks_to_allocate( + "req_overflow", large_prefill, [], 0, large_prefill + ) + assert needed > manager.block_pool.num_gpu_blocks, "Should reject overflow" + + # Free one → can schedule one more + manager.free("req_0") + assert len(manager._compact_free) == blocks_per_req + needed = manager.get_num_blocks_to_allocate( + "req_replacement", large_prefill, [], 0, large_prefill + ) + assert needed == 0 + + +def test_compact_none_blocks_stable_across_decode(): + """Block IDs must remain constant across decode steps. The kernel + always reads the same block table entries. Any change means corrupt state.""" + manager = _create_compact_mamba_manager(mamba_num_blocks=20, spec=2) + + blocks = manager.allocate_new_blocks("req_0", 200, 200) + original_ids = [b.block_id for b in blocks] + assert len(original_ids) == 3 + + # Simulate decode steps — state is deterministic, 10 steps is sufficient + for step in range(10): + num_computed = 200 + step + manager.remove_skipped_blocks("req_0", num_computed) + needed = manager.get_num_blocks_to_allocate( + "req_0", num_computed + 1, [], num_computed, num_computed + 1 + ) + assert needed == 0 + current_ids = [b.block_id for b in manager.req_to_blocks["req_0"]] + assert current_ids == original_ids, ( + f"Block IDs changed at step {step}: {original_ids} → {current_ids}" + ) + + +def test_compact_align_mode_unaffected(): + """Align mode compact should still use null-block patterns and + dynamic allocation. The fix must NOT change align mode behavior.""" + manager = _create_compact_mamba_manager( + mamba_num_blocks=20, spec=0, mamba_cache_mode="align" + ) + + manager.allocate_new_blocks("req_0", 200, 200) + # align mode uses null blocks for skipped positions — req_to_blocks + # should have more entries than just 1+spec + assert len(manager.req_to_blocks["req_0"]) > 1 + + # Verify remove_skipped_blocks IS active (not a no-op) for align mode. + # After remove_skipped_blocks, freed blocks from generic skipping should + # return to the compact pool. + compact_free_before = len(manager._compact_free) + manager.remove_skipped_blocks("req_0", 200) + assert len(manager._compact_free) >= compact_free_before, ( + "Align mode remove_skipped_blocks should not consume compact blocks" + ) + + +# --------------------------------------------------------------------------- +# Compact allocation concurrency & memory efficiency tests +# --------------------------------------------------------------------------- + + +def test_compact_concurrency_independent_of_max_model_len(): + """Mamba concurrency must not depend on max_model_len. + + Mamba state is O(1) per request. 
The compact pool should be sized + for max_num_seqs regardless of whether sequences are 1K or 262K tokens. + The old formula erroneously used max_model_len in the denominator, + giving C=1 for long-context models like Qwen3.5 at 262K. + """ + max_num_seqs = 32 + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + total_page_per_block = _total_page_per_block(kv_cache_specs) + available_memory = total_page_per_block * 200 # plenty of memory + + mamba_blocks_list = [] + for max_model_len in [1024, 8192, 65536, 262144]: + model_config = ModelConfig(max_model_len=max_model_len) + scheduler_config = SchedulerConfig( + max_model_len=max_model_len, + is_encoder_decoder=False, + max_num_seqs=max_num_seqs, + ) + vllm_config = VllmConfig( + model_config=model_config, + scheduler_config=scheduler_config, + ) + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + assert kv_cache_config.mamba_num_blocks is not None + mamba_blocks_list.append(kv_cache_config.mamba_num_blocks) + + # All should give the same Mamba block count + assert all(m == mamba_blocks_list[0] for m in mamba_blocks_list), ( + f"Mamba blocks vary with max_model_len: {mamba_blocks_list}. " + f"Expected all to be {mamba_blocks_list[0]}" + ) + # And it should be max_num_seqs * blocks_per_req (=1 for "none" mode) + assert mamba_blocks_list[0] == max_num_seqs, ( + f"Expected {max_num_seqs} mamba blocks, got {mamba_blocks_list[0]}" + ) + + +def test_compact_mamba_tensors_use_real_page_size(): + """Compact Mamba tensors must use real_page_size_bytes, not padded. + + HybridAttentionMambaModelConfig pads Mamba pages to match attention + pages. This is correct for shared-pool ("all") mode but wasteful for + compact mode where Mamba has its own separate tensors. + + Uses a large block_size (like the real Qwen3.5 config) so that + attn_page_size >= mamba_page_size, which is the prerequisite for + padding to be applied by config.py. + """ + # Use block_size=800 (like a real-world user config after auto-adjustment) + # so attention page > mamba page, enabling padding. 
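+    # Rough arithmetic, assuming the spec shapes from _make_qwen35_specs: at
+    # block_size=800 the attention page is 2 * 800 * 4 heads * 256 head_dim
+    # * 2 bytes ~ 3.1 MiB per layer, while the Mamba state stays at
+    # (3 * 8192 + 32 * 128 * 128) * 2 bytes ~ 1.05 MiB regardless of block_size.
+    # At the default block_size=16 the attention page (~64 KiB) would be the
+    # smaller of the two, and no padding would be applied.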
+ kv_cache_specs, attention_spec, mamba_spec = _make_qwen35_specs(block_size=800) + assert attention_spec.page_size_bytes > mamba_spec.real_page_size_bytes + + # Create padded mamba spec (simulating what config.py does) + padded_value = attention_spec.page_size_bytes + padded_mamba_spec = MambaSpec( + block_size=mamba_spec.block_size, + shapes=mamba_spec.shapes, + dtypes=mamba_spec.dtypes, + page_size_padded=padded_value, + ) + assert padded_mamba_spec.page_size_bytes == padded_value + real_mamba_page = padded_mamba_spec.real_page_size_bytes + assert real_mamba_page < padded_value, "Sanity: real < padded" + + # Replace Mamba specs with padded versions + padded_specs: dict[str, KVCacheSpec] = {} + for name, spec in kv_cache_specs.items(): + if isinstance(spec, MambaSpec): + padded_specs[name] = padded_mamba_spec + else: + padded_specs[name] = spec + + model_config = ModelConfig(max_model_len=262144) + vllm_config = VllmConfig(model_config=model_config) + available_memory = 10 * GiB_bytes + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [padded_specs], [available_memory] + )[0] + + assert kv_cache_config.mamba_num_blocks is not None + mamba_blocks = kv_cache_config.mamba_num_blocks + + # Check tensor sizes: Mamba tensors should use REAL page size + for tensor in kv_cache_config.kv_cache_tensors: + layer_name = tensor.shared_by[0] + if isinstance(padded_specs[layer_name], MambaSpec): + expected_size = real_mamba_page * mamba_blocks + assert tensor.size == expected_size, ( + f"Mamba tensor for {layer_name}: size={tensor.size}, " + f"expected={expected_size} (real_page={real_mamba_page} x " + f"{mamba_blocks} blocks). Padded would be " + f"{padded_value * mamba_blocks}." + ) + + +def test_compact_long_context_full_concurrency(): + """Reproduces a reported regression: Qwen3.5 at 262K context with + align mode must support max_num_seqs concurrent Mamba requests. + + The old formula gives optimal_C=1 because it divides available memory + by (attention_cost_at_max_model_len + mamba_cost). With 262K context + and block_size=800, attention_cost_at_max_model_len dominates, + crushing concurrency to 1. + """ + max_num_seqs = 32 + num_speculative_blocks = 2 + + # Use align mode with speculative blocks (like a real-world user config) + align_specs, _, _ = _make_qwen35_specs( + mamba_cache_mode="align", + num_speculative_blocks=num_speculative_blocks, + ) + + total_page = _total_page_per_block(align_specs) + available_memory = total_page * 200 # enough for many requests + + model_config = ModelConfig(max_model_len=262144) + scheduler_config = SchedulerConfig( + max_model_len=262144, + is_encoder_decoder=False, + max_num_seqs=max_num_seqs, + ) + vllm_config = VllmConfig( + model_config=model_config, + scheduler_config=scheduler_config, + ) + + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [align_specs], [available_memory] + )[0] + + assert kv_cache_config.mamba_num_blocks is not None + # align mode: 2 + num_speculative_blocks = 4 blocks per request + mamba_blocks_per_req = 2 + num_speculative_blocks + expected_mamba = max_num_seqs * mamba_blocks_per_req + + assert kv_cache_config.mamba_num_blocks == expected_mamba, ( + f"Expected {expected_mamba} mamba blocks " + f"({max_num_seqs} reqs x {mamba_blocks_per_req} blocks/req), " + f"got {kv_cache_config.mamba_num_blocks}. " + f"Old formula likely gave C=1 -> {mamba_blocks_per_req} blocks." 
+ ) + + +def test_compact_attention_always_ge_shared_pool(): + """Compact allocation must yield attention_num_blocks >= shared pool blocks. + + The shared pool gives num_shared = memory / (attn_page_total + mamba_page_cost). + The compact formula caps Mamba so this invariant always holds. + Test across multiple configurations. + """ + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + total_page_per_block = _total_page_per_block(kv_cache_specs) + + configs = [ + # (max_num_seqs, max_model_len, memory_in_shared_blocks) + (4, 1024, 50), + (32, 1024, 200), + (32, 262144, 200), + (128, 1024, 500), + (128, 262144, 200), + (256, 1024, 1000), + ] + for max_num_seqs, max_model_len, num_shared_equiv in configs: + available_memory = total_page_per_block * num_shared_equiv + + model_config = ModelConfig(max_model_len=max_model_len) + scheduler_config = SchedulerConfig( + max_model_len=max_model_len, + is_encoder_decoder=False, + max_num_seqs=max_num_seqs, + ) + vllm_config = VllmConfig( + model_config=model_config, + scheduler_config=scheduler_config, + ) + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + if kv_cache_config.mamba_num_blocks is None: + continue # not compact mode + + attn_page_total = _QWEN35_NUM_ATTN_LAYERS * attn_spec.page_size_bytes + mamba_page_cost = _QWEN35_NUM_MAMBA_LAYERS * mamba_spec.page_size_bytes + shared_pool_blocks = int( + available_memory // (attn_page_total + mamba_page_cost) + ) + assert kv_cache_config.num_blocks >= shared_pool_blocks, ( + f"Config(seqs={max_num_seqs}, len={max_model_len}, " + f"mem={num_shared_equiv}): attention blocks " + f"{kv_cache_config.num_blocks} < shared pool {shared_pool_blocks}" + ) + + +def test_compact_mamba_capped_by_memory(): + """With very high max_num_seqs, Mamba pool is capped by available memory. + + The shared-pool-equivalent cap ensures we never allocate more Mamba + than the system could afford, leaving enough for attention. + """ + max_num_seqs = 1000 # very high + kv_cache_specs, attn_spec, mamba_spec = _make_qwen35_specs() + total_page_per_block = _total_page_per_block(kv_cache_specs) + + # Only 50 shared-pool-equivalent blocks of memory + available_memory = total_page_per_block * 50 + + model_config = ModelConfig(max_model_len=1024) + scheduler_config = SchedulerConfig( + max_model_len=1024, + is_encoder_decoder=False, + max_num_seqs=max_num_seqs, + ) + vllm_config = VllmConfig( + model_config=model_config, + scheduler_config=scheduler_config, + ) + kv_cache_config = kv_cache_utils.get_kv_cache_configs( + vllm_config, [kv_cache_specs], [available_memory] + )[0] + + assert kv_cache_config.mamba_num_blocks is not None + # Should be much less than max_num_seqs * 1 = 1000 + assert kv_cache_config.mamba_num_blocks < max_num_seqs, ( + f"Mamba blocks {kv_cache_config.mamba_num_blocks} should be " + f"< max_num_seqs={max_num_seqs} due to memory cap" + ) + # Should still have positive attention blocks + assert kv_cache_config.num_blocks > 0 + + +def test_compact_align_full_concurrency(): + """Align mode compact pool sized for N*(2+spec) must serve N requests. + + Similar to test_compact_none_full_concurrency but for align mode. + Each request uses 2+spec blocks (dual buffer + speculative). 
+ """ + max_concurrent = 16 + spec = 2 + blocks_per_req = 2 + spec # align mode: 2 base + speculative + pool_size = max_concurrent * blocks_per_req + + manager = _create_compact_mamba_manager( + mamba_num_blocks=pool_size, spec=spec, mamba_cache_mode="align" + ) + block_size = manager.block_size + + # Schedule all requests with prefill that spans multiple blocks + # Use exact multiple of block_size for clean align-mode allocation + prefill_tokens = block_size * 5 # 5 blocks worth, >> 1 block + for i in range(max_concurrent): + req_id = f"req_{i}" + needed = manager.get_num_blocks_to_allocate( + req_id, prefill_tokens, [], 0, prefill_tokens + ) + assert needed == 0, ( + f"Request {i} rejected - pool of {pool_size} blocks should " + f"handle {max_concurrent} concurrent align-mode requests" + ) + manager.allocate_new_blocks(req_id, prefill_tokens, prefill_tokens) + + # The key invariant: all requests were accepted without rejection + # Free all and verify blocks return + for i in range(max_concurrent): + manager.free(f"req_{i}") + assert len(manager._compact_free) == pool_size diff --git a/vllm/v1/core/kv_cache_coordinator.py b/vllm/v1/core/kv_cache_coordinator.py index eaa95dfe49f7..4c527c93dde7 100644 --- a/vllm/v1/core/kv_cache_coordinator.py +++ b/vllm/v1/core/kv_cache_coordinator.py @@ -21,6 +21,7 @@ FullAttentionSpec, KVCacheConfig, KVCacheSpec, + MambaSpec, ) from vllm.v1.request import Request @@ -64,6 +65,11 @@ def __init__( kv_cache_group_id=i, dcp_world_size=dcp_world_size, pcp_world_size=pcp_world_size, + **( + {"mamba_num_blocks": kv_cache_config.mamba_num_blocks} + if isinstance(kv_cache_group.kv_cache_spec, MambaSpec) + else {} + ), ) for i, kv_cache_group in enumerate(self.kv_cache_config.kv_cache_groups) ) diff --git a/vllm/v1/core/kv_cache_utils.py b/vllm/v1/core/kv_cache_utils.py index 83ada05309f9..e61cc59acc28 100644 --- a/vllm/v1/core/kv_cache_utils.py +++ b/vllm/v1/core/kv_cache_utils.py @@ -9,7 +9,7 @@ from collections.abc import Callable, Iterable, Iterator, Sequence from dataclasses import dataclass, replace from functools import partial -from typing import Any, NewType, TypeAlias, overload +from typing import Any, NewType, TypeAlias, cast, overload from vllm import envs from vllm.config import VllmConfig @@ -24,6 +24,7 @@ KVCacheGroupSpec, KVCacheSpec, KVCacheTensor, + MambaSpec, SlidingWindowSpec, UniformTypeKVCacheSpecs, ) @@ -801,18 +802,45 @@ def get_max_concurrency_for_kv_cache_config( ) -> float: """ Get the maximum concurrency for the given KV cache configuration. + + For mixed Mamba+attention models (e.g. Qwen3.5), each group's cost is + summed independently so that constant-cost Mamba groups don't inflate + the per-request estimate via the uniform multiplier. 
""" - num_layer_per_group = max( - len(group.layer_names) for group in kv_cache_config.kv_cache_groups - ) - max_memory_usage_per_request = num_layer_per_group * max_memory_usage_bytes( - vllm_config, (group.kv_cache_spec for group in kv_cache_config.kv_cache_groups) - ) - memory_per_block = ( - kv_cache_config.kv_cache_groups[0].kv_cache_spec.page_size_bytes - * num_layer_per_group - ) - num_block_per_request = cdiv(max_memory_usage_per_request, memory_per_block) + groups = kv_cache_config.kv_cache_groups + if _has_mixed_mamba_attention(groups): + if kv_cache_config.mamba_num_blocks is not None: + # Compact allocation: concurrency = min(attention, mamba) + attn_groups = [ + g for g in groups if not isinstance(g.kv_cache_spec, MambaSpec) + ] + mamba_groups = [g for g in groups if isinstance(g.kv_cache_spec, MambaSpec)] + attn_blocks_per_req = _max_blocks_per_request(vllm_config, attn_groups) + mamba_slots_per_req = _max_blocks_per_request(vllm_config, mamba_groups) + attn_concurrency = kv_cache_config.num_blocks / attn_blocks_per_req + mamba_concurrency = kv_cache_config.mamba_num_blocks / mamba_slots_per_req + return min(attn_concurrency, mamba_concurrency) + # "all" mode: standard mixed formula + max_memory_usage_per_request = sum( + len(group.layer_names) + * group.kv_cache_spec.max_memory_usage_bytes(vllm_config) + for group in groups + ) + total_page_size_per_block = sum( + len(group.layer_names) * group.kv_cache_spec.page_size_bytes + for group in groups + ) + num_block_per_request = cdiv( + max_memory_usage_per_request, total_page_size_per_block + ) + else: + num_layer_per_group = max(len(group.layer_names) for group in groups) + max_memory_usage_per_request = num_layer_per_group * max_memory_usage_bytes( + vllm_config, + (group.kv_cache_spec for group in groups), + ) + memory_per_block = groups[0].kv_cache_spec.page_size_bytes * num_layer_per_group + num_block_per_request = cdiv(max_memory_usage_per_request, memory_per_block) max_concurrency = kv_cache_config.num_blocks / num_block_per_request return max_concurrency @@ -948,6 +976,31 @@ def unify_kv_cache_spec_page_size( return new_kv_cache_spec +def _has_mixed_mamba_attention( + kv_cache_groups: list[KVCacheGroupSpec], +) -> bool: + """Check if groups contain both MambaSpec and non-MambaSpec layers.""" + has_mamba = any(isinstance(g.kv_cache_spec, MambaSpec) for g in kv_cache_groups) + has_attention = any( + not isinstance(g.kv_cache_spec, MambaSpec) for g in kv_cache_groups + ) + return has_mamba and has_attention + + +def _max_blocks_per_request( + vllm_config: VllmConfig, + groups: Iterable[KVCacheGroupSpec], +) -> int: + """Max blocks any single group in ``groups`` needs per request.""" + return max( + cdiv( + g.kv_cache_spec.max_memory_usage_bytes(vllm_config), + g.kv_cache_spec.page_size_bytes, + ) + for g in groups + ) + + def is_kv_cache_type_attention_free(kv_cache_spec: dict[str, KVCacheSpec]) -> bool: # kv_cache_spec is an empty dict for attention free models return not kv_cache_spec @@ -1119,6 +1172,108 @@ def get_kv_cache_config_from_groups( ) for layer_name in kv_cache_groups[0].layer_names ] + elif _has_mixed_mamba_attention(kv_cache_groups): + # Mixed Mamba+attention (e.g. Qwen3.5): handle allocation based on + # mamba_cache_mode. Mamba layers keep their small page size instead + # of being padded to match attention. 
+ mamba_groups = [ + g for g in kv_cache_groups if isinstance(g.kv_cache_spec, MambaSpec) + ] + attention_groups_list = [ + g for g in kv_cache_groups if not isinstance(g.kv_cache_spec, MambaSpec) + ] + mamba_cache_mode = vllm_config.cache_config.mamba_cache_mode + + all_layers: dict[str, KVCacheSpec] = {} + for group in kv_cache_groups: + for layer_name in group.layer_names: + all_layers[layer_name] = group.kv_cache_spec + + if mamba_cache_mode == "all": + # "all" mode: Mamba scales with tokens (prefix caching). + # Use standard per-layer allocation with shared block count. + total_page_size = sum(spec.page_size_bytes for spec in all_layers.values()) + num_blocks = int(available_memory // total_page_size) + num_blocks = max(num_blocks, 0) + num_blocks = may_override_num_blocks(vllm_config, num_blocks) + kv_cache_tensors = [ + KVCacheTensor( + size=( + spec.real_page_size_bytes + if isinstance(spec, MambaSpec) + else spec.page_size_bytes + ) + * num_blocks, + shared_by=[layer_name], + ) + for layer_name, spec in all_layers.items() + ] + mamba_num_blocks = None # shared pool + else: + # Compact Mamba allocation: Mamba state is O(1) per request, + # so decouple it from the shared attention block pool. + attention_page_total = sum( + len(g.layer_names) * g.kv_cache_spec.page_size_bytes + for g in attention_groups_list + ) + mamba_page_cost = sum( + len(g.layer_names) + * cast(MambaSpec, g.kv_cache_spec).real_page_size_bytes + for g in mamba_groups + ) + # Mamba blocks per request varies by mode: + # "none" = 1 (+speculative), "align" = 2 (+speculative) + mamba_blocks_per_req = _max_blocks_per_request(vllm_config, mamba_groups) + max_num_seqs = vllm_config.scheduler_config.max_num_seqs + + # Mamba state is O(1) per request and a hard constraint + # (no preemption). Size for max_num_seqs so Mamba never + # bottlenecks the scheduler. + # Cap: compact Mamba must never exceed what a shared pool + # would cost (guarantees attention_num_blocks >= shared pool). + num_shared_blocks = int( + available_memory // (attention_page_total + mamba_page_cost) + ) + num_concurrent = max( + 1, + min(max_num_seqs, num_shared_blocks // mamba_blocks_per_req), + ) + + # Size compact pool: total blocks = requests * blocks_per_request + mamba_blocks = num_concurrent * mamba_blocks_per_req + mamba_total = mamba_blocks * mamba_page_cost + remaining = available_memory - mamba_total + attention_num_blocks = max(0, int(remaining // attention_page_total)) + attention_num_blocks = may_override_num_blocks( + vllm_config, attention_num_blocks + ) + + # Build per-layer tensors with different block counts + kv_cache_tensors = [] + for layer_name, spec in all_layers.items(): + if isinstance(spec, MambaSpec): + kv_cache_tensors.append( + KVCacheTensor( + size=spec.real_page_size_bytes * mamba_blocks, + shared_by=[layer_name], + ) + ) + else: + kv_cache_tensors.append( + KVCacheTensor( + size=spec.page_size_bytes * attention_num_blocks, + shared_by=[layer_name], + ) + ) + num_blocks = attention_num_blocks # Pool only serves attention + mamba_num_blocks = mamba_blocks + + return KVCacheConfig( + num_blocks=num_blocks, + kv_cache_tensors=kv_cache_tensors, + kv_cache_groups=kv_cache_groups, + mamba_num_blocks=mamba_num_blocks, + ) else: # General case: # We will have group_size memory pools, each is shared by one layer from @@ -1248,6 +1403,16 @@ def get_kv_cache_groups( # same window size). Put all layers into one group. 
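To make the sizing order concrete, here is a back-of-the-envelope rerun of the compact split above with the Qwen3.5-like per-block page sizes; the 40 GiB budget and `max_num_seqs=256` are hypothetical inputs, not values from the source:

```python
GiB = 1 << 30

attn_page_per_block = 8 * 2 * 16 * 4 * 256 * 2                  # 8 attention layers: 512 KiB/block
mamba_page_per_block = 24 * (3 * 8192 + 32 * 128 * 128) * 2     # 24 GDN layers: ~25 MiB/block

available_memory = 40 * GiB        # hypothetical KV budget
max_num_seqs = 256                 # hypothetical scheduler limit
mamba_blocks_per_req = 2           # "align" mode

# Cap: never spend more on Mamba than a fully shared pool would have.
num_shared_blocks = available_memory // (attn_page_per_block + mamba_page_per_block)
num_concurrent = max(1, min(max_num_seqs, num_shared_blocks // mamba_blocks_per_req))

mamba_blocks = num_concurrent * mamba_blocks_per_req
remaining = available_memory - mamba_blocks * mamba_page_per_block
attention_num_blocks = max(0, remaining // attn_page_per_block)

print(num_concurrent, mamba_blocks, attention_num_blocks)       # 256 512 56192
```

Everything Mamba does not need (here roughly 27 GiB of the 40 GiB) flows back into the attention pool, which is the point of decoupling the two block counts.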
return _get_kv_cache_groups_uniform_type(uniform_spec) + # For mixed Mamba+attention, skip page size unification — the dedicated + # allocation path in get_kv_cache_config_from_groups handles non-uniform + # page sizes by giving each layer its own tensor. + has_mamba = any(isinstance(spec, MambaSpec) for spec in kv_cache_spec.values()) + has_non_mamba = any( + not isinstance(spec, MambaSpec) for spec in kv_cache_spec.values() + ) + if has_mamba and has_non_mamba: + return _get_kv_cache_groups_uniform_page_size(kv_cache_spec) + # As KVCacheManager can only allocate memory of one size, we need to unify # the page size of the layers. For cases cannot be unified, this function # will raise an error. @@ -1266,7 +1431,14 @@ def generate_scheduler_kv_cache_config( Generate the KV cache configuration for the scheduler. """ assert all( - [cfg.num_blocks == kv_cache_configs[0].num_blocks for cfg in kv_cache_configs] + cfg.num_blocks == kv_cache_configs[0].num_blocks for cfg in kv_cache_configs + ) + assert all( + cfg.mamba_num_blocks == kv_cache_configs[0].mamba_num_blocks + for cfg in kv_cache_configs + ), ( + "mamba_num_blocks must be consistent across workers: " + f"{[cfg.mamba_num_blocks for cfg in kv_cache_configs]}" ) # All workers have the same kv_cache_config except layer names, so use # an arbitrary one to initialize the scheduler. @@ -1296,11 +1468,21 @@ def _report_kv_cache_config( ) # Log the KV cache size and maximum concurrency. - num_tokens = ( - kv_cache_config.num_blocks - // len(kv_cache_config.kv_cache_groups) - * min_block_size + # For hybrid models, Mamba groups only contribute token capacity when + # mamba_cache_mode="all" (prefix caching). In default "none" and "align" + # modes, Mamba state is constant-size and doesn't scale with tokens. + mamba_cache_mode = vllm_config.cache_config.mamba_cache_mode + attention_groups = [ + g + for g in kv_cache_config.kv_cache_groups + if not isinstance(g.kv_cache_spec, MambaSpec) or mamba_cache_mode == "all" + ] + num_attention_groups = ( + len(attention_groups) + if attention_groups + else len(kv_cache_config.kv_cache_groups) ) + num_tokens = kv_cache_config.num_blocks // num_attention_groups * min_block_size dcp_size = vllm_config.parallel_config.decode_context_parallel_size pcp_size = vllm_config.parallel_config.prefill_context_parallel_size if pcp_size * dcp_size > 1: @@ -1350,19 +1532,29 @@ def _max_memory_usage_bytes_from_groups( for spec in per_layer_specs.values() ) - # General case: group_size pools, each shared by one layer per group - # Memory = group_size * page_size * blocks_for_max_len - group_size = max(len(group.layer_names) for group in kv_cache_groups) - page_size = get_uniform_page_size( - [group.kv_cache_spec for group in kv_cache_groups] - ) - blocks_needed = sum( - cdiv(group.kv_cache_spec.max_memory_usage_bytes(vllm_config), page_size) + # Mixed Mamba+attention with "all" mode: all layers share the same block + # count, so the per-request cost is max_blocks × total_page_size. + # This is consistent with the "all" mode allocation path. + if _has_mixed_mamba_attention(kv_cache_groups): + mamba_mode = vllm_config.cache_config.mamba_cache_mode + if mamba_mode == "all": + max_blocks = _max_blocks_per_request(vllm_config, kv_cache_groups) + total_page = sum( + len(g.layer_names) * g.kv_cache_spec.page_size_bytes + for g in kv_cache_groups + ) + return max_blocks * total_page + + # General case: sum each group's actual memory usage independently. + # This handles hybrid models (e.g. 
Mamba+attention) where groups have + # different scaling characteristics and potentially different page sizes. + # For compact Mamba allocation (non-"all" mode), this correctly returns + # the per-request cost: attention O(n) + Mamba O(1). + return sum( + len(group.layer_names) * group.kv_cache_spec.max_memory_usage_bytes(vllm_config) for group in kv_cache_groups ) - return group_size * page_size * blocks_needed - def _estimate_max_model_len_from_groups( vllm_config: VllmConfig, @@ -1599,14 +1791,40 @@ def get_kv_cache_configs( min_num_blocks = min( kv_cache_config.num_blocks for kv_cache_config in kv_cache_configs ) + + # Sync mamba_num_blocks separately if compact allocation is active. + min_mamba_num_blocks: int | None = None + if any(cfg.mamba_num_blocks is not None for cfg in kv_cache_configs): + min_mamba_num_blocks = min( + cfg.mamba_num_blocks + for cfg in kv_cache_configs + if cfg.mamba_num_blocks is not None + ) + for kv_cache_config in kv_cache_configs: num_blocks_old = kv_cache_config.num_blocks + mamba_num_blocks_old = kv_cache_config.mamba_num_blocks kv_cache_config.num_blocks = min_num_blocks - - # Shrink tensor size proportionally + if min_mamba_num_blocks is not None: + kv_cache_config.mamba_num_blocks = min_mamba_num_blocks + + # Shrink tensor size proportionally — handle Mamba and attention + # separately when compact allocation is active. + mamba_layer_names = { + name + for group in kv_cache_config.kv_cache_groups + if isinstance(group.kv_cache_spec, MambaSpec) + for name in group.layer_names + } for tensor in kv_cache_config.kv_cache_tensors: - assert tensor.size % num_blocks_old == 0 - tensor.size = tensor.size // num_blocks_old * min_num_blocks + is_mamba = tensor.shared_by[0] in mamba_layer_names + if is_mamba and mamba_num_blocks_old is not None: + assert tensor.size % mamba_num_blocks_old == 0 + assert min_mamba_num_blocks is not None + tensor.size = tensor.size // mamba_num_blocks_old * min_mamba_num_blocks + else: + assert tensor.size % num_blocks_old == 0 + tensor.size = tensor.size // num_blocks_old * min_num_blocks if len(kv_cache_config.kv_cache_groups) > 0: _report_kv_cache_config(vllm_config, kv_cache_config) diff --git a/vllm/v1/core/single_type_kv_cache_manager.py b/vllm/v1/core/single_type_kv_cache_manager.py index 62bdb8113a32..54f28c473ce5 100644 --- a/vllm/v1/core/single_type_kv_cache_manager.py +++ b/vllm/v1/core/single_type_kv_cache_manager.py @@ -762,7 +762,12 @@ def get_num_common_prefix_blocks(self, running_request_id: str) -> int: class MambaManager(SingleTypeKVCacheManager): def __init__( - self, kv_cache_spec: MambaSpec, block_pool: BlockPool, **kwargs + self, + kv_cache_spec: MambaSpec, + block_pool: BlockPool, + *, + mamba_num_blocks: int | None = None, + **kwargs, ) -> None: super().__init__(kv_cache_spec, block_pool, **kwargs) self.cached_blocks_this_step: set[BlockHashWithGroupId] = set() @@ -775,6 +780,23 @@ def __init__( # The set of the requests that have been allocated blocks self._allocated_block_reqs: set[str] = set() + # Compact mode: Mamba self-manages a small block space (0..C-1) + # instead of sharing the attention BlockPool. This avoids large + # memory waste from sizing Mamba tensors for N attention blocks + # when only C << N compact blocks are needed. 
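The asymmetry described in that comment is easy to verify by hand. A minimal sketch of the per-request cost for the non-"all" path, using the test's Qwen3.5-like dimensions and an assumed 32K `max_model_len`:

```python
MAX_MODEL_LEN = 32_768  # assumed

# Attention: O(n) in tokens, 8 full-attention layers at bf16.
attn_bytes_per_token_per_layer = 2 * 4 * 256 * 2           # K+V * kv_heads * head_dim * 2 bytes
attn_cost = 8 * MAX_MODEL_LEN * attn_bytes_per_token_per_layer   # 1 GiB

# Mamba: O(1) per request, 24 GatedDeltaNet layers at bf16.
mamba_state_per_layer = (3 * 8192 + 32 * 128 * 128) * 2    # conv + temporal state
mamba_cost = 24 * mamba_state_per_layer                    # ~25 MiB

per_request_bytes = attn_cost + mamba_cost
print(per_request_bytes / (1 << 30))                        # ~1.02 GiB, dominated by attention
```

Attention accounts for about 98% of the per-request bytes, so summing the groups independently, rather than multiplying everything by a uniform group size, keeps the estimate honest.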
+ self.compact_mode = ( + mamba_num_blocks is not None and kv_cache_spec.mamba_cache_mode != "all" + ) + if self.compact_mode: + assert mamba_num_blocks is not None + # Stack of free compact blocks + self._compact_free: list[KVCacheBlock] = [ + KVCacheBlock(block_id=i) for i in range(mamba_num_blocks) + ] + # Blocks needed for first allocation: 1 state + speculative. + # Used by both "none" (every call) and "align" (first prefill). + self._initial_blocks_per_req = 1 + self.num_speculative_blocks + @classmethod def find_longest_cache_hit( cls, @@ -833,6 +855,37 @@ def remove_skipped_blocks(self, request_id: str, num_computed_tokens: int) -> No # that we might actually need. num_computed_tokens = max(0, num_computed_tokens - self.num_speculative_blocks) + if self.compact_mode: + if self.mamba_cache_mode != "align": + # "none" mode: blocks are permanent for request lifetime. + # Kernel overwrites state in-place; nothing to skip or free. + return + + # "align" mode: free skipped blocks to compact pool (not block_pool). + # MUST NOT call super() which uses block_pool. + num_skipped_tokens = self.get_num_skipped_tokens(num_computed_tokens) + if num_skipped_tokens > 0: + blocks = self.req_to_blocks[request_id] + num_skipped_blocks = num_skipped_tokens // self.block_size + num_skipped_blocks = min(num_skipped_blocks, len(blocks)) + for i in range(num_skipped_blocks - 1, -1, -1): + if blocks[i] == self._null_block: + break + self._compact_free.append(blocks[i]) + blocks[i] = self._null_block + + last_state_block_idx = self.last_state_block_idx.get(request_id) + if ( + last_state_block_idx is not None + and last_state_block_idx + < cdiv(num_computed_tokens, self.block_size) - 1 + ): + blocks = self.req_to_blocks[request_id] + if blocks[last_state_block_idx] != self._null_block: + self._compact_free.append(blocks[last_state_block_idx]) + blocks[last_state_block_idx] = self._null_block + return + super().remove_skipped_blocks(request_id, num_computed_tokens) if self.mamba_cache_mode == "align": # `last_state_block_idx` refers to the block index allocated two steps ago. @@ -877,6 +930,35 @@ def get_num_blocks_to_allocate( # that kv_cache_manager will think there is no enough blocks to allocate now # and don't schedule it in the current step. return self.block_pool.num_gpu_blocks + 1 + + if self.compact_mode: + # Compact mode: check against compact free list, not block pool. + # Return 0 (no shared pool blocks needed) or force rejection. + if self.mamba_cache_mode != "align": + # "none" mode: fixed block count per request (no token scaling). + # new_computed_blocks is always empty (cache_blocks is a no-op). + req_blocks = self.req_to_blocks[request_id] + num_new = max(0, self._initial_blocks_per_req - len(req_blocks)) + else: + num_tokens = num_tokens_main_model + num_required_blocks = ( + cdiv(num_tokens, self.block_size) + self.num_speculative_blocks + ) + num_new = ( + num_required_blocks + - len(new_computed_blocks) + - len(self.req_to_blocks[request_id]) + ) + if num_new > 0: + if request_id in self._allocated_block_reqs: + num_new = 1 + else: + num_new = self._initial_blocks_per_req + + if num_new > len(self._compact_free): + return self.block_pool.num_gpu_blocks + 1 # force rejection + return 0 # don't request any shared pool blocks + if self.mamba_cache_mode != "align": # Allocate extra `num_speculative_blocks` blocks for # speculative decoding (MTP/EAGLE) with linear attention. 
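For readers following the compact control flow, a toy standalone pool (a hypothetical class, not `MambaManager` itself) showing the three behaviors above: return 0 when the compact stack can cover the request, return an impossible block count to force a scheduler rejection, and LIFO reuse on free:

```python
from collections import defaultdict

class CompactMambaPool:
    def __init__(self, mamba_num_blocks: int, shared_pool_blocks: int, blocks_per_req: int = 2):
        self.free = list(range(mamba_num_blocks))        # stack of free compact slot ids
        self.shared_pool_blocks = shared_pool_blocks     # size of the attention block pool
        self.blocks_per_req = blocks_per_req
        self.req_to_blocks: dict[str, list[int]] = defaultdict(list)

    def num_blocks_to_allocate(self, req_id: str) -> int:
        needed = self.blocks_per_req - len(self.req_to_blocks[req_id])
        if needed > len(self.free):
            return self.shared_pool_blocks + 1           # impossible count -> scheduler rejects
        return 0                                         # takes nothing from the shared pool

    def allocate(self, req_id: str) -> list[int]:
        needed = self.blocks_per_req - len(self.req_to_blocks[req_id])
        new = [self.free.pop() for _ in range(max(0, needed))]
        self.req_to_blocks[req_id].extend(new)
        return new

    def free_request(self, req_id: str) -> None:
        self.free.extend(self.req_to_blocks.pop(req_id, []))

pool = CompactMambaPool(mamba_num_blocks=4, shared_pool_blocks=1000)
assert pool.num_blocks_to_allocate("a") == 0 and pool.allocate("a") == [3, 2]
assert pool.num_blocks_to_allocate("b") == 0 and pool.allocate("b") == [1, 0]
assert pool.num_blocks_to_allocate("c") == 1001          # pool exhausted -> forced rejection
pool.free_request("a")
assert pool.num_blocks_to_allocate("c") == 0             # freed slots are reusable
```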
@@ -928,6 +1010,69 @@ def allocate_new_blocks( self, request_id: str, num_tokens: int, num_tokens_main_model: int ) -> list[KVCacheBlock]: assert isinstance(self.kv_cache_spec, MambaSpec) + + if self.compact_mode: + if self.mamba_cache_mode != "align": + # "none" mode: fixed block count per request (no token scaling). + req_blocks = self.req_to_blocks[request_id] + num_new = self._initial_blocks_per_req - len(req_blocks) + if num_new <= 0: + return [] + new_blocks = [self._compact_free.pop() for _ in range(num_new)] + req_blocks.extend(new_blocks) + return new_blocks + else: + # "align" mode compact allocation — mirrors non-compact + # "align" logic but uses compact free list. + num_tokens = num_tokens_main_model + req_blocks = self.req_to_blocks[request_id] + num_required_blocks = ( + cdiv(num_tokens, self.block_size) + self.num_speculative_blocks + ) + if num_required_blocks == len(req_blocks): + return [] + assert num_required_blocks > len(req_blocks), ( + "num_required_blocks " + f"{num_required_blocks} < len(req_blocks) {len(req_blocks)}" + ) + prev_block_len = len(req_blocks) + blocks_allocated = request_id in self._allocated_block_reqs + if blocks_allocated: + self.last_state_block_idx[request_id] = ( + prev_block_len - 1 - self.num_speculative_blocks + ) + elif prev_block_len > 0: + self.last_state_block_idx[request_id] = prev_block_len - 1 + num_skipped_blocks = ( + num_required_blocks - self.num_speculative_blocks - 1 + ) + if prev_block_len < num_skipped_blocks: + req_blocks.extend( + [ + self._null_block + for _ in range(prev_block_len, num_skipped_blocks) + ] + ) + if blocks_allocated: + for block_idx in range( + prev_block_len - self.num_speculative_blocks, + prev_block_len, + ): + if block_idx < num_skipped_blocks: + req_blocks.append(req_blocks[block_idx]) + req_blocks[block_idx] = self._null_block + else: + break + num_new_blocks = num_required_blocks - len(req_blocks) + if blocks_allocated: + assert num_new_blocks <= 1 + else: + assert num_new_blocks <= self.num_speculative_blocks + 1 + new_blocks = [self._compact_free.pop() for _ in range(num_new_blocks)] + req_blocks.extend(new_blocks) + self._allocated_block_reqs.add(request_id) + return req_blocks[prev_block_len:] + if self.mamba_cache_mode != "align": # Allocate extra `num_speculative_blocks` blocks for # speculative decoding (MTP/EAGLE) with linear attention. @@ -943,7 +1088,7 @@ def allocate_new_blocks( # We can ignore lookahead tokens because current draft models don't have # mamba layers. num_tokens = num_tokens_main_model - req_blocks: list[KVCacheBlock] = self.req_to_blocks[request_id] + req_blocks = self.req_to_blocks[request_id] # NOTE(tdouble): this is an over-estimate of how many blocks we need because # num_tokens can include draft tokens that will later be rejected. 
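A rough accounting sketch of how many real compact blocks the "align" path consumes per step; the helper names and the `spec` value are hypothetical, and the null-block padding and index recycling from the code above are deliberately omitted:

```python
from math import ceil

block_size, spec = 16, 2                    # assumed values
initial_blocks_per_req = 1 + spec           # first allocation: 1 state block + speculative

def required_blocks(num_tokens: int) -> int:
    # Logical length of the request's block table, as computed in the diff above.
    return ceil(num_tokens / block_size) + spec

def max_new_compact_blocks(already_allocated: bool) -> int:
    # Upper bound on real blocks popped from the compact free stack per step:
    # the first allocation takes 1 + spec blocks, later steps take at most 1
    # because the previous state block is recycled into a null slot.
    return initial_blocks_per_req if not already_allocated else 1

assert required_blocks(80) == 7             # 5 token blocks + 2 speculative slots
assert max_new_compact_blocks(False) == 3   # first prefill
assert max_new_compact_blocks(True) == 1    # subsequent steps
```

This is consistent with the align-mode test earlier in the diff, which budgets 2 + spec compact blocks per concurrent request.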
num_required_blocks = ( @@ -1003,6 +1148,17 @@ def allocate_new_blocks( return req_blocks[prev_block_len:] def free(self, request_id: str) -> None: + if self.compact_mode: + if self.mamba_cache_mode == "align": + self._allocated_block_reqs.discard(request_id) + self.last_state_block_idx.pop(request_id, None) + req_blocks = self.req_to_blocks.pop(request_id, []) + # Return compact blocks to free stack (skip null blocks) + for block in req_blocks: + if not block.is_null: + self._compact_free.append(block) + self.num_cached_block.pop(request_id, None) + return if self.mamba_cache_mode == "align": self._allocated_block_reqs.discard(request_id) self.last_state_block_idx.pop(request_id, None) @@ -1017,6 +1173,10 @@ def get_num_skipped_tokens(self, num_computed_tokens: int) -> int: return num_computed_tokens - 1 def cache_blocks(self, request: Request, num_tokens: int) -> None: + if self.compact_mode: + # No prefix caching for compact Mamba — compact block IDs + # must never enter the shared pool's cache hash table. + return num_cached_blocks_before = self.num_cached_block.get(request.request_id, 0) super().cache_blocks(request, num_tokens) num_cached_blocks_after = self.num_cached_block.get(request.request_id, 0) diff --git a/vllm/v1/kv_cache_interface.py b/vllm/v1/kv_cache_interface.py index 48ecf6b9dc85..133ded691580 100644 --- a/vllm/v1/kv_cache_interface.py +++ b/vllm/v1/kv_cache_interface.py @@ -280,11 +280,15 @@ class MambaSpec(KVCacheSpec): num_speculative_blocks: int = 0 @property - def page_size_bytes(self) -> int: - page_size = sum( + def real_page_size_bytes(self) -> int: + return sum( prod(shape) * get_dtype_size(dtype) for (shape, dtype) in zip(self.shapes, self.dtypes) ) + + @property + def page_size_bytes(self) -> int: + page_size = self.real_page_size_bytes if self.page_size_padded is not None: assert self.page_size_padded >= page_size return self.page_size_padded @@ -489,6 +493,14 @@ class KVCacheConfig: For models with multiple types of attention, there will be multiple groups, see `_get_kv_cache_config_uniform_page_size` for more details. """ + mamba_num_blocks: int | None = None + """ + Number of compact Mamba blocks when using separate Mamba allocation. + None means Mamba shares the attention block pool (mamba_cache_mode="all"). + When set, Mamba tensors are sized for this many blocks instead of + num_blocks. This avoids large memory waste in hybrid models where Mamba + state is O(1) per request but the shared pool forces O(n)-sized tensors. + """ @property def has_mamba_layers(self) -> bool: diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index af5dca71f9c0..0b42d0626e14 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -6309,8 +6309,19 @@ def _reshape_kv_cache_tensors( if layer_name in self.runner_only_attn_layers: continue raw_tensor = kv_cache_raw_tensors[layer_name] - assert raw_tensor.numel() % kv_cache_spec.page_size_bytes == 0 - num_blocks = raw_tensor.numel() // kv_cache_spec.page_size_bytes + if isinstance(kv_cache_spec, AttentionSpec): + assert raw_tensor.numel() % kv_cache_spec.page_size_bytes == 0 + num_blocks = raw_tensor.numel() // kv_cache_spec.page_size_bytes + elif isinstance(kv_cache_spec, MambaSpec): + # Mamba tensors are allocated with real (unpadded) + # page sizes — padding only applies to shared-pool + # block indexing, not to actual tensor layout. 
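A quick check of real versus padded page size for the GatedDeltaNet shapes used in the tests; the 2 MiB padding target and the 512-block pool are hypothetical, and the point is only that sizing tensors by `page_size_bytes` instead of `real_page_size_bytes` would waste roughly the padding times the block count:

```python
from math import prod

shapes = ((3, 8192), (32, 128, 128))
dtype_size = 2                                       # bf16

real_page = sum(prod(s) * dtype_size for s in shapes)        # 1,097,728 bytes per block
page_size_padded = 2 * 1024 * 1024                           # hypothetical padding target

mamba_num_blocks = 512
real_tensor_bytes = real_page * mamba_num_blocks             # ~536 MiB actually allocated
wasted_if_padded = (page_size_padded - real_page) * mamba_num_blocks   # ~488 MiB of padding
print(real_tensor_bytes, wasted_if_padded)
```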
+ assert raw_tensor.numel() % kv_cache_spec.real_page_size_bytes == 0 + num_blocks = ( + raw_tensor.numel() // kv_cache_spec.real_page_size_bytes + ) + else: + raise NotImplementedError if isinstance(kv_cache_spec, AttentionSpec): has_attn = True num_blocks_per_kv_block = ( @@ -6355,11 +6366,10 @@ def _reshape_kv_cache_tensors( raw_tensor = kv_cache_raw_tensors[layer_name] state_tensors = [] storage_offset_bytes = 0 + mamba_page = kv_cache_spec.real_page_size_bytes for shape, dtype in zip(kv_cache_spec.shapes, kv_cache_spec.dtypes): dtype_size = get_dtype_size(dtype) - num_element_per_page = ( - kv_cache_spec.page_size_bytes // dtype_size - ) + num_element_per_page = mamba_page // dtype_size target_shape = (num_blocks, *shape) stride = torch.empty(target_shape).stride() target_stride = (num_element_per_page, *stride[1:])
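A simplified, standalone illustration (not the runner code itself) of the striding scheme above: both GatedDeltaNet states of one layer live in a single flat buffer, one contiguous page per block, and the per-block stride equals the number of elements per page:

```python
import torch
from math import prod

shapes = ((3, 8192), (32, 128, 128))
dtype = torch.bfloat16
num_blocks = 4

elems_per_page = sum(prod(s) for s in shapes)
raw = torch.zeros(num_blocks * elems_per_page, dtype=dtype)     # flat per-layer buffer

state_views = []
offset = 0
for shape in shapes:
    target_shape = (num_blocks, *shape)
    inner_stride = torch.empty(target_shape).stride()[1:]       # contiguous within a block
    view = raw.as_strided(target_shape, (elems_per_page, *inner_stride), storage_offset=offset)
    state_views.append(view)
    offset += prod(shape)                                       # next state starts after this one

conv_state, temporal_state = state_views
assert conv_state.shape == (4, 3, 8192) and temporal_state.shape == (4, 32, 128, 128)

# Writing block 2 of the temporal state touches only that block's page in the flat buffer.
temporal_state[2].fill_(1.0)
assert raw[2 * elems_per_page + prod(shapes[0]):3 * elems_per_page].eq(1.0).all()
```

Because the views are carved from one buffer per layer, the per-block page stays contiguous, which is what lets the scheduler hand out a single block id that covers both the conv and temporal states.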