From f415df6a20e5dbb2c7ef227c241c63a3bcb1edd5 Mon Sep 17 00:00:00 2001 From: Dao Le Date: Thu, 21 May 2026 16:40:43 -0700 Subject: [PATCH 1/4] [KV Connector] Don't co-queue save with load in MooncakeStoreScheduler When a request has an external cache hit, the third loop in MooncakeStoreScheduler.build_connector_meta (and the resumed-from-preemption branch in the second loop) currently produces a ReqMeta with both load_spec.can_load=True and can_save=True. The worker then enqueues the req_id on BOTH kv_recv_thread and kv_send_thread, so on completion the worker reports the same req_id in both finished_recving and finished_sending. When the request is delay-freed (e.g. aborted while the load is in flight, or finished after some decode steps), the scheduler's _update_from_kv_xfer_finished frees the request via finished_recving and then trips `assert req_id in self.requests` on the finished_sending pass. Saving alongside the load is also wasted work: the bytes were just looked up in the store, and the worker would only re-store them after a batch_is_exist no-op. Subsequent decode steps continue to save new tokens via the normal cached_reqs path. Fix: pass skip_save=True to ReqMeta.from_request_tracker whenever a load is being issued for the request in the same step. Adds regression tests for the pending-load path and the resumed-from-preemption-with-load path, and confirms that the resumed-without-load path still saves. AI assistance was used (Claude) to draft and validate this change. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Dao Le --- .../unit/test_mooncake_store_scheduler.py | 156 ++++++++++++++++++ .../v1/mooncake/store/scheduler.py | 15 +- 2 files changed, 169 insertions(+), 2 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py index 7e8a6db27dc4..c3c060adc94e 100644 --- a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py +++ b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py @@ -4,6 +4,7 @@ from types import SimpleNamespace from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import ( + LoadSpec, RequestTracker, ) from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.scheduler import ( @@ -109,3 +110,158 @@ def test_cached_request_without_spec_decode_keeps_current_step_save_overlap(): tracker = scheduler._request_trackers["req-0"] assert tracker.token_len == 48 assert tracker.num_saved_tokens == 48 + + +def _make_pending_load_unfinished_request( + scheduler: MooncakeStoreScheduler, + *, + num_tokens: int, + block_hashes: list[bytes], + block_ids: tuple[list[int], ...] 
= ([0, 1, 2],), +) -> None: + request = SimpleNamespace( + num_tokens=num_tokens, + block_hashes=block_hashes, + num_output_placeholders=0, + ) + scheduler._unfinished_requests["req-0"] = (request, block_ids) + + +def _make_pending_load_scheduler_output() -> SimpleNamespace: + """scheduler_output for a step where req-0 is parked on a pending load + (not in scheduled_new_reqs or scheduled_cached_reqs).""" + return SimpleNamespace( + finished_req_ids=set(), + preempted_req_ids=set(), + scheduled_new_reqs=[], + scheduled_cached_reqs=SimpleNamespace( + req_ids=[], + new_block_ids=[], + num_computed_tokens=[], + ), + num_scheduled_tokens={}, + scheduled_spec_decode_tokens={}, + ) + + +def test_pending_load_does_not_co_queue_save(): + # Regression: a cache-hit request waiting on an async load must not also + # enqueue a save in the same scheduling step. Co-queuing both produces a + # recv+send pair for the same req_id, and the scheduler's + # _update_from_kv_xfer_finished then trips `assert req_id in self.requests` + # when both completions land for the delay-freed request. + scheduler = _make_bare_scheduler() + _make_pending_load_unfinished_request( + scheduler, + num_tokens=48, + block_hashes=[b"h0", b"h1", b"h2"], + ) + scheduler.load_specs["req-0"] = LoadSpec( + vllm_cached_tokens=0, + kvpool_cached_tokens=48, + can_load=True, + ) + + meta = scheduler.build_connector_meta(_make_pending_load_scheduler_output()) + + assert len(meta.requests) == 1 + req_meta = meta.requests[0] + assert req_meta.req_id == "req-0" + # Save must be off so the worker does not call add_stored_request. + assert req_meta.can_save is False + # Load is still issued as planned. + assert req_meta.load_spec is not None + assert req_meta.load_spec.can_load is True + # And the tracker's saved-tokens watermark stays at 0 so request_finished + # later sees `num_saved_tokens <= 0` and frees immediately rather than + # waiting for a finished_sending that will never come. 
+ tracker = scheduler._request_trackers["req-0"] + assert tracker.num_saved_tokens == 0 + + +def _make_resumed_unfinished_request( + scheduler: MooncakeStoreScheduler, + *, + token_ids: list[int], + block_hashes: list[bytes], + num_computed_tokens: int, +) -> None: + request = SimpleNamespace( + all_token_ids=token_ids, + block_hashes=block_hashes, + num_computed_tokens=num_computed_tokens, + num_output_placeholders=0, + ) + scheduler._unfinished_requests["req-0"] = (request, ([0, 1],)) + + +def _make_resumed_scheduler_output(*, num_scheduled_tokens: int) -> SimpleNamespace: + return SimpleNamespace( + finished_req_ids=set(), + preempted_req_ids=set(), + scheduled_new_reqs=[], + scheduled_cached_reqs=SimpleNamespace( + req_ids=["req-0"], + new_block_ids=[([2],)], + num_computed_tokens=[0], + ), + num_scheduled_tokens={"req-0": num_scheduled_tokens}, + scheduled_spec_decode_tokens={}, + ) + + +def test_resumed_from_preemption_with_load_skips_save(): + # On resume-from-preemption with a cache hit, the same co-queueing race + # applies: the resumed-from-preemption branch in build_connector_meta also + # passes load_spec.can_load=True. Skip save in this step; subsequent + # cached_reqs steps will save new tokens normally. 
+ scheduler = _make_bare_scheduler() + scheduler._preempted_req_ids = {"req-0"} + _make_resumed_unfinished_request( + scheduler, + token_ids=list(range(48)), + block_hashes=[b"h0", b"h1", b"h2"], + num_computed_tokens=0, + ) + scheduler.load_specs["req-0"] = LoadSpec( + vllm_cached_tokens=0, + kvpool_cached_tokens=48, + can_load=True, + ) + + meta = scheduler.build_connector_meta( + _make_resumed_scheduler_output(num_scheduled_tokens=48) + ) + + assert len(meta.requests) == 1 + req_meta = meta.requests[0] + assert req_meta.req_id == "req-0" + assert req_meta.can_save is False + assert req_meta.load_spec is not None + assert req_meta.load_spec.can_load is True + tracker = scheduler._request_trackers["req-0"] + assert tracker.num_saved_tokens == 0 + + +def test_resumed_from_preemption_without_load_still_saves(): + # No load_spec → behavior is unchanged: save proceeds. + scheduler = _make_bare_scheduler() + scheduler._preempted_req_ids = {"req-0"} + _make_resumed_unfinished_request( + scheduler, + token_ids=list(range(48)), + block_hashes=[b"h0", b"h1", b"h2"], + num_computed_tokens=0, + ) + + meta = scheduler.build_connector_meta( + _make_resumed_scheduler_output(num_scheduled_tokens=48) + ) + + assert len(meta.requests) == 1 + req_meta = meta.requests[0] + assert req_meta.req_id == "req-0" + assert req_meta.can_save is True + assert req_meta.load_spec is None + tracker = scheduler._request_trackers["req-0"] + assert tracker.num_saved_tokens == 48 diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py index 5922965974fd..378c4de0e2d1 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py @@ -273,11 +273,17 @@ def build_connector_meta( if self._discard_partial_chunks else len(prefill_tokens) ) + # When a load is also issued in this step, skip the save + # 
to avoid co-queuing recv+send for the same req_id (see + # comment in the load-only branch below). + skip_save_this_step = force_skip_save or ( + load_spec is not None and load_spec.can_load + ) req_meta = ReqMeta.from_request_tracker( request_tracker, self._block_size, load_spec=load_spec, - skip_save=force_skip_save, + skip_save=skip_save_this_step, block_hashes=request_real.block_hashes, is_last_chunk=( request_tracker.token_len >= last_chunk_tokens_num @@ -352,11 +358,16 @@ def build_connector_meta( num_saved_tokens=0, ) self._request_trackers[request_id] = request_tracker + # skip_save=True: co-queuing a save with a load makes the + # worker produce both finished_recving and finished_sending + # for the same req_id, which double-frees in + # _update_from_kv_xfer_finished. Saves resume normally on + # later cached_reqs steps as new tokens are computed. req_meta = ReqMeta.from_request_tracker( request_tracker, self._block_size, load_spec=load_spec, - skip_save=None, + skip_save=True, block_hashes=unfinished_req.block_hashes, discard_partial_chunks=self._discard_partial_chunks, ) From 40678115c5f82179dca86aea964003b36746b88a Mon Sep 17 00:00:00 2001 From: Dao Le Date: Thu, 21 May 2026 17:43:59 -0700 Subject: [PATCH 2/4] Simplify comment Signed-off-by: Dao Le --- .../kv_connector/v1/mooncake/store/scheduler.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py index 378c4de0e2d1..71c96893a06e 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py @@ -358,16 +358,11 @@ def build_connector_meta( num_saved_tokens=0, ) self._request_trackers[request_id] = request_tracker - # skip_save=True: co-queuing a save with a load makes the - # worker produce both finished_recving and finished_sending - # 
for the same req_id, which double-frees in - # _update_from_kv_xfer_finished. Saves resume normally on - # later cached_reqs steps as new tokens are computed. req_meta = ReqMeta.from_request_tracker( request_tracker, self._block_size, load_spec=load_spec, - skip_save=True, + skip_save=True, # Do not save when we have to load (load_spec is not None) block_hashes=unfinished_req.block_hashes, discard_partial_chunks=self._discard_partial_chunks, ) From 91717a5962702c99a7db49a79a2838492e260fd0 Mon Sep 17 00:00:00 2001 From: Dao Le Date: Thu, 21 May 2026 17:57:36 -0700 Subject: [PATCH 3/4] Cleaner Signed-off-by: Dao Le --- .../unit/test_mooncake_store_scheduler.py | 83 +++++++++++++++++++ .../kv_connector/v1/mooncake/store/data.py | 6 ++ .../v1/mooncake/store/scheduler.py | 10 +-- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py index c3c060adc94e..b76442b2544d 100644 --- a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py +++ b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py @@ -5,6 +5,7 @@ from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import ( LoadSpec, + ReqMeta, RequestTracker, ) from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.scheduler import ( @@ -265,3 +266,85 @@ def test_resumed_from_preemption_without_load_still_saves(): assert req_meta.load_spec is None tracker = scheduler._request_trackers["req-0"] assert tracker.num_saved_tokens == 48 + + +# Focused tests for ReqMeta.from_request_tracker — the centralized guard that +# enforces "a ReqMeta never carries both a save and a load". + + +def test_from_request_tracker_load_overrides_caller_skip_save(): + # Caller asks for skip_save=False, but load_spec.can_load=True. The + # function must force skip_save=True to avoid producing a ReqMeta the + # worker would enqueue on both kv_send_thread and kv_recv_thread. 
+ tracker = RequestTracker( + req_id="req-0", + token_len=48, + allocated_block_ids=([0, 1, 2],), + num_saved_tokens=0, + ) + load_spec = LoadSpec( + vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=True + ) + + req_meta = ReqMeta.from_request_tracker( + tracker, + block_size=16, + load_spec=load_spec, + skip_save=False, + block_hashes=[b"h0", b"h1", b"h2"], + ) + + assert req_meta is not None + assert req_meta.can_save is False + assert req_meta.load_spec is load_spec + assert tracker.num_saved_tokens == 0 + + +def test_from_request_tracker_load_with_can_load_false_still_saves(): + # A LoadSpec with can_load=False (e.g., no external tokens to load after + # update_state_after_alloc) must not suppress the save. + tracker = RequestTracker( + req_id="req-0", + token_len=48, + allocated_block_ids=([0, 1, 2],), + num_saved_tokens=0, + ) + load_spec = LoadSpec( + vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=False + ) + + req_meta = ReqMeta.from_request_tracker( + tracker, + block_size=16, + load_spec=load_spec, + skip_save=False, + block_hashes=[b"h0", b"h1", b"h2"], + ) + + assert req_meta is not None + assert req_meta.can_save is True + # from_request_tracker clears load_spec when can_load is False. 
+ assert req_meta.load_spec is None + assert tracker.num_saved_tokens == 48 + + +def test_from_request_tracker_no_load_saves_normally(): + tracker = RequestTracker( + req_id="req-0", + token_len=48, + allocated_block_ids=([0, 1, 2],), + num_saved_tokens=0, + ) + + req_meta = ReqMeta.from_request_tracker( + tracker, + block_size=16, + load_spec=None, + skip_save=False, + block_hashes=[b"h0", b"h1", b"h2"], + ) + + assert req_meta is not None + assert req_meta.can_save is True + assert req_meta.load_spec is None + assert tracker.num_saved_tokens == 48 diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/data.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/data.py index ab1df0413d11..f3e9a2e64469 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/data.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/data.py @@ -236,6 +236,12 @@ def from_request_tracker( ) skip_save = skip_save or num_tokens_to_save < chunk_boundary + # A ReqMeta must never carry both a save AND a load. + # The save would also be wasted work — the bytes are being looked up + # in the store right now. Later cached_reqs steps save new tokens + # normally. + if load_spec is not None and load_spec.can_load: + skip_save = True if skip_save and load_spec is None: return None diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py index 71c96893a06e..5922965974fd 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py @@ -273,17 +273,11 @@ def build_connector_meta( if self._discard_partial_chunks else len(prefill_tokens) ) - # When a load is also issued in this step, skip the save - # to avoid co-queuing recv+send for the same req_id (see - # comment in the load-only branch below). 
- skip_save_this_step = force_skip_save or ( - load_spec is not None and load_spec.can_load - ) req_meta = ReqMeta.from_request_tracker( request_tracker, self._block_size, load_spec=load_spec, - skip_save=skip_save_this_step, + skip_save=force_skip_save, block_hashes=request_real.block_hashes, is_last_chunk=( request_tracker.token_len >= last_chunk_tokens_num @@ -362,7 +356,7 @@ def build_connector_meta( request_tracker, self._block_size, load_spec=load_spec, - skip_save=True, # Do not save when we have to load (load_spec is not None) + skip_save=None, block_hashes=unfinished_req.block_hashes, discard_partial_chunks=self._discard_partial_chunks, ) From 3b3528cee068884e26bc724a915a6351b5311a9c Mon Sep 17 00:00:00 2001 From: Dao Le Date: Thu, 21 May 2026 23:20:23 -0700 Subject: [PATCH 4/4] Fix precommit Signed-off-by: Dao Le --- .../v1/kv_connector/unit/test_mooncake_store_scheduler.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py index b76442b2544d..4b46c03f5831 100644 --- a/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py +++ b/tests/v1/kv_connector/unit/test_mooncake_store_scheduler.py @@ -282,9 +282,7 @@ def test_from_request_tracker_load_overrides_caller_skip_save(): allocated_block_ids=([0, 1, 2],), num_saved_tokens=0, ) - load_spec = LoadSpec( - vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=True - ) + load_spec = LoadSpec(vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=True) req_meta = ReqMeta.from_request_tracker( tracker, @@ -309,9 +307,7 @@ def test_from_request_tracker_load_with_can_load_false_still_saves(): allocated_block_ids=([0, 1, 2],), num_saved_tokens=0, ) - load_spec = LoadSpec( - vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=False - ) + load_spec = LoadSpec(vllm_cached_tokens=0, kvpool_cached_tokens=48, can_load=False) req_meta = ReqMeta.from_request_tracker( 
tracker,