From aa5c00053d52c85b91e595224f183ffaa8a4b964 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Thu, 29 Jan 2026 23:24:40 +0000
Subject: [PATCH 1/9] ensure that get_finished only returns once per request

Signed-off-by: KuntaiDu
---
 .../kv_connector/v1/lmcache_mp_connector.py   | 31 ++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
index 629170615dd8..60e518637976 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@@ -251,7 +251,7 @@ def __str__(self) -> str:
 @dataclass
 class LMCacheMPRequestMetadata:
     request_id: str
-    direction: Literal["STORE", "RETRIEVE"]
+    direction: Literal["STORE", "RETRIEVE", "SKIP"]
     op: LoadStoreOp
 
     @staticmethod
@@ -349,16 +349,22 @@ class LMCacheMPConnectorMetadata(KVConnectorMetadata):
     def __init__(self):
         super().__init__()
         self.requests: list[LMCacheMPRequestMetadata] = []
+        self.executing_request_ids_at_this_step: set[str] = set()
 
     def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
         self.requests.append(request_metadata)
 
+    def add_new_executing_request_ids_at_this_step(self, request_ids: set[str]):
+        self.executing_request_ids_at_this_step.update(request_ids)
+
     def __len__(self):
         return len(self.requests)
 
     # For debugging
     def __str__(self):
         request_strs = []
