From 9b93e06fe924a5ee729f8115a74f7468c2d0869e Mon Sep 17 00:00:00 2001
From: Can Sun
Date: Tue, 16 Sep 2025 14:55:02 -0700
Subject: [PATCH 1/2] [Feat] Add HashTrie LRU to prevent OOM

Signed-off-by: Can Sun
---
 src/tests/test_prefixaware_router.py     | 109 +++++++++++++++++
 src/vllm_router/prefix/__init__.py       |  30 +++++
 src/vllm_router/prefix/config.py         |  58 +++++++++
 src/vllm_router/prefix/hashtrie.py       | 144 +++++++++++++++++++++--
 src/vllm_router/routers/routing_logic.py |   6 +-
 5 files changed, 333 insertions(+), 14 deletions(-)
 create mode 100644 src/tests/test_prefixaware_router.py
 create mode 100644 src/vllm_router/prefix/__init__.py
 create mode 100644 src/vllm_router/prefix/config.py

diff --git a/src/tests/test_prefixaware_router.py b/src/tests/test_prefixaware_router.py
new file mode 100644
index 000000000..64048acf5
--- /dev/null
+++ b/src/tests/test_prefixaware_router.py
@@ -0,0 +1,109 @@
+import asyncio
+from typing import Dict
+
+from vllm_router.routers.routing_logic import PrefixAwareRouter
+
+
+class EndpointInfo:
+    def __init__(self, url: str):
+        self.url = url
+
+
+class RequestStats:
+    def __init__(self, qps: float):
+        self.qps = qps
+
+
+class Request:
+    def __init__(self, headers: Dict[str, str]):
+        self.headers = headers
+
+
+class EngineStats:
+    def __init__(self):
+        pass
+
+
+def test_prefixaware_logic():
+    endpoints = [
+        EndpointInfo(url="http://engine1.com"),
+        EndpointInfo(url="http://engine2.com"),
+    ]
+    request_stats = {
+        "http://engine1.com": RequestStats(qps=10),
+        "http://engine2.com": RequestStats(qps=5),
+    }
+    request = Request(headers={})
+
+    router = PrefixAwareRouter()
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        request_json = {"prompt": "Hello, how are you today?"}
+        url = loop.run_until_complete(
+            router.route_request(endpoints, None, request_stats, request, request_json)
+        )
+        assert url in [endpoint.url for endpoint in endpoints]
+
+        # The same request should route to the same endpoint
+        url2 = loop.run_until_complete(
+            router.route_request(endpoints, None, request_stats, request, request_json)
+        )
+        assert url == url2, "Same request should route to same endpoint"
+
+        # Chat messages should work as well
+        chat_request = {
+            "messages": [{"role": "user", "content": "Hello, how are you?"}]
+        }
+        url3 = loop.run_until_complete(
+            router.route_request(endpoints, None, request_stats, request, chat_request)
+        )
+        assert url3 in [endpoint.url for endpoint in endpoints]
+
+    finally:
+        loop.close()
+
+
+def test_hashtrie_eviction():
+    from vllm_router.prefix.config import HashTrieConfig
+    from vllm_router.prefix.hashtrie import HashTrie
+
+    # Create a config with a very small memory limit to trigger eviction
+    config = HashTrieConfig.from_defaults(
+        chunk_size=4,  # small chunk size
+        max_memory_size=0.0,  # 0 GB - should trigger immediate eviction
+        eviction_threshold=0.5,
+        target_utilization=0.0,  # evict all of the nodes
+        memory_check_request_batch_size=5,
+    )
+
+    # Create a new HashTrie with the restrictive config
+    hashtrie = HashTrie(config)
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        # Insert a request - this should trigger eviction due to the 0 GB limit
+        request = "Hello world, this is a test request"
+        endpoint = "http://engine1.com"
+
+        # Before insertion
+        initial_size = len(hashtrie.node_cache)
+        assert initial_size == 0, "HashTrie should start empty"
+
+        # Insert enough times to trigger the memory check and cache eviction
+        for i in range(config.memory_check_request_batch_size):
+            loop.run_until_complete(hashtrie.insert(request, endpoint))
+
+        # After insertion with the 0 GB limit, eviction should have occurred.
+        # target_utilization=0.0 makes batch_evict remove every cached node
+        final_size = len(hashtrie.node_cache)
+
+        # The node cache should therefore be empty after eviction
+        assert (
+            final_size == 0
+        ), f"HashTrie should be empty after eviction, got {final_size} nodes"
+
+    finally:
+        loop.close()
diff --git a/src/vllm_router/prefix/__init__.py b/src/vllm_router/prefix/__init__.py
new file mode 100644
index 000000000..f8634a63e
--- /dev/null
+++ b/src/vllm_router/prefix/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2024-2025 The vLLM Production Stack Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+vLLM Router Prefix Matching Module
+
+This module provides HashTrie-based prefix matching with LRU eviction
+for efficient request routing in vLLM production environments.
+"""
+
+from .config import HashTrieConfig
+from .hashtrie import HashTrie, NodeMetadata, TrieNode
+
+__all__ = [
+    "HashTrie",
+    "HashTrieConfig",
+    "NodeMetadata",
+    "TrieNode",
+]
diff --git a/src/vllm_router/prefix/config.py b/src/vllm_router/prefix/config.py
new file mode 100644
index 000000000..d7b0532a2
--- /dev/null
+++ b/src/vllm_router/prefix/config.py
@@ -0,0 +1,58 @@
+# Copyright 2024-2025 The vLLM Production Stack Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class HashTrieConfig:
+    """Configuration for HashTrie with LRU eviction"""
+
+    chunk_size: int = 0
+    max_memory_size: int = 0  # in GB
+    eviction_threshold: float = 0  # fraction of max memory at which eviction starts
+    target_utilization: float = 0  # fraction of max memory to evict down to
+    memory_check_request_batch_size: int = 0
+
+    @staticmethod
+    def from_defaults(
+        chunk_size: int = 128,
+        max_memory_size: int = 2,
+        eviction_threshold: float = 0.9,
+        target_utilization: float = 0.5,
+        memory_check_request_batch_size: int = 10,
+    ) -> "HashTrieConfig":
+        """Create configuration with default values"""
+        return HashTrieConfig(
+            chunk_size,
+            max_memory_size,
+            eviction_threshold,
+            target_utilization,
+            memory_check_request_batch_size,
+        )
+
+    @staticmethod
+    def from_env() -> "HashTrieConfig":
+        """Load configuration from environment variables"""
+        return HashTrieConfig(
+            chunk_size=int(os.getenv("HASHTRIE_CHUNK_SIZE", "128")),
+            max_memory_size=int(os.getenv("PREFIXAWARE_MAX_MEMORY_SIZE_GB", "2")),
+            eviction_threshold=float(os.getenv("HASHTRIE_EVICTION_THRESHOLD", "0.9")),
+            target_utilization=float(os.getenv("HASHTRIE_TARGET_UTILIZATION", "0.5")),
+            memory_check_request_batch_size=int(
+                os.getenv("MEMORY_CHECK_REQUEST_BATCH_SIZE", "10")
+            ),
+        )
diff --git a/src/vllm_router/prefix/hashtrie.py b/src/vllm_router/prefix/hashtrie.py
index 2316159ba..417dc528d 100644
--- a/src/vllm_router/prefix/hashtrie.py
+++ b/src/vllm_router/prefix/hashtrie.py
@@ -14,17 +14,30 @@
 
 import asyncio
 import logging
-from typing import Generator, Set, Tuple
+import os
+from collections import OrderedDict
+from typing import Dict, Generator, List, Optional, Set, Tuple
 
+import psutil
 import xxhash
 
+from .config import HashTrieConfig
+
 logger = logging.getLogger(__name__)
 
 
+class NodeMetadata:
+    """Metadata for nodes tracked in the LRU cache"""
+
+    def __init__(self, parent: "TrieNode", child_hash: int):
+        self.parent = parent
+        self.child_hash = child_hash
+
+
 class TrieNode:
     def __init__(self):
-        self.children = {}
-        self.endpoints = set()
+        self.children: Dict[int, "TrieNode"] = {}
+        self.endpoints: Set[str] = set()
 
         # assign a lock for each trie node.
         # this assures that each node will only be accessed by one co-routine
@@ -33,14 +46,27 @@ def __init__(self):
 
 
 class HashTrie:
-    def __init__(self, chunk_size: int = 128):
+    def __init__(self, config: Optional[HashTrieConfig] = None):
         """
-        Initialize the HashTrie.
+        Initialize the HashTrie with LRU eviction based on system memory pressure.
 
         Args:
-            chunk_size (int): the string chunk size (in terms of # characters)
+            config (HashTrieConfig): Configuration for the HashTrie
         """
+        self.config = config or HashTrieConfig.from_defaults()
         self.root = TrieNode()
-        self.chunk_size = chunk_size
+
+        # HashTrie LRU cache
+        self.node_cache = OrderedDict[TrieNode, NodeMetadata]()
+        # eviction threshold in MB
+        self.memory_threshold_mb = (
+            self.config.max_memory_size * 1024 * self.config.eviction_threshold
+        )
+        # fraction of cached nodes to evict per eviction pass
+        self.eviction_target_percentage = 1.0 - self.config.target_utilization
+        self.current_memory_mb = 0.0
+        self.operation_count = 0
+        # ensure that eviction is performed by only one co-routine
+        self.eviction_lock = asyncio.Lock()
 
     def _chunk_and_hash(self, request: str) -> Generator[int, None, None]:
         """
@@ -51,13 +77,13 @@ def _chunk_and_hash(self, request: str) -> Generator[int, None, None]:
             Generator[int, None, None]: A generator that yields a hash for each chunk.
""" - - for i in range(0, len(request), self.chunk_size): - yield xxhash.xxh64(request[i : i + self.chunk_size]).intdigest() + for i in range(0, len(request), self.config.chunk_size): + chunk = request[i : i + self.config.chunk_size] + yield xxhash.xxh64(chunk).intdigest() async def insert(self, request: str, endpoint: str) -> None: """ - Insert the request and endpoint into the trie. + Insert request-endpoint mapping with LRU tracking and memory pressure monitoring. Args: request (str): The request to insert. endpoint (str): The endpoint to insert. @@ -65,14 +91,60 @@ async def insert(self, request: str, endpoint: str) -> None: node = self.root async with node.lock: node.endpoints.add(endpoint) + + path_nodes = [(self.root, None)] # (node, hash_to_reach_it) + for chunk_hash in self._chunk_and_hash(request): async with node.lock: if chunk_hash not in node.children: node.children[chunk_hash] = TrieNode() node = node.children[chunk_hash] + path_nodes.append((node, chunk_hash)) + async with node.lock: node.endpoints.add(endpoint) + # add nodes to LRU in reverse order of insert which ensures child nodes are always evicted + # before parent nodes. Parent node will definitely be accessed when a child node is matched. + # Thus evicting from child nodes should have least impact. + for i in range(len(path_nodes) - 1, 0, -1): # Skip root (index 0) + current_node, child_hash = path_nodes[i] + parent_node, _ = path_nodes[i - 1] + + if current_node not in self.node_cache: + # add new node (appears older due to reverse order) + self.node_cache[current_node] = NodeMetadata( + parent=parent_node, child_hash=child_hash + ) + else: + self.node_cache.move_to_end(current_node) + + async with self.eviction_lock: + # Track operations and check if eviction is needed + self.operation_count = ( + self.operation_count + 1 + ) % self.config.memory_check_request_batch_size + + # Check if we should evict + if self.operation_count == 0: + try: + process = psutil.Process(os.getpid()) + memory_mb = process.memory_info().rss / (1024 * 1024) + + # python may not release the memory back to OS after GC thus the same memory block will + # be reused to create new nodes. Skip the eviction if memory usage does not change to + # prevent duplicate evictions + if memory_mb != self.current_memory_mb: + self.current_memory_mb = memory_mb + if memory_mb > self.memory_threshold_mb: + await self.batch_evict(self.eviction_target_percentage) + else: + logger.info( + f"Eviction skipped - no memory change detected: {memory_mb:.1f}MB" + ) + except: + pass + async def longest_prefix_match( self, request: str, available_endpoints: Set[str] = set() ) -> Tuple[int, Set[str]]: @@ -97,7 +169,55 @@ async def longest_prefix_match( # reached longest prefix match in currently-available endpoints. if not intersection: break - match_length += self.chunk_size + match_length += self.config.chunk_size selected_endpoints = intersection return match_length, selected_endpoints + + async def batch_evict(self, eviction_percentage) -> None: + """ + Manually evict a percentage of LRU nodes. 
+
+        Args:
+            eviction_percentage: Fraction of cached nodes to evict (0.0 to 1.0)
+        """
+        if len(self.node_cache) == 0:
+            return
+
+        initial_count = len(self.node_cache)
+        target_evictions = max(1, int(initial_count * eviction_percentage))
+        evict_nodes = []
+        for node in list(self.node_cache.keys())[:target_evictions]:
+            evict_nodes.append(node)
+
+        evicted_count = 0
+        for evict_node in evict_nodes:
+            if evict_node in self.node_cache:
+                await self._evict_node(evict_node)
+                evicted_count += 1
+
+        logger.info(
+            f"Batch eviction completed - evicted {evicted_count} out of {initial_count} nodes"
+        )
+
+    async def _evict_node(self, node: TrieNode) -> None:
+        """
+        Evict a single node.
+
+        Args:
+            node: the HashTrie node to be evicted
+        """
+        if node not in self.node_cache:
+            return
+
+        node_metadata = self.node_cache[node]
+        parent_node = node_metadata.parent
+        child_hash = node_metadata.child_hash
+
+        # Remove the node from its parent's children
+        async with parent_node.lock:
+            if child_hash in parent_node.children:
+                del parent_node.children[child_hash]
+
+        # Remove the node from the LRU cache
+        del self.node_cache[node]
diff --git a/src/vllm_router/routers/routing_logic.py b/src/vllm_router/routers/routing_logic.py
index f5ae7cced..b131eb0e3 100644
--- a/src/vllm_router/routers/routing_logic.py
+++ b/src/vllm_router/routers/routing_logic.py
@@ -352,12 +352,14 @@ class PrefixAwareRouter(RoutingInterface):
     In this class, we assume that there is no eviction of prefix cache.
     """
 
-    def __init__(self: int):
+    def __init__(self):
        if hasattr(self, "_initialized"):
             return
+        from vllm_router.prefix.config import HashTrieConfig
         from vllm_router.prefix.hashtrie import HashTrie
 
-        self.hashtrie = HashTrie()
+        config = HashTrieConfig.from_env()
+        self.hashtrie = HashTrie(config)
         self._initialized = True
 
     async def route_request(

From a54321df6024e76b14442f42dc129ab39ccb4312 Mon Sep 17 00:00:00 2001
From: Can Sun
Date: Tue, 23 Sep 2025 12:41:21 -0700
Subject: [PATCH 2/2] Fix pre-commit failures

Signed-off-by: Can Sun
---
 src/vllm_router/prefix/config.py   | 1 -
 src/vllm_router/prefix/hashtrie.py | 6 +++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/vllm_router/prefix/config.py b/src/vllm_router/prefix/config.py
index d7b0532a2..2b1f00592 100644
--- a/src/vllm_router/prefix/config.py
+++ b/src/vllm_router/prefix/config.py
@@ -14,7 +14,6 @@
 
 import os
 from dataclasses import dataclass
-from typing import Optional
 
 
 @dataclass
diff --git a/src/vllm_router/prefix/hashtrie.py b/src/vllm_router/prefix/hashtrie.py
index 417dc528d..9a1351e88 100644
--- a/src/vllm_router/prefix/hashtrie.py
+++ b/src/vllm_router/prefix/hashtrie.py
@@ -16,7 +16,7 @@
 import logging
 import os
 from collections import OrderedDict
-from typing import Dict, Generator, List, Optional, Set, Tuple
+from typing import Dict, Generator, Optional, Set, Tuple
 
 import psutil
 import xxhash
@@ -142,8 +142,8 @@ async def insert(self, request: str, endpoint: str) -> None:
                         logger.info(
                             f"Eviction skipped - no memory change detected: {memory_mb:.1f}MB"
                         )
-                except:
-                    pass
+                except Exception as e:
+                    logger.error(f"Eviction failed - error message: {e}")
 
     async def longest_prefix_match(
         self, request: str, available_endpoints: Set[str] = set()
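
Reviewer note: below is a minimal usage sketch of the API this series adds. It
is illustrative only; the endpoint URLs, the small chunk_size, and the
asyncio.run() driver are assumptions made for the example, not part of the
patch:

    import asyncio

    from vllm_router.prefix.config import HashTrieConfig
    from vllm_router.prefix.hashtrie import HashTrie


    async def main() -> None:
        # Small chunks so that short prompts span several trie nodes.
        config = HashTrieConfig.from_defaults(chunk_size=8)
        trie = HashTrie(config)

        # Record which engine served which prompt.
        await trie.insert("Hello, how are you today?", "http://engine1.com")
        await trie.insert("Hello, how is the weather?", "http://engine2.com")

        # Both prompts share the first 8-character chunk; only engine1's prompt
        # also shares the second, so this should return
        # (16, {"http://engine1.com"}).
        match_length, endpoints = await trie.longest_prefix_match(
            "Hello, how are you this evening?",
            available_endpoints={"http://engine1.com", "http://engine2.com"},
        )
        print(match_length, endpoints)


    asyncio.run(main())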
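
Reviewer note: in production the router builds its configuration with
HashTrieConfig.from_env() (see the routing_logic.py hunk above). A sketch of
tuning eviction through the environment follows; the specific values are
examples chosen for illustration, not recommended defaults:

    import os

    from vllm_router.prefix.config import HashTrieConfig

    # Treat 4 GB as the memory budget. Eviction starts once process RSS
    # exceeds 90% of the budget, and batch_evict then removes 50% of the
    # LRU-tracked nodes (1 - target_utilization), children before parents.
    os.environ["PREFIXAWARE_MAX_MEMORY_SIZE_GB"] = "4"
    os.environ["HASHTRIE_EVICTION_THRESHOLD"] = "0.9"
    os.environ["HASHTRIE_TARGET_UTILIZATION"] = "0.5"
    os.environ["HASHTRIE_CHUNK_SIZE"] = "128"
    os.environ["MEMORY_CHECK_REQUEST_BATCH_SIZE"] = "10"

    config = HashTrieConfig.from_env()
    assert config.max_memory_size == 4
    assert config.eviction_threshold == 0.9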