Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion vllm/platforms/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,18 @@ def _align_hybrid_block_size(
)

if cache_config.block_size < attn_block_size:
# Preserve the user-requested block_size as hash_block_size
# before inflating block_size to attn_block_size, so prefix
# hashing keeps the finer granularity for hybrid models.
if cache_config.hash_block_size is None:
cache_config.hash_block_size = cache_config.block_size
cache_config.block_size = attn_block_size
logger.info(
"Setting attention block size to %d tokens "
"to ensure that attention page size is >= mamba page size.",
"to ensure that attention page size is >= mamba page size "
"(hash granularity preserved at %d).",
attn_block_size,
cache_config.hash_block_size,
)

if cache_config.mamba_cache_mode == "align":
Expand Down
107 changes: 88 additions & 19 deletions vllm/v1/core/single_type_kv_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(
# data for preempted ones.
self.num_cached_block: dict[str, int] = {}

# Per-request hash-block emission cursor for sub-block events.
self._last_emitted_hash_block: dict[str, int] = {}

self.kv_cache_group_id = kv_cache_group_id
self._null_block = block_pool.null_block

Expand Down Expand Up @@ -275,6 +278,63 @@ def take_new_block_ids(self) -> list[int]:
self.new_block_ids = []
return ids

def _maybe_emit_sub_block_events(self, request: Request, num_tokens: int) -> None:
"""Emit synthetic BlockStored events at hash_block_size granularity.

When physical block_size is inflated for hybrid models (to satisfy
page-size constraints between Mamba and Attention groups), the
normal cache_full_blocks() path only fires events on full physical
blocks. For prompts smaller than the inflated block no events ever
flow. This emitter advances a per-request cursor over hash-block
boundaries (set upstream by resolve_kv_cache_block_sizes) and
appends BlockStored events to block_pool.kv_event_queue. Mamba
groups are suppressed via the MambaManager override below.
"""
from vllm.distributed.kv_events import MEDIUM_GPU, BlockStored
from vllm.v1.core.kv_cache_utils import maybe_convert_block_hash

bp = self.block_pool
if not bp.enable_kv_cache_events:
return
hash_bs = bp.hash_block_size
if hash_bs >= self.block_size:
return # only meaningful when block_size > hash_block_size (hybrid)

num_hash_blocks = num_tokens // hash_bs
last_emitted = self._last_emitted_hash_block.get(request.request_id, 0)
if num_hash_blocks <= last_emitted:
return
if len(request.block_hashes) < num_hash_blocks:
return # block hashes not populated for these positions yet

new_hashes = [
maybe_convert_block_hash(request.block_hashes[i])
for i in range(last_emitted, num_hash_blocks)
]
parent_hash = (
maybe_convert_block_hash(request.block_hashes[last_emitted - 1])
if last_emitted > 0
else None
)
start_tok = last_emitted * hash_bs
end_tok = num_hash_blocks * hash_bs
token_ids = list(request.all_token_ids[start_tok:end_tok])
bp.kv_event_queue.append(
BlockStored(
block_hashes=new_hashes,
parent_block_hash=parent_hash,
token_ids=token_ids,
block_size=hash_bs,
lora_id=(
request.lora_request.adapter_id if request.lora_request else None
),
medium=MEDIUM_GPU,
lora_name=(request.lora_request.name if request.lora_request else None),
group_idx=self.kv_cache_group_id,
)
)
self._last_emitted_hash_block[request.request_id] = num_hash_blocks

def cache_blocks(
self,
request: Request,
Expand All @@ -298,27 +358,30 @@ def cache_blocks(
num_cached_blocks = self.num_cached_block.get(request.request_id, 0)
num_full_blocks = num_tokens // self.block_size

if num_cached_blocks >= num_full_blocks:
return

# Fast path: when the coordinator imposes no alignment constraint
if alignment_tokens is None or alignment_tokens <= self.block_size:
block_mask = None
else:
block_mask = self._cache_block_mask(
num_cached_blocks, num_full_blocks, alignment_tokens
if num_full_blocks > num_cached_blocks:
# Fast path: when the coordinator imposes no alignment constraint
if alignment_tokens is None or alignment_tokens <= self.block_size:
block_mask = None
else:
block_mask = self._cache_block_mask(
num_cached_blocks, num_full_blocks, alignment_tokens
)
self.block_pool.cache_full_blocks(
request=request,
blocks=self.req_to_blocks[request.request_id],
num_cached_blocks=num_cached_blocks,
num_full_blocks=num_full_blocks,
block_size=self.block_size,
kv_cache_group_id=self.kv_cache_group_id,
block_mask=block_mask,
)
self.block_pool.cache_full_blocks(
request=request,
blocks=self.req_to_blocks[request.request_id],
num_cached_blocks=num_cached_blocks,
num_full_blocks=num_full_blocks,
block_size=self.block_size,
kv_cache_group_id=self.kv_cache_group_id,
block_mask=block_mask,
)
self.num_cached_block[request.request_id] = num_full_blocks

self.num_cached_block[request.request_id] = num_full_blocks
# Also emit sub-block events so prompts smaller than the inflated
# physical block still produce a KV-event signal. Runs every call
# because hash-block boundaries can advance independently of full
# physical-block boundaries.
self._maybe_emit_sub_block_events(request, num_tokens)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This implementation introduces duplicate BlockStored events for hybrid models. When block_size > hash_block_size, the self.block_pool.cache_full_blocks call (lines 352-359) already emits BlockStored events for the newly cached physical blocks at hash_block_size granularity. By calling _maybe_emit_sub_block_events immediately after, the same hash blocks are emitted again (once as part of the physical block event and once as individual sub-block events).

This redundancy wastes bandwidth and can cause issues for downstream consumers that expect unique events per hash block. To fix this, you should ensure that _maybe_emit_sub_block_events only emits for tokens that are not yet covered by a full physical block, or coordinate with BlockPool to suppress its internal emission when sub-block tracking is active for a group.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging this. The two emission paths actually fire at different granularities for the hybrid case where block_size > hash_block_size, so they end up complementary rather than duplicating each other:

  • block_pool.cache_full_blocks (vllm/v1/core/block_pool.py:241-256) rebuilds block_hashes with BlockHashListWithBlockSize(request.block_hashes, self.hash_block_size, block_size) when block_size != hash_block_size, and emits BlockStored with block_size=self.block_size (the inflated value, e.g. 2176). One coarse event per physical block, hashed over the full physical-block window.
  • _maybe_emit_sub_block_events (new) walks request.block_hashes directly and emits BlockStored with block_size=hash_block_size (e.g. 64) — many fine events per physical block, each hashed over a 64-token window.

Because the hash inputs are different (a hash over 2176 tokens vs. a hash over 64 tokens), the resulting block_hashes values are distinct between the two streams; downstream consumers see them as different cached entries at different granularities, not redundant events for the same hash.

Verified empirically against vllm/vllm-openai:nightly-bf610c2f5 with a hybrid Mamba+Attention model, --block-size 64, --enable-prefix-caching, and the ZMQ subscriber from the PR description: without this change the subscriber receives only block_size=2176 events; with this change those exact same coarse events still arrive and a new stream of block_size=64 events appears alongside them (with the right parent_block_hash chain).

Consumers that only need the fine granularity can filter on block_size == hash_block_size. If maintainers think the coarse stream should be suppressed for hybrid groups when sub-block emission is active, I can gate it behind a flag — happy to discuss the right behaviour here.


def _cache_block_mask(
self,
Expand Down Expand Up @@ -351,6 +414,7 @@ def free(self, request_id: str) -> None:

self.block_pool.free_blocks(ordered_blocks)
self.num_cached_block.pop(request_id, None)
self._last_emitted_hash_block.pop(request_id, None)

@abstractmethod
def get_num_common_prefix_blocks(self, running_request_id: str) -> int:
Expand Down Expand Up @@ -854,6 +918,11 @@ def __init__(
# The set of the requests that have been allocated blocks
self._allocated_block_reqs: set[str] = set()

def _maybe_emit_sub_block_events(self, request: Request, num_tokens: int) -> None:
# Mamba state is recurrent — there is no token-level prefix-shareable
# hash block to surface on the kv-events stream.
return

@classmethod
def find_longest_cache_hit(
cls,
Expand Down
22 changes: 21 additions & 1 deletion vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
vllm_config.cache_config.num_gpu_blocks = scheduler_kv_cache_config.num_blocks
kv_cache_groups = scheduler_kv_cache_config.kv_cache_groups
if kv_cache_groups:
# Preserve the user --block-size as hash_block_size before
# worker-side block_size inflation flows back here, so
# resolve_kv_cache_block_sizes() returns hash_block_size !=
# block_size for hybrid models (gates sub-block BlockStored
# emission in SingleTypeKVCacheManager).
if vllm_config.cache_config.hash_block_size is None:
vllm_config.cache_config.hash_block_size = (
vllm_config.cache_config.block_size
)
vllm_config.cache_config.block_size = min(
g.kv_cache_spec.block_size for g in kv_cache_groups
)
Expand Down Expand Up @@ -318,14 +327,25 @@ def get_kv_cache_group_metadata(self) -> list[dict[str, int | str | None]]:
if kv_cache_config is None:
return []

# Report hash_block_size as block_size when sub-block emission is
# active so downstream KV-event consumers use the right hashing
# granularity. Falls back to spec.block_size when hash_block_size is
# unset or equals the physical block size.
cache_config = self.vllm_config.cache_config
hash_bs = getattr(cache_config, "hash_block_size", None)
metadata: list[dict[str, int | str | None]] = []
for group_idx, group in enumerate(kv_cache_config.kv_cache_groups):
spec = group.kv_cache_spec
effective_block_size = (
hash_bs
if hash_bs is not None and hash_bs < spec.block_size
else spec.block_size
)
metadata.append(
{
"group_idx": group_idx,
"kind": get_kv_cache_spec_kind(spec).value,
"block_size": spec.block_size,
"block_size": effective_block_size,
"sliding_window": getattr(spec, "sliding_window", None),
}
)
Expand Down
Loading