+        for req_id in self.executing_request_ids_at_this_step:
+            request_strs.append(f"NewRequestArrived(request_id={req_id})")
         for req_meta in self.requests:
             request_strs.append(
                 f"RequestMetadata(request_id={req_meta.request_id}, "
@@ -413,6 +419,8 @@ def __init__(
 
         self.vllm_block_size = vllm_config.cache_config.block_size
 
+        self._executing_request_ids: set[str] = set()
+
     @property
     def role(self) -> KVConnectorRole:
         return self._role
@@ -560,6 +568,9 @@ def get_finished(
        The finished saves/sends req ids must belong to a set provided in a
        call to this method (this call or a prior one).
        """
+        # first, add the executing request ids to internal state
+        meta = self._get_connector_metadata()
+        self.worker_adapter.add_new_executing_request_ids_at_this_step_worker(meta.executing_request_ids_at_this_step)
         val = self.worker_adapter.get_finished(finished_req_ids)
         # logger.error("Finished req ids: %s, %s", val[0], val[1])
         return val
@@ -730,6 +741,10 @@ def build_connector_meta(
         self._process_new_requests(scheduler_output, metadata)
         self._process_cached_requests(scheduler_output, metadata)
 
+        metadata.add_new_executing_request_ids_at_this_step(
+            self._get_new_executing_request_ids_at_this_step(scheduler_output)
+        )
+
         if len(metadata) > 0:
             logger.debug("Final connector metadata: %s", metadata)
 
@@ -766,6 +781,10 @@ def request_finished(
         """
         # Clean up request tracker to prevent memory leak
         self._cleanup_request_tracker(request.request_id)
+
+        # Remove from existing scheduling request id, to prevent memory leak.
+        self.scheduler_adapter.remove_from_executing_request_ids(request.request_id)
+
         return True, None
 
     def take_events(self) -> Iterable["KVCacheEvent"]:
@@ -898,6 +917,16 @@ def _process_cached_requests(
             if r_meta is not None:
                 metadata.add_request_metadata(r_meta)
 
+
+    def _get_new_executing_request_ids_at_this_step(
+        self, scheduler_output: SchedulerOutput,
+    ) -> list[str]:
+
+        reqs = scheduler_output.scheduled_new_reqs
+        for req in reqs:
+            self.scheduler_adapter.maybe_add_to_executing_request_ids(req.req_id)
+        return self.scheduler_adapter.get_executing_request_ids_at_this_step()
+
     def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
         assert request_id in self.request_trackers, (
             f"Request tracker for request_id {request_id} not found. "

From f6ed9de793b9a0ee88738164ee12aa8e5fff24e6 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Thu, 29 Jan 2026 23:50:17 +0000
Subject: [PATCH 2/9] simplify the logic and make sure the merge order does
 not affect functionality

Signed-off-by: KuntaiDu
---
 .../kv_connector/v1/lmcache_mp_connector.py   | 46 ++++++++-----------
 1 file changed, 20 insertions(+), 26 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
index 60e518637976..26171172896b 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@@ -349,13 +349,14 @@ class LMCacheMPConnectorMetadata(KVConnectorMetadata):
     def __init__(self):
         super().__init__()
         self.requests: list[LMCacheMPRequestMetadata] = []
-        self.executing_request_ids_at_this_step: set[str] = set()
+        # New request IDs this step - for double-free prevention in async scheduling
+        self.new_req_ids: set[str] = set()
 
     def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
         self.requests.append(request_metadata)
 
-    def add_new_executing_request_ids_at_this_step(self, request_ids: set[str]):
-        self.executing_request_ids_at_this_step.update(request_ids)
+    def set_new_req_ids(self, request_ids: set[str]):
+        self.new_req_ids = request_ids
 
     def __len__(self):
         return len(self.requests)
@@ -363,8 +364,8 @@ def __len__(self):
     # For debugging
     def __str__(self):
         request_strs = []
-        for req_id in self.executing_request_ids_at_this_step:
-            request_strs.append(f"NewRequestArrived(request_id={req_id})")
+        if self.new_req_ids:
+            request_strs.append(f"NewReqs({self.new_req_ids})")
         for req_meta in self.requests:
             request_strs.append(
                 f"RequestMetadata(request_id={req_meta.request_id}, "
@@ -419,8 +420,6 @@ def __init__(
 
         self.vllm_block_size = vllm_config.cache_config.block_size
 
-        self._executing_request_ids: set[str] = set()
-
     @property
     def role(self) -> KVConnectorRole:
         return self._role
@@ -567,12 +567,13 @@ def get_finished(
        The finished saves/sends req ids must belong to a set provided in a
        call to this method (this call or a prior one).
        """
-        # first, add the executing request ids to internal state
+        # Register new request IDs for double-free prevention (async scheduling).
+        # Use hasattr for merge-order independence: if LMCache doesn't have
+        # track_new_reqs yet, skip gracefully (feature disabled).
         meta = self._get_connector_metadata()
-        self.worker_adapter.add_new_executing_request_ids_at_this_step_worker(meta.executing_request_ids_at_this_step)
-        val = self.worker_adapter.get_finished(finished_req_ids)
-        # logger.error("Finished req ids: %s, %s", val[0], val[1])
-        return val
+        if hasattr(self.worker_adapter, "track_new_reqs"):
+            self.worker_adapter.track_new_reqs(meta.new_req_ids)
+        return self.worker_adapter.get_finished(finished_req_ids)
 
     def get_block_ids_with_load_errors(self) -> set[int]:
         """
@@ -741,9 +741,7 @@ def build_connector_meta(
         self._process_new_requests(scheduler_output, metadata)
         self._process_cached_requests(scheduler_output, metadata)
 
-        metadata.add_new_executing_request_ids_at_this_step(
-            self._get_new_executing_request_ids_at_this_step(scheduler_output)
-        )
+        metadata.set_new_req_ids(self._collect_new_req_ids(scheduler_output))
 
         if len(metadata) > 0:
             logger.debug("Final connector metadata: %s", metadata)
@@ -782,8 +780,8 @@ def request_finished(
         # Clean up request tracker to prevent memory leak
         self._cleanup_request_tracker(request.request_id)
 
-        # Remove from existing scheduling request id, to prevent memory leak.
-        self.scheduler_adapter.remove_from_executing_request_ids(request.request_id)
+        # Untrack request from scheduler-side tracking to prevent memory leak.
+        self.scheduler_adapter.untrack_request(request.request_id)
 
         return True, None
@@ -917,15 +915,11 @@ def _process_cached_requests(
             if r_meta is not None:
                 metadata.add_request_metadata(r_meta)
 
-
-    def _get_new_executing_request_ids_at_this_step(
-        self, scheduler_output: SchedulerOutput,
-    ) -> list[str]:
-
-        reqs = scheduler_output.scheduled_new_reqs
-        for req in reqs:
-            self.scheduler_adapter.maybe_add_to_executing_request_ids(req.req_id)
-        return self.scheduler_adapter.get_executing_request_ids_at_this_step()
+    def _collect_new_req_ids(self, scheduler_output: SchedulerOutput) -> set[str]:
+        """Collect newly scheduled request IDs for double-free prevention."""
+        for req in scheduler_output.scheduled_new_reqs:
+            self.scheduler_adapter.track_request(req.req_id)
+        return self.scheduler_adapter.pop_new_reqs_this_step()
 
     def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
         assert request_id in self.request_trackers, (
             f"Request tracker for request_id {request_id} not found. "

From cfcd13e1a0bf8bc3aa694d52c8e2aa74aea65df0 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 06:07:20 +0000
Subject: [PATCH 3/9] remove unnecessary skip

Signed-off-by: KuntaiDu
---
 .../kv_transfer/kv_connector/v1/lmcache_mp_connector.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
index 26171172896b..174091f88f0a 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@@ -251,7 +251,7 @@ def __str__(self) -> str:
 @dataclass
 class LMCacheMPRequestMetadata:
     request_id: str
-    direction: Literal["STORE", "RETRIEVE", "SKIP"]
+    direction: Literal["STORE", "RETRIEVE"]
     op: LoadStoreOp
 
     @staticmethod

From b5c978d6038c750616fe8d25f11cd07c60b33a46 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 06:19:30 +0000
Subject: [PATCH 4/9] simplify logic --- don't use a complicated loop to track
 new requests, just check the new requests on SchedulerOutput directly

Signed-off-by: KuntaiDu
---
 .../kv_transfer/kv_connector/v1/lmcache_mp_connector.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
index 174091f88f0a..bac62ec5b3f0 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@@ -780,9 +780,6 @@ def request_finished(
         # Clean up request tracker to prevent memory leak
         self._cleanup_request_tracker(request.request_id)
 
-        # Untrack request from scheduler-side tracking to prevent memory leak.
-        self.scheduler_adapter.untrack_request(request.request_id)
-
         return True, None
 
     def take_events(self) -> Iterable["KVCacheEvent"]:
@@ -917,9 +914,7 @@ def _process_cached_requests(
 
     def _collect_new_req_ids(self, scheduler_output: SchedulerOutput) -> set[str]:
         """Collect newly scheduled request IDs for double-free prevention."""
-        for req in scheduler_output.scheduled_new_reqs:
-            self.scheduler_adapter.track_request(req.req_id)
-        return self.scheduler_adapter.pop_new_reqs_this_step()
+        return {req.req_id for req in scheduler_output.scheduled_new_reqs}
 
     def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
         assert request_id in self.request_trackers, (

From 1781a02af52b15dcf745db0a3eaa9771943f2009 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 06:47:21 +0000
Subject: [PATCH 5/9] fix the check --- it needs to check that the request is
 finished instead of just checking that it is None

Signed-off-by: KuntaiDu
---
 .../kv_connector/v1/lmcache_mp_connector.py | 24 +++----------------
 vllm/v1/core/sched/scheduler.py             |  7 ++++--
 2 files changed, 8 insertions(+), 23 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
index bac62ec5b3f0..629170615dd8 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@@ -349,23 +349,16 @@ class LMCacheMPConnectorMetadata(KVConnectorMetadata):
     def __init__(self):
         super().__init__()
         self.requests: list[LMCacheMPRequestMetadata] = []
-        # New request IDs this step - for double-free prevention in async scheduling
-        self.new_req_ids: set[str] = set()
 
     def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
         self.requests.append(request_metadata)
 
-    def set_new_req_ids(self, request_ids: set[str]):
-        self.new_req_ids = request_ids
-
     def __len__(self):
         return len(self.requests)
 
     # For debugging
     def __str__(self):
         request_strs = []
-        if self.new_req_ids:
-            request_strs.append(f"NewReqs({self.new_req_ids})")
         for req_meta in self.requests:
             request_strs.append(
                 f"RequestMetadata(request_id={req_meta.request_id}, "
@@ -567,13 +560,9 @@ def get_finished(
        The finished saves/sends req ids must belong to a set provided in a
        call to this method (this call or a prior one).
        """
-        # Register new request IDs for double-free prevention (async scheduling).
-        # Use hasattr for merge-order independence: if LMCache doesn't have
-        # track_new_reqs yet, skip gracefully (feature disabled).
-        meta = self._get_connector_metadata()
-        if hasattr(self.worker_adapter, "track_new_reqs"):
-            self.worker_adapter.track_new_reqs(meta.new_req_ids)
-        return self.worker_adapter.get_finished(finished_req_ids)
+        val = self.worker_adapter.get_finished(finished_req_ids)
+        # logger.error("Finished req ids: %s, %s", val[0], val[1])
+        return val
 
     def get_block_ids_with_load_errors(self) -> set[int]:
         """
@@ -741,9 +730,6 @@ def build_connector_meta(
         self._process_new_requests(scheduler_output, metadata)
         self._process_cached_requests(scheduler_output, metadata)
 
-        metadata.set_new_req_ids(self._collect_new_req_ids(scheduler_output))
-
         if len(metadata) > 0:
             logger.debug("Final connector metadata: %s", metadata)
@@ -779,7 +766,6 @@ def request_finished(
         """
         # Clean up request tracker to prevent memory leak
         self._cleanup_request_tracker(request.request_id)
-
         return True, None
 
     def take_events(self) -> Iterable["KVCacheEvent"]:
@@ -912,10 +898,6 @@ def _process_cached_requests(
             if r_meta is not None:
                 metadata.add_request_metadata(r_meta)
 
-    def _collect_new_req_ids(self, scheduler_output: SchedulerOutput) -> set[str]:
-        """Collect newly scheduled request IDs for double-free prevention."""
-        return {req.req_id for req in scheduler_output.scheduled_new_reqs}
-
     def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
         assert request_id in self.request_trackers, (
             f"Request tracker for request_id {request_id} not found. "

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 30a459386a73..ebfa0fcd19d0 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1270,10 +1270,13 @@ def update_from_output(
                 # skip failed or rescheduled requests from KV load failure
                 continue
             request = self.requests.get(req_id)
-            if request is None:
+            if request is None or request.is_finished():
                 # The request is already finished. This can happen if the
                 # request is aborted while the model is executing it (e.g.,
-                # in pipeline parallelism).
+                # in pipeline parallelism). When delay_free_blocks=True (for
+                # KV transfer), the request stays in self.requests but is
+                # already finished - we must check is_finished() to avoid
+                # double-free.
                 continue
 
             req_index = model_runner_output.req_id_to_index[req_id]

From 9e44b2d99a9f9dd6986eec1956b6fa91a11b1631 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 06:49:20 +0000
Subject: [PATCH 6/9] edit the docstring

Signed-off-by: KuntaiDu
---
 vllm/v1/core/sched/scheduler.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index ebfa0fcd19d0..1131a40f30ad 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1273,10 +1273,11 @@ def update_from_output(
             if request is None or request.is_finished():
                 # The request is already finished. This can happen if the
                 # request is aborted while the model is executing it (e.g.,
-                # in pipeline parallelism). When delay_free_blocks=True (for
-                # KV transfer), the request stays in self.requests but is
-                # already finished - we must check is_finished() to avoid
-                # double-free.
+                # in pipeline parallelism).
+                # NOTE(Kuntai): When delay_free_blocks=True (for async KV
+                # cache transfer in KV connector), the request stays in
+                # self.requests but is already finished - we must check
+                # is_finished() to avoid double-free.
                 continue
 
             req_index = model_runner_output.req_id_to_index[req_id]

From 4d2282c30347c8829db574742f5b16e5e88e68c1 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 06:59:12 +0000
Subject: [PATCH 7/9] adjust doc string

Signed-off-by: KuntaiDu
---
 vllm/v1/core/sched/scheduler.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 1131a40f30ad..8a639b012c59 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1273,10 +1273,10 @@ def update_from_output(
             if request is None or request.is_finished():
                 # The request is already finished. This can happen if the
                 # request is aborted while the model is executing it (e.g.,
-                # in pipeline parallelism).
+                # in pipeline parallelism) or in async scheduling.
                 # NOTE(Kuntai): When delay_free_blocks=True (for async KV
-                # cache transfer in KV connector), the request stays in
-                # self.requests but is already finished - we must check
+                # cache transfer in KV connector), the aborted request stays in
+                # self.requests but is already finished - we also need to check
                 # is_finished() to avoid double-free.
                 continue

From e4649679e791c0cb1d56df3e3f5ab8bbd0b24aff Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 07:07:33 +0000
Subject: [PATCH 8/9] adjust docstring to be more descriptive

Signed-off-by: KuntaiDu
---
 vllm/v1/core/sched/scheduler.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 9e0c49c3fda2..452317baa954 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1271,9 +1271,9 @@ def update_from_output(
                 # request is aborted while the model is executing it (e.g.,
                 # in pipeline parallelism) or in async scheduling.
                 # NOTE(Kuntai): When delay_free_blocks=True (for async KV
-                # cache transfer in KV connector), the aborted request stays in
-                # self.requests but is already finished - we also need to check
-                # is_finished() to avoid double-free.
+                # cache transfer in KV connector), the aborted request will not
+                # be set to None (in order to finish async KV transfer).
+                # In this case, we use is_finished() to check.
                 continue
 
             req_index = model_runner_output.req_id_to_index[req_id]

From 8f65d7adbeacd111b28bd54f8e0de470b76ce486 Mon Sep 17 00:00:00 2001
From: KuntaiDu
Date: Fri, 30 Jan 2026 07:08:24 +0000
Subject: [PATCH 9/9] grammar fix

Signed-off-by: KuntaiDu
---
 vllm/v1/core/sched/scheduler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 452317baa954..53e4fc8d8e59 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1269,7 +1269,7 @@ def update_from_output(
             if request is None or request.is_finished():
                 # The request is already finished. This can happen if the
                 # request is aborted while the model is executing it (e.g.,
-                # in pipeline parallelism) or in async scheduling.
+                # in pipeline parallelism or in async scheduling).
                 # NOTE(Kuntai): When delay_free_blocks=True (for async KV
                 # cache transfer in KV connector), the aborted request will not
                 # be set to None (in order to finish async KV transfer).