KVConnector: Introduce bind_scheduler_context#41120

Open
orozery wants to merge 6 commits into vllm-project:main from orozery:connector-kv-cache-state

Conversation

@orozery
Collaborator

@orozery orozery commented Apr 28, 2026

This PR introduces a scheduler-side API that allows connectors to query the core BlockPool, as well as touch and free blocks.

This is an alternative to #39654 and #41011.
The difference here is it decouples the connector API from the vLLM internal structs (BlockPool, KVCacheBlock, BlockHashWithGroupId).
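As a sketch of the decoupling described above — the exact signatures are assumptions based on this PR's description, not the actual vLLM definitions — the connector-facing surface could look like:

```python
from typing import Iterable, Protocol, runtime_checkable


@runtime_checkable
class KVCacheBlockView(Protocol):
    """Read-only view of a GPU block (hypothetical sketch)."""

    @property
    def block_id(self) -> int: ...

    @property
    def ref_cnt(self) -> int: ...


class KVCacheState(Protocol):
    """Scheduler-side surface handed to connectors (hypothetical sketch).

    Lets a connector query blocks and touch/free them without importing
    the internal BlockPool, KVCacheBlock, or BlockHashWithGroupId types.
    """

    def get_block(self, block_id: int) -> KVCacheBlockView: ...

    def touch(self, blocks: Iterable[KVCacheBlockView]) -> None: ...

    def free_blocks(self, blocks: Iterable[KVCacheBlockView]) -> None: ...
```

Because these are structural Protocols, BlockPool can satisfy KVCacheState without inheriting from it, which is what keeps the connector API decoupled from the internal structs.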

cc @NickLucche @ivanium

BTW the simple offloading tests are broken.
Apparently they are not part of the CI jobs.


@claude claude Bot left a comment


Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces the KVCacheState and KVCacheBlock protocols to provide a structural interface for KV cache state, decoupling KV connectors from the internal BlockPool implementation. Key changes include renaming bind_gpu_block_pool to bind_kv_cache_state, implementing the new interface within the BlockPool class—including a new iter_blocks method for LRU iteration—and updating the SimpleCPUOffloadConnector to utilize these abstractions. Additionally, helper properties for block and group hashes were added to the block utility class. I have no feedback to provide as there are no review comments to assess.

@orozery orozery force-pushed the connector-kv-cache-state branch from aa82f73 to 8e8482c Compare April 28, 2026 10:55
Contributor

mergify Bot commented Apr 28, 2026

Hi @orozery, the pre-commit checks have failed. Please run:

```shell
uv pip install "pre-commit>=4.5.1"
pre-commit install
pre-commit run --all-files
```

Then, commit the changes and push to your branch.

For future commits, pre-commit will run automatically on changed files before each commit.

Tip

Is mypy failing?
mypy is run differently in CI. If the failure is related to this check, please use the following command to run it locally:
```shell
# For mypy (substitute "3.10" with the failing version if needed)
pre-commit run --hook-stage manual mypy-3.10
```

@orozery orozery requested a review from markmc April 28, 2026 11:42
Member

@markmc markmc left a comment


I really like the direction, nice work @orozery

For reference, see my comment in #39654 (comment)

It would be nice to eliminate these new # type: ignore comments... but I don't have immediate solutions



@runtime_checkable
class KVCacheBlock(Protocol):
Member


It's a bit confusing that there's vllm.v1.core.kv_cache_utils.KVCacheBlock and this one ... (with the same name)

Collaborator Author


Renamed to KVCacheBlockView
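The rename avoids shadowing the internal class while structural typing still lets that class satisfy the view. A minimal illustration — the dataclass below is a stand-in, not the real vllm.v1.core.kv_cache_utils.KVCacheBlock:

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class KVCacheBlockView(Protocol):
    """Connector-facing view, per the rename in this thread."""

    block_id: int
    ref_cnt: int


@dataclass
class KVCacheBlock:
    """Stand-in for the internal block; never imports the Protocol."""

    block_id: int
    ref_cnt: int = 0


# Structural typing: the internal block satisfies the view automatically,
# so the two same-named classes never needed to know about each other.
assert isinstance(KVCacheBlock(block_id=7), KVCacheBlockView)
```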

Comment thread vllm/v1/core/block_pool.py Outdated
@@ -421,6 +425,27 @@ def free_blocks(self, ordered_blocks: Iterable[KVCacheBlock]) -> None:
[block for block in blocks_list if block.ref_cnt == 0 and not block.is_null]
)

def iter_blocks(
Member


From Claude FWIW

  1. Iterator Resumption Semantics
```python
def iter_blocks(self, after_block: KVCacheBlock | None = None) -> Iterator[KVCacheBlock]:
    """Yield free blocks in LRU order starting after the given block."""
```

Question: What happens if after_block was evicted from the free queue between calls?

Looking at SimpleCPUOffloadScheduler:

```python
# Validate cursor: stale if block was removed from free queue.
if self._cursor is not None and self._cursor.ref_cnt > 0:
    self._cursor = None  # Reset to head
```

Issue: This validation logic is in the connector, not in iter_blocks() itself. If external connectors use this API, they need to know:

  1. Check block.ref_cnt > 0 before passing to iter_blocks(after_block)
  2. Or handle the case where iteration starts from an unexpected position

Recommendation: Add a docstring note or validation in iter_blocks():

```python
def iter_blocks(self, after_block: KVCacheBlock | None = None) -> Iterator[KVCacheBlock]:
    """Yield free blocks in LRU order starting after the given block.

    Args:
        after_block: Resume iteration after this block. If the block
            has been removed from the free queue (ref_cnt > 0), iteration
            behavior is undefined. Callers should validate ref_cnt == 0
            before resuming.
    """
```

Collaborator Author


Renamed to iter_free_blocks and added a relevant note.

# Scheduler-side methods
# ==============================

def bind_kv_cache_state(self, kv_cache_state: KVCacheState) -> None:
Member


docs suggestions from Claude. Definitely way, way too verbose, but probably bits of this worth including

Details

```python
def bind_kv_cache_state(self, kv_cache_state: KVCacheState) -> None:
    """Bind the GPU KV cache state to the connector (scheduler-side only).
  Called by the scheduler after the KV cache manager is constructed,                                                                                                                            
  providing connectors with controlled access to the GPU block pool state.                                                                                                                      
  Connectors that need to track, protect, or iterate GPU blocks should
  override this method to store the reference.                                                                                                                                                  
                                                                                                                                                                                                
  Use Cases                                                                                                                                                                                     
  ---------                                                                                                                                                                                     
  1. **Asynchronous Transfer Protection**: Connectors performing async                                                                                                                          
     CPU↔GPU transfers can use `touch()` to increment block ref_cnt                                                                                                                             
     before transfer and `free_blocks()` after completion, preventing                                                                                                                           
     premature eviction/reuse while data is in-flight.                                                                                                                                          
                                                                                                                                                                                                
  2. **Lazy Offloading**: Connectors can use `iter_blocks()` to scan                                                                                                                            
     the GPU free queue for eviction candidates (blocks with ref_cnt=0)                                                                                                                         
     and proactively offload them before GPU memory pressure occurs.                                                                                                                            
                                                                                                                                                                                                
  3. **Block State Queries**: Connectors can inspect block metadata                                                                                                                             
     (hash, group_id, is_null) via `get_block()` to make caching or                                                                                                                             
     transfer decisions.                                                                                                                                                                        
                                                                                                                                                                                                
  KVCacheState Operations                                                                                                                                                                       
  ------------------------                                                                                                                                                                      
  - `get_block(block_id)`: Returns a read-only view of a block by ID.
    Use this to inspect blocks allocated to requests or passed via                                                                                                                              
    update_state_after_alloc().                                                                                                                                                                 
                                                                                                                                                                                                
  - `touch(blocks)`: Increments ref_cnt for each block, protecting them                                                                                                                         
    from eviction. Blocks with ref_cnt > 0 are removed from the free                                                                                                                            
    queue and cannot be allocated to other requests.                                                                                                                                            
                                                                                                                                                                                                
  - `free_blocks(blocks)`: Decrements ref_cnt for each block. When                                                                                                                              
    ref_cnt reaches 0, blocks are returned to the free queue in LRU                                                                                                                             
    order, becoming available for allocation or eviction.                                                                                                                                       
                                                                                                                                                                                                
  - `iter_blocks(after_block=None)`: Yields blocks from the GPU free                                                                                                                            
    queue in LRU (least-recently-used) order. Pass a previously yielded                                                                                                                         
    block to resume iteration from that point. Useful for cursor-based                                                                                                                          
    scanning of eviction candidates.                                                                                                                                                            
                                                                                                                                                                                                
  Block Lifecycle and ref_cnt
  ----------------------------                                                                                                                                                                  
  The scheduler manages GPU blocks through reference counting:
                                                                                                                                                                                                
  - ref_cnt = 0: Block is in the free queue, available for eviction
  - ref_cnt > 0: Block is in use (by requests or connector touch())                                                                                                                             
                                                                                                                                                                                                
  When a request finishes and calls `request_finished()`:                                                                                                                                       
  - If connector returns (False, None): Scheduler immediately frees blocks                                                                                                                      
    (decrements ref_cnt). Blocks with ref_cnt=0 enter free queue.                                                                                                                               
  - If connector returns (True, None): Scheduler defers freeing until                                                                                                                           
    the request_id appears in worker's `get_finished()` output.                                                                                                                                 
                                                                                                                                                                                                
  Connectors using async transfers should:                                                                                                                                                      
  1. touch() blocks before starting transfer                                                                                                                                                    
  2. Return (False, None) from request_finished() so scheduler frees
     request's ref immediately                                                                                                                                                                  
  3. Blocks stay alive via connector's touch() ref until transfer completes                                                                                                                     
  4. free_blocks() after transfer completes to release connector's ref                                                                                                                          
                                                                                                                                                                                                
  This pattern decouples request lifecycle from transfer lifecycle,                                                                                                                             
  allowing requests to finish while blocks remain protected.                                                                                                                                    
                                                                                                                                                                                                
  Guarantees                                                                                                                                                                                    
  ----------                                                                                                                                                                                    
  - Block references remain valid as long as ref_cnt > 0
  - Blocks returned by get_block() and iter_blocks() are always                                                                                                                                 
    valid KVCacheBlock instances from the current block pool                                                                                                                                    
  - iter_blocks() yields blocks in consistent LRU order unless                                                                                                                                  
    the free queue is modified between calls                                                                                                                                                    
  - touch()/free_blocks() are idempotent per block: multiple touches                                                                                                                            
    add to ref_cnt, multiple frees subtract from ref_cnt                                                                                                                                        
                                                                                                                                                                                                
  Restrictions                                                                                                                                                                                  
  ------------                                                                                                                                                                                  
  - Only touch/free blocks obtained via get_block(), iter_blocks(),                                                                                                                             
    or update_state_after_alloc(). Do not create custom block objects.                                                                                                                          
  - When resuming iter_blocks(after_block), the block must still be                                                                                                                             
    in the free queue (ref_cnt == 0). If a previously-yielded block                                                                                                                             
    was touched, iteration behavior is undefined. Validate ref_cnt                                                                                                                              
    before resuming or restart from the head (after_block=None).                                                                                                                                
  - Do not modify block._block_hash or other internal fields. These                                                                                                                             
    are managed by the scheduler and allocator.                                                                                                                                                 
  - This is a scheduler-side API only. Workers do not receive                                                                                                                                   
    kv_cache_state and should not call this method.                                                                                                                                             
                                                                                                                                                                                                
  Example: Async Transfer Protection                                                                                                                                                            
  -----------------------------------                                                                                                                                                           
  class MyOffloadConnector(KVConnectorBase_V1):                                                                                                                                                 
      def bind_kv_cache_state(self, kv_cache_state: KVCacheState):                                                                                                                              
          self._gpu_state = kv_cache_state                                                                                                                                                      
                          
      def update_state_after_alloc(self, request, blocks, num_external_tokens):                                                                                                                 
          # Protect GPU blocks during async CPU→GPU load                                                                                                                                        
          if num_external_tokens > 0:                                                                                                                                                           
              gpu_block_ids = blocks.get_block_ids()[0]  # First group                                                                                                                          
              gpu_blocks = [self._gpu_state.get_block(bid)                                                                                                                                      
                            for bid in gpu_block_ids]                                                                                                                                           
              self._gpu_state.touch(gpu_blocks)                                                                                                                                                 
              # Start async transfer...                                                                                                                                                         
              self._pending_loads[request.request_id] = gpu_blocks                                                                                                                              
                                                                                                                                                                                                
      def update_connector_output(self, connector_output):                                                                                                                                      
          # Release protection after transfer completes                                                                                                                                         
          for req_id in connector_output.finished_recving or []:                                                                                                                                
              gpu_blocks = self._pending_loads.pop(req_id)                                                                                                                                      
              self._gpu_state.free_blocks(gpu_blocks)                                                                                                                                           
                                                                                                                                                                                                
      def request_finished(self, request, block_ids):                                                                                                                                           
          # Return False so scheduler frees request ref immediately                                                                                                                             
          # Blocks stay alive via connector's touch() until transfer done                                                                                                                       
          return False, None                                                                                                                                                                    
                                                                                                                                                                                                
  Example: Lazy Offloading                                                                                                                                                                      
  -------------------------                                                                                                                                                                     
  class MyOffloadConnector(KVConnectorBase_V1):
      def bind_kv_cache_state(self, kv_cache_state: KVCacheState):                                                                                                                              
          self._gpu_state = kv_cache_state
          self._cursor = None  # Track scan position                                                                                                                                            
      
      def prepare_offload(self, target_free_blocks: int):                                                                                                                                       
          blocks_to_offload = []                                                                                                                                                                
          
          # Validate cursor before resuming: stale if it was touched
          # (ref_cnt > 0 means the block left the free queue).
          if self._cursor is not None and self._cursor.ref_cnt > 0:
              self._cursor = None  # Restart from head

          # Scan free queue for eviction candidates
          for block in self._gpu_state.iter_blocks(self._cursor):
              self._cursor = block
                                                                                                                                                                                                
              # Only offload cached blocks (with hash)                                                                                                                                          
              if block.block_hash and not block.is_null:                                                                                                                                        
                  blocks_to_offload.append(block)
                                                                                                                                                                                                
              if len(blocks_to_offload) >= target_free_blocks:
                  break                                                                                                                                                                         
          
          # Protect blocks during offload                                                                                                                                                       
          self._gpu_state.touch(blocks_to_offload)
          # Start async GPU→CPU transfer...                                                                                                                                                     
          return blocks_to_offload                                                                                                                                                              
                                                                                                                                                                                                
  Args:                                                                                                                                                                                         
      kv_cache_state: Structural interface to the GPU block pool,                                                                                                                               
          providing get_block(), touch(), free_blocks(), and                                                                                                                                    
          iter_blocks() operations. In practice, this is the                                                                                                                                    
          scheduler's kv_cache_manager.block_pool.                                                                                                                                              
                                                                                                                                                                                                
  Notes:                                                                                                                                                                                        
      - The default implementation is a no-op. Only override if your
        connector needs GPU block state access.                                                                                                                                                 
      - This method is called exactly once during scheduler initialization,                                                                                                                     
        after the KV cache manager is ready but before any requests                                                                                                                             
        are scheduled.                                                                                                                                                                          
      - External connectors (living outside vllm codebase) can safely                                                                                                                           
        use this API as it's part of the stable KVConnectorBase_V1                                                                                                                              
        interface with backwards compatibility guarantees.                                                                                                                                      
  """                                                                                                                                                                                           
  return                                                  
</details>
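To make the structural interface concrete, here is a minimal connector-side sketch. The protocol shape and the `bind_kv_cache_state` hook are assumed from the docstring above; they are illustrative, not the exact vLLM signatures:

```python
from typing import Iterator, Protocol, runtime_checkable


@runtime_checkable
class KVCacheState(Protocol):
    """Assumed structural interface: query, touch, free, iterate."""

    def get_block(self, block_id: int) -> object: ...
    def touch(self, blocks: list) -> None: ...
    def free_blocks(self, blocks: list) -> None: ...
    def iter_blocks(self) -> Iterator: ...


class MyConnector:
    """Hypothetical connector that keeps a handle to the bound state."""

    def __init__(self) -> None:
        self._gpu_state: KVCacheState | None = None

    def bind_kv_cache_state(self, kv_cache_state: KVCacheState) -> None:
        # Called exactly once at scheduler init, before any request
        # is scheduled; later scheduler-side hooks can use the handle.
        self._gpu_state = kv_cache_state
```

Because `KVCacheState` is a `Protocol`, the scheduler's `block_pool` satisfies it structurally, with no inheritance required.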

Collaborator Author

Added some more docs

def group_id(self) -> int | None: ...


class KVCacheState(Protocol):
Member

I'd be inclined to still call it KVCacheBlockPool or something ... there is a certain symmetry in SimpleCPUOffloadingConnector around using the block pool interface for both GPU and CPU blocks, and that's sort of hidden with this KVCacheState naming

Collaborator Author

For now, KVCacheBlockPool might seem like the better name.
However, we would like to extend this class to allow connectors to:

  1. Get the list of blocks per request (KV cache manager level)
  2. Get the list of waiting requests (scheduler level)

So thinking ahead, I actually think SchedulerContext is the best name.
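
A hedged sketch of what such an extended `SchedulerContext` protocol could look like; every name here is illustrative of the two items above, not actual vLLM API:

```python
from typing import Iterable, Protocol, runtime_checkable


@runtime_checkable
class SchedulerContext(Protocol):
    """Illustrative only: the block-pool ops from this PR plus the
    two proposed extensions (per-request blocks, waiting requests)."""

    # Block-pool level (this PR)
    def get_block(self, block_id: int) -> object: ...
    def touch(self, blocks: list) -> None: ...
    def free_blocks(self, blocks: list) -> None: ...

    # KV cache manager level (item 1): blocks held by a request
    def get_request_blocks(self, request_id: str) -> list: ...

    # Scheduler level (item 2): requests not yet scheduled
    def waiting_request_ids(self) -> Iterable[str]: ...
```

Keeping it a structural `Protocol` means the scheduler can grow into this interface without the connector API ever referencing internal classes.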

@orozery orozery changed the title KVConnector: Introduce bind_kv_cache_state KVConnector: Introduce bind_scheduler_context Apr 29, 2026
@mergify
Contributor

mergify Bot commented Apr 29, 2026

Hi @orozery, the pre-commit checks have failed. Please run:

uv pip install "pre-commit>=4.5.1"
pre-commit install
pre-commit run --all-files

Then, commit the changes and push to your branch.

For future commits, pre-commit will run automatically on changed files before each commit.

Tip

Is mypy failing?
mypy is run differently in CI. If the failure is related to this check, please use the following command to run it locally:
# For mypy (substitute "3.10" with the failing version if needed)
pre-commit run --hook-stage manual mypy-3.10

@orozery
Collaborator Author

orozery commented Apr 29, 2026

It would be nice to eliminate these new # type: ignore ... but I don't have immediate solutions

I left only the ignores related to the BlockHashWithGroupID type.
The simple CPU offload connector actually uses this type (which should be opaque bytes) as an input to internal vLLM structs (the block pool, reused as a CPU block pool).
If we want to remove the ignores, we could move BlockHashWithGroupID out of kv_cache_utils.py to somewhere else.
But I think the CPU offload connector case is unique, so I would stay with the ignores for now.
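
For illustration, one common way to avoid such an ignore is a thin `NewType` wrapper at the boundary, so the connector sees an opaque token while internal code keeps operating on plain bytes. The names below are hypothetical, not vLLM's:

```python
from typing import NewType

# Hypothetical opaque alias: connectors treat the hash as an opaque
# token; at runtime it is still just bytes, so internal structs that
# expect bytes accept it without a cast or a type: ignore.
OpaqueBlockHash = NewType("OpaqueBlockHash", bytes)


def to_opaque(raw: bytes) -> OpaqueBlockHash:
    # Single, explicit conversion point at the connector boundary.
    return OpaqueBlockHash(raw)
```

The trade-off is that the alias must live somewhere both sides can import, which is essentially the same layering question as moving BlockHashWithGroupID out of kv_cache_utils.py.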

orozery added 5 commits April 29, 2026 10:37
This commit introduces a scheduler-side API that allows connectors
to query the core BlockPool, as well as touch and free blocks.

Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
Signed-off-by: Or Ozeri <oro@il.ibm.com>
@orozery orozery force-pushed the connector-kv-cache-state branch from df1c960 to f35dbb1 Compare April 29, 2026 07:38
Signed-off-by: Or Ozeri <oro@il.ibm.com>