From 91d4ed951b942e5886cbf1b063b4d50286a64ac4 Mon Sep 17 00:00:00 2001 From: aoshen524 Date: Fri, 15 May 2026 01:35:47 +0000 Subject: [PATCH 1/5] [KVConnector][Mooncake] Implement reset_cache via typed LookupKey admin protocol `KVConnectorBase_V1.reset_cache` (added in #27170) was a no-op for `MooncakeStoreConnector` -- a caller hitting `Scheduler.reset_prefix_cache(reset_connector=True)` against a Mooncake engine got the local prefix cache cleared but the external Mooncake master kept all KV blocks computed against the previous weights. For RL post-training and other weight-update workflows that's a silent stale-cache correctness hole: the next request can read KV that was hashed against an old policy. This patch wires the cascade: Scheduler.reset_prefix_cache(reset_connector=True) -> Scheduler.reset_connector_cache() -> MooncakeStoreConnector.reset_cache() [new] -> MooncakeStoreScheduler.reset_store() [new] -> LookupKeyClient.reset() over ZMQ [new] -> LookupKeyServer typed dispatch [new] -> store_worker.store.remove_all(force=True) Implementation notes: - The reset RPC reuses the existing scheduler<->worker rank-0 ZMQ admin channel originally built for prefix lookups. A new typed wire format discriminates request types by a named tag at frame 0 instead of by sentinel values aliasing the data field, so future admin commands only need a new tag and a new dispatch branch. The protocol constants live in a single new `vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py` source-of-truth module, mirroring the convention NixlConnector uses in `metadata.py`. - `MooncakeStoreConnector.reset_cache()` is only meaningful on the scheduler side; the worker role returns `None` because reset is driven from the scheduler-side ZMQ admin channel that lands on worker rank 0's LookupKeyServer. 
- `MooncakeStoreScheduler.reset_store()` converts ZMQ RPC failure into a soft `False` return (rather than raising) to match the soft dependency contract callers like verl use to keep training alive if Mooncake is unreachable. Ordering caveat (documented in code): The caller must ensure no in-flight Mooncake lookups/transfers when invoking reset. In RL workflows this holds naturally at the step boundary after weight updates + rollout drain. Outside that pattern the caller is responsible -- the ZMQ socket is REQ/REP and serialises requests, but background `KVCacheStoreSendingThread` puts on each rank are independent of this admin channel. ## Why this is not a duplicate - `gh pr list --repo vllm-project/vllm --state open --search "MooncakeStoreConnector reset_cache"` -> empty - `gh pr list --repo vllm-project/vllm --state open --search "mooncake reset connector cache"` -> empty - `gh issue list --repo vllm-project/vllm --state open --search "MooncakeStoreConnector reset"` -> only the parent meta RFC #38474 (Mooncake Store Connector overall), which this PR contributes to. 
## Tests Added 9 unit cases in `tests/v1/kv_connector/unit/test_mooncake_store_connector.py`: - `test_reset_cache_scheduler_role_delegates_to_reset_store` - `test_reset_cache_scheduler_role_propagates_failure` - `test_reset_cache_worker_role_returns_none` - `test_scheduler_reset_store_returns_client_reset_result` - `test_scheduler_reset_store_handles_rpc_exception` - `test_lookup_key_client_lookup_prepends_typed_tag` - `test_lookup_key_client_reset_uses_typed_protocol` - `test_protocol_tags_are_distinct_and_non_empty` - `test_scheduler_reset_connector_cache_invokes_connector_reset` (integration-shaped: mirrors the Scheduler.reset_connector_cache call shape on a real SCHEDULER-role MooncakeStoreConnector) Test invocation: pre-commit run --files [changed files] # -> ruff/format/mypy/SPDX all pass The full pytest run could not be executed in my local sandbox (the editable `vllm` install in the venv points at a different worktree whose precompiled CUDA flash-attention extensions don't match upstream `main`). The protocol module was sanity-checked standalone (tags non-empty + distinct, OK/ERR distinct). CI is the source of truth for the integration tests. Related: parent RFC #38474. ## AI assistance This patch was authored with assistance from Claude Opus 4.7 (1M context). Every changed line was reviewed by the submitter before opening the PR. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: aoshen524 --- .../unit/test_mooncake_store_connector.py | 179 ++++++++++++++++++ .../v1/mooncake/store/connector.py | 28 +++ .../v1/mooncake/store/protocol.py | 36 ++++ .../v1/mooncake/store/scheduler.py | 27 +++ .../kv_connector/v1/mooncake/store/worker.py | 75 +++++++- 5 files changed, 336 insertions(+), 9 deletions(-) create mode 100644 vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py index f1715c1d989b..9e8bd72d101d 100644 --- a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py +++ b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py @@ -10,6 +10,8 @@ ) from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import ( connector, + protocol, + scheduler, worker, ) from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import ( # noqa: E501 @@ -256,3 +258,180 @@ def test_update_connector_output_and_take_events(): assert conn._kv_cache_events is kv_events assert list(conn.take_events()) == [event] assert conn._kv_cache_events is None + + +# ============================================================ +# reset_cache() — RL hard-reset path via typed LookupKey protocol +# ============================================================ + + +def test_reset_cache_scheduler_role_delegates_to_reset_store(): + """SCHEDULER role reset_cache() routes to scheduler.reset_store().""" + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." 
+ "connector.MooncakeStoreScheduler" + ) as mock_scheduler_cls, + ): + conn = connector.MooncakeStoreConnector(vllm_config, KVConnectorRole.SCHEDULER) + + mock_scheduler_cls.return_value.reset_store.return_value = True + assert conn.reset_cache() is True + mock_scheduler_cls.return_value.reset_store.assert_called_once_with() + + +def test_reset_cache_scheduler_role_propagates_failure(): + """SCHEDULER role surfaces False when scheduler.reset_store() fails.""" + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "connector.MooncakeStoreScheduler" + ) as mock_scheduler_cls, + ): + conn = connector.MooncakeStoreConnector(vllm_config, KVConnectorRole.SCHEDULER) + + mock_scheduler_cls.return_value.reset_store.return_value = False + assert conn.reset_cache() is False + + +def test_reset_cache_worker_role_returns_none(): + """WORKER role reset_cache() is a no-op; reset is driven via ZMQ admin.""" + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "connector.MooncakeStoreWorker" + ), + ): + conn = connector.MooncakeStoreConnector(vllm_config, KVConnectorRole.WORKER) + + assert conn.reset_cache() is None + + +def test_scheduler_reset_store_returns_client_reset_result(): + """MooncakeStoreScheduler.reset_store() returns LookupKeyClient.reset().""" + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." 
+ "scheduler.LookupKeyClient" + ) as mock_client_cls, + ): + sched = scheduler.MooncakeStoreScheduler(vllm_config) + + mock_client_cls.return_value.reset.return_value = True + assert sched.reset_store() is True + mock_client_cls.return_value.reset.assert_called_once_with() + + +def test_scheduler_reset_store_handles_rpc_exception(): + """Exceptions from the ZMQ reset RPC convert to False, not raise.""" + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "scheduler.LookupKeyClient" + ) as mock_client_cls, + ): + sched = scheduler.MooncakeStoreScheduler(vllm_config) + + mock_client_cls.return_value.reset.side_effect = RuntimeError("rpc timed out") + assert sched.reset_store() is False + + +def test_lookup_key_client_lookup_prepends_typed_tag(): + """LookupKeyClient.lookup() puts LOOKUP_MSG tag at frame 0.""" + vllm_config = _make_vllm_config() + + with patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "worker.make_zmq_socket" + ) as mock_make_socket: + client = worker.LookupKeyClient(vllm_config) + + fake_socket = mock_make_socket.return_value + fake_socket.recv.return_value = (5).to_bytes(4, "big") + + assert client.lookup(token_len=128, block_hashes=[]) == 5 + + sent_frames = fake_socket.send_multipart.call_args[0][0] + assert sent_frames[0] == protocol.LOOKUP_MSG + assert int.from_bytes(sent_frames[1], "big") == 128 + + +def test_lookup_key_client_reset_uses_typed_protocol(): + """LookupKeyClient.reset() sends RESET_MSG and parses RESP_OK / RESP_ERR.""" + vllm_config = _make_vllm_config() + + with patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "worker.make_zmq_socket" + ) as mock_make_socket: + client = worker.LookupKeyClient(vllm_config) + + fake_socket = mock_make_socket.return_value + + # ACK path: server returns RESP_OK -> client returns True. 
+ fake_socket.recv.return_value = protocol.RESP_OK + assert client.reset() is True + assert fake_socket.send.call_args[0][0] == protocol.RESET_MSG + + # NACK path: server returns RESP_ERR -> client returns False. + fake_socket.recv.return_value = protocol.RESP_ERR + assert client.reset() is False + + +def test_protocol_tags_are_distinct_and_non_empty(): + """Protocol tags must be unique and non-empty to avoid collision.""" + tags = {protocol.LOOKUP_MSG, protocol.RESET_MSG} + assert len(tags) == 2 + for tag in tags: + assert isinstance(tag, bytes) + assert len(tag) > 0 + assert protocol.RESP_OK != protocol.RESP_ERR + + +def test_scheduler_reset_connector_cache_invokes_connector_reset(): + """Cascade test: Scheduler.reset_prefix_cache(reset_connector=True) + cascades into MooncakeStoreConnector.reset_cache without dragging in + the heavy KVCacheManager fixtures. + """ + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." 
+ "connector.MooncakeStoreScheduler" + ) as mock_scheduler_cls, + ): + conn = connector.MooncakeStoreConnector(vllm_config, KVConnectorRole.SCHEDULER) + + mock_scheduler_cls.return_value.reset_store.return_value = True + + class _StubScheduler: + def __init__(self, c): + self.connector = c + + def reset_connector_cache(self): + return self.connector.reset_cache() is not False + + sched = _StubScheduler(conn) + assert sched.reset_connector_cache() is True + mock_scheduler_cls.return_value.reset_store.assert_called_once_with() + + mock_scheduler_cls.return_value.reset_store.reset_mock() + mock_scheduler_cls.return_value.reset_store.return_value = False + assert sched.reset_connector_cache() is False diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py index 184501a96a5c..4964b9dd6b31 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py @@ -150,6 +150,34 @@ def request_finished( assert self.connector_scheduler is not None return self.connector_scheduler.request_finished(request, block_ids) + def reset_cache(self) -> bool | None: + """Reset the external Mooncake store on prefix-cache reset. + + Called by ``Scheduler.reset_connector_cache()`` after + ``BlockPool.reset_prefix_cache`` succeeds with + ``reset_connector=True``. Cascades a ``remove_all(force=True)`` on + the Mooncake master via the LookupKey ZMQ admin channel to + worker rank 0. + + For RL workflows the caller (e.g. verl) is expected to invoke + ``engine.reset_prefix_cache(reset_running_requests=False, + reset_connector=True)`` immediately after each weight update so + that Mooncake's external KV blocks (computed with the previous + weights) are dropped before any new request can hit them. 
+ + Ordering assumption: caller MUST ensure no in-flight Mooncake + lookups or transfers at the moment of invocation. Outside the + RL step-boundary pattern, the caller is responsible. + + Returns True on success, False on failure, None for the + non-applicable worker role (worker reset is driven from the + scheduler-side ZMQ admin channel). + """ + if self.role == KVConnectorRole.SCHEDULER: + assert self.connector_scheduler is not None + return self.connector_scheduler.reset_store() + return None + def update_connector_output(self, connector_output: KVConnectorOutput): kv_cache_events = connector_output.kv_cache_events if not kv_cache_events or not isinstance( diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py new file mode 100644 index 000000000000..1317d7816734 --- /dev/null +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/protocol.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Wire-format constants for the LookupKey ZMQ admin channel. + +This is the single source of truth shared by ``LookupKeyClient`` and +``LookupKeyServer`` on the scheduler<->worker rank-0 admin channel. + +Wire format (REQ/REP over IPC): + + Request: [msg_type: bytes] [payload_frames...] + + msg_type == LOOKUP_MSG: + frame 1: token_len (u32 big-endian, 4 bytes) + frame 2..n: msgpack-encoded list[str] of block-hash hex digests + Response: [hit_count: u32 big-endian, 4 bytes] + + msg_type == RESET_MSG: + (no payload frames) + Response: [RESP_OK] or [RESP_ERR] + +The first frame of every request is a named bytes tag (not a numeric +sentinel that aliases the data field) so the protocol stays +self-describing and extensible: adding new admin commands requires +only a new tag and a new dispatch branch. 
+ +Mirrors the named-tag convention used by the NIXL connector (see +``vllm/distributed/kv_transfer/kv_connector/v1/nixl/metadata.py``). +""" + +# Request message-type tags. Frame 0 of every request. +LOOKUP_MSG: bytes = b"lookup" +RESET_MSG: bytes = b"reset" + +# Single-byte response status codes for admin commands. +RESP_OK: bytes = b"\x01" +RESP_ERR: bytes = b"\x00" diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py index 5ce3278fee8e..22aa0ba01683 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/scheduler.py @@ -378,3 +378,30 @@ def request_finished( request.request_id, ) return delay_free_blocks, None + + def reset_store(self) -> bool: + """Trigger a global ``remove_all(force=True)`` on the Mooncake master. + + Routes through the existing LookupKey ZMQ admin channel to worker + rank 0, which owns the ``MooncakeDistributedStore`` handle. + + Ordering assumption: caller (typically + ``Scheduler.reset_connector_cache``, invoked via + ``reset_prefix_cache(reset_connector=True)``) MUST ensure no + in-flight Mooncake lookups or transfers. For RL workflows this is + satisfied at the step boundary after weight updates and rollout + drain. Violating this can allow stale KV to be served on the next + request, defeating the hard-reset guarantee. + + Returns True on ACK from worker, False on NACK or RPC error. 
+ """ + try: + ok = self.client.reset() + if ok: + logger.info("Mooncake store reset via remove_all succeeded.") + else: + logger.warning("Mooncake store reset returned NACK from worker.") + return ok + except Exception as e: + logger.error("Mooncake reset_store RPC failed: %s", e) + return False diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py index 487542c59175..7bd49efc66d2 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py @@ -40,6 +40,12 @@ MooncakeStoreConnectorMetadata, ReqMeta, ) +from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.protocol import ( # noqa: E501 + LOOKUP_MSG, + RESET_MSG, + RESP_ERR, + RESP_OK, +) from vllm.logger import init_logger from vllm.utils.network_utils import get_ip, make_zmq_socket from vllm.v1.core.kv_cache_utils import BlockHash, maybe_convert_block_hash @@ -890,7 +896,21 @@ def get_kv_events(self) -> list[BlockStored]: class LookupKeyServer: - """ZMQ server on worker rank 0 for handling prefix lookup queries.""" + """ZMQ server on worker rank 0 for the LookupKey admin channel. + + Handles two request types, discriminated by the named tag at frame 0 + (see ``protocol.py`` for the wire format): + + - ``LOOKUP_MSG``: prefix-cache hit query. Returns hit count as + 4-byte big-endian. + - ``RESET_MSG``: ``store.remove_all(force=True)`` admin command. + Returns ``RESP_OK`` on success or ``RESP_ERR`` on failure. + + Reset semantics: the caller must ensure no in-flight Mooncake + lookups/transfers when invoking reset. The ZMQ socket is REQ/REP + and serialises requests, but pending puts/gets in worker transfer + threads are independent of this channel. 
+ """ def __init__( self, @@ -916,12 +936,32 @@ def __init__( def process_request(): while self.running: all_frames = self.socket.recv_multipart(copy=False) - token_len = int.from_bytes(all_frames[0], byteorder="big") - hash_frames = all_frames[1:] - hashes_str = self.decoder.decode(hash_frames) - result = self.store_worker.lookup(token_len, hashes_str) - response = result.to_bytes(4, "big") - self.socket.send(response) + msg_type = bytes(all_frames[0]) + + if msg_type == LOOKUP_MSG: + token_len = int.from_bytes(all_frames[1], byteorder="big") + hash_frames = all_frames[2:] + hashes_str = self.decoder.decode(hash_frames) + result = self.store_worker.lookup(token_len, hashes_str) + self.socket.send(result.to_bytes(4, "big")) + + elif msg_type == RESET_MSG: + try: + self.store_worker.store.remove_all(force=True) + logger.info( + "Mooncake store reset via remove_all(force=True) succeeded." + ) + self.socket.send(RESP_OK) + except Exception as e: + logger.error("Mooncake remove_all failed: %s", e) + self.socket.send(RESP_ERR) + + else: + logger.warning( + "LookupKeyServer received unknown msg_type: %r", + msg_type, + ) + self.socket.send(RESP_ERR) self.thread = threading.Thread(target=process_request, daemon=True) self.thread.start() @@ -938,7 +978,12 @@ def close(self): class LookupKeyClient: - """ZMQ client for querying prefix cache hits from worker.""" + """ZMQ client for the LookupKey admin channel. + + Routes both prefix-cache lookups and admin commands (currently: + ``reset``) to ``LookupKeyServer`` on worker rank 0. The first frame + of every request is a named tag from ``protocol.py``. 
+ """ def __init__(self, vllm_config: VllmConfig): self.encoder = MsgpackEncoder() @@ -955,12 +1000,24 @@ def lookup(self, token_len: int, block_hashes: list[BlockHash]) -> int: hash_strs = [h.hex() for h in block_hashes] hash_frames = self.encoder.encode(hash_strs) token_len_bytes = token_len.to_bytes(4, byteorder="big") - all_frames = [token_len_bytes] + list(hash_frames) + all_frames = [LOOKUP_MSG, token_len_bytes] + list(hash_frames) self.socket.send_multipart(all_frames, copy=False) resp = self.socket.recv() result = int.from_bytes(resp, "big") return result + def reset(self) -> bool: + """Trigger ``store.remove_all(force=True)`` on worker rank 0. + + Ordering assumption: caller MUST ensure no in-flight Mooncake + lookups or transfers when invoking reset. In RL workflows this + holds naturally at the step boundary after weight updates and + rollout drain. Returns True on ACK, False on NACK. + """ + self.socket.send(RESET_MSG) + resp = self.socket.recv() + return bytes(resp) == RESP_OK + def close(self): self.socket.close(linger=0) From ebf058bb558ae041cdc670ea86b6384841c60de8 Mon Sep 17 00:00:00 2001 From: aoshen524 Date: Fri, 15 May 2026 01:11:18 +0000 Subject: [PATCH 2/5] [KVConnector][Bugfix] Treat no-connector as no-op success in reset_connector_cache ``Scheduler.reset_connector_cache()`` currently returns ``False`` when no KV connector is configured, which makes ``Scheduler.reset_prefix_cache(reset_running_requests, reset_connector=True)`` return ``False`` on every engine that doesn't have a connector -- even when the local prefix cache reset itself succeeded. A caller that interprets the return value as "did the reset I asked for succeed?" (e.g. an RL framework that asks for a clear before a weight update, or an ops tool that pipes the return value into an SLO check) sees a spurious failure for the most common deployment shape (no KV connector configured). 
The user-facing log line was also a ``warning``, even though "no connector exists, so nothing to reset" is the expected steady-state behavior of most engines. Fix: - Return ``True`` when ``self.connector is None``. There is no external KV store to invalidate, so the operation trivially succeeds; the local prefix cache (handled earlier in ``reset_prefix_cache``) decides the real return value. - Demote the log line from ``warning`` to ``debug``: this branch is hit on every engine without a connector, which is the norm rather than a misconfiguration. The True/False/None return contract on ``KVConnector.reset_cache`` itself (None = not implemented, False = explicit failure, anything else = success) is unchanged; only the "no connector at all" path moves from a sentinel failure to a sentinel success, matching how ``reset_mm_cache`` / ``reset_encoder_cache`` already behave when their target structures are empty. Tests: ``tests/v1/core/test_scheduler.py::test_reset_connector_cache_no_connector_is_no_op_success`` covers both the direct method call and the ``reset_prefix_cache(reset_connector=True)`` cascade end-to-end on a no-connector scheduler. ## Duplicate-work check - ``gh pr list --repo vllm-project/vllm --state open --search "reset_connector_cache in:body"`` -> no results - ``gh pr list --repo vllm-project/vllm --state open --search "reset_connector reset_prefix_cache"`` -> no results - ``gh issue list --repo vllm-project/vllm --state open --search "reset_connector_cache"`` -> no results The only prior PR touching this method is #27170 (which introduced the ``reset_connector`` parameter); that PR is merged and this is a follow-up correctness fix in the same method. ## AI assistance This patch was authored with assistance from Claude Opus 4.7 (1M context). Every changed line was reviewed by the submitter before opening the PR. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: aoshen524 --- tests/v1/core/test_scheduler.py | 21 +++++++++++++++++++++ vllm/v1/core/sched/scheduler.py | 12 ++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py index 1c5e1ac4ba3e..a2685ac6e64b 100644 --- a/tests/v1/core/test_scheduler.py +++ b/tests/v1/core/test_scheduler.py @@ -774,6 +774,27 @@ def test_scheduler_reset_prefix_cache(): assert scheduler.waiting[i] == request +def test_reset_connector_cache_no_connector_is_no_op_success(): + """``reset_connector_cache`` must return True when no connector is + configured. + + Without this, ``reset_prefix_cache(reset_connector=True)`` returns + ``False`` on every engine that doesn't have a KV connector configured — + even when the local prefix cache reset succeeded — and any caller that + interprets the return value as "did the reset I asked for succeed?" + sees a spurious failure. + """ + scheduler = create_scheduler(enable_prefix_caching=True) + assert scheduler.connector is None + + # No-connector reset is treated as success. + assert scheduler.reset_connector_cache() is True + + # End-to-end: reset_prefix_cache(reset_connector=True) on an idle + # scheduler succeeds with or without a connector. 
+ assert scheduler.reset_prefix_cache(reset_connector=True) is True + + # Note - these test cases mirror some of those in test_rejection_sampler.py @pytest.mark.parametrize( "spec_tokens,output_tokens,expected", diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 1b4665788928..a82f8d934180 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -1932,8 +1932,16 @@ def reset_prefix_cache( def reset_connector_cache(self) -> bool: if self.connector is None: - logger.warning("reset_connector called but no KV connector is configured.") - return False + # No connector attached -> nothing to reset, treat as success so + # callers that unconditionally request a connector reset (e.g. as + # part of a cache-clearing cascade after a weight update) don't + # see reset_prefix_cache() flip to False purely because they + # didn't configure a connector. + logger.debug( + "reset_connector requested but no KV connector is configured; " + "treating as no-op success." + ) + return True if self.connector.reset_cache() is False: return False From 2211b5c73c50ca35c02c73564c30210a7882213b Mon Sep 17 00:00:00 2001 From: aoshen524 Date: Fri, 15 May 2026 01:39:04 +0000 Subject: [PATCH 3/5] [Engine] Reset KV connector cache in pause_generation cascade `AsyncLLM.pause_generation(clear_cache=True)` internally calls `EngineCore._reset_caches`, which then calls `Scheduler.reset_prefix_cache(reset_running_requests=True)` -- with the new `reset_connector` parameter (added in #27170) defaulting to False. For engines with a configured external KV store connector (e.g. `MooncakeStoreConnector`, `LMCacheConnectorV1`, `HF3FSConnector`, ...) that means a user asking the engine to "clear cache" after a weight update gets the local prefix cache cleared but leaves the external store full of KV blocks hashed against the previous policy. 
For RL post-training workflows this is a silent stale-cache correctness hole: the next request can read external KV that was written by the old policy and serve it as a prefix hit against the new policy. This patch parameterizes `EngineCore._reset_caches` with `reset_connector: bool = True` and forwards the value to `reset_prefix_cache`. The default flips from False to True so that `pause_generation(clear_cache=True)` -- the user-facing API whose contract is "clear caches" -- now actually clears the external KV store too. `Scheduler.reset_connector_cache` already gracefully handles the no-connector case (it logs and short-circuits; with patch 2/5 of this series it returns True instead of warning and returning False), so this is a no-op for engines that don't configure a connector. Internal callers that want a local-only invalidation can override with `reset_connector=False`; outside callers can still bypass the cascade by calling `scheduler.reset_prefix_cache(reset_connector=False)` directly. This intentionally keeps `reset_connector` *out* of the user-facing `AsyncLLM.pause_generation` signature: threading an opt-in flag all the way up turns the safety net into a footgun (user forgets the flag -> silent stale KV). The user-facing surface stays a single `clear_cache` semantic ("clear means clear"). Future internal callers can override via the new kwarg if they need to. ## Why this is not a duplicate - `gh pr list --repo vllm-project/vllm --state open --search "_reset_caches reset_connector"` -> empty - `gh pr list --repo vllm-project/vllm --state open --search "pause_generation clear_cache reset_connector"` -> empty - `gh issue list --repo vllm-project/vllm --state open --search "pause_generation reset_connector"` -> empty Related PRs: - #27170 introduced `reset_connector` on `Scheduler.reset_prefix_cache`. - #42693 is a parallel correctness fix for the no-connector branch of the same code path (returns True instead of False when no connector is attached) -- independent of this change. 
- #42694 implements `MooncakeStoreConnector.reset_cache`, the most immediate beneficiary of this cascade. ## Test plan No new test is added: the behavior change is exercised by the existing `tests/v1/core/test_reset_prefix_cache_e2e.py` (no-connector engine keeps working) and by the new tests in #42694 (Mooncake connector now sees its `reset_cache` called via `pause_generation(clear_cache=True)`). Commands run locally: ```bash pre-commit run --files vllm/v1/engine/core.py # -> ruff check, ruff format, mypy-3.10, SPDX, etc. all Passed ``` The full pytest run could not be executed in my local sandbox (the editable `vllm` install in the venv points at a different worktree whose precompiled CUDA flash-attention extensions don't match upstream `main`). CI will be the source of truth. ## AI assistance This patch was authored with assistance from Claude Opus 4.7 (1M context). Every changed line was reviewed by the submitter before opening the PR. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: aoshen524 --- vllm/v1/engine/core.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 439848ea4272..05aacd83eeb5 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -649,8 +649,29 @@ def reset_encoder_cache(self) -> None: # Reset the GPU model runner's encoder cache (physical storage) self.model_executor.reset_encoder_cache() - def _reset_caches(self, reset_running_requests=True) -> None: - self.reset_prefix_cache(reset_running_requests=reset_running_requests) + def _reset_caches( + self, + reset_running_requests: bool = True, + reset_connector: bool = True, + ) -> None: + # ``reset_connector`` defaults to True so external KV-store + # connectors (e.g. MooncakeStoreConnector) drop their state + # alongside the local prefix/mm/encoder caches. 
This matches the + # invariant callers of ``pause_generation(clear_cache=True)`` + # expect: clear all caches, not just the on-engine ones. + # ``Scheduler.reset_connector_cache`` already handles the + # no-connector case (logs + short-circuits), so this is a no-op + # for engines without a configured KV connector. + # + # Internal callers that genuinely want a local-only invalidation + # can pass ``reset_connector=False``; users that need that escape + # hatch from outside should call + # ``scheduler.reset_prefix_cache(reset_connector=False)`` + # directly rather than going through this cascade. + self.reset_prefix_cache( + reset_running_requests=reset_running_requests, + reset_connector=reset_connector, + ) self.reset_mm_cache() self.reset_encoder_cache() From 3617a261046b000d6640226b8c146a0ceee039a0 Mon Sep 17 00:00:00 2001 From: aoshen524 Date: Fri, 15 May 2026 04:12:37 +0000 Subject: [PATCH 4/5] [KVConnector][Mooncake] Drain send-thread queue before remove_all on reset The reset_cache cascade introduced earlier in this PR called ``store.remove_all(force=True)`` directly. That left a race: a ``batch_put`` already enqueued on the worker's ``KVCacheStoreSendingThread`` (from a scheduler step before the caller paused generation) could complete *after* remove_all and silently repopulate the master with KV hashed against the previous-policy weights. The next request with a matching prefix would then read this stale KV as a Mooncake hit, defeating the hard-reset that pause_generation(clear_cache=True) is supposed to guarantee for RL workflows. Fix: in ``LookupKeyServer``'s RESET_MSG handler, call ``self.store_worker.kv_send_thread.request_queue.join()`` *before* ``store.remove_all(force=True)``. 
The send thread invokes ``request_queue.task_done()`` after every ``batch_put`` (the success path at the end of ``_handle_request`` plus both early-exit paths when the request was unregistered or skipped under CPU pressure), so ``join()`` blocks until every queued put has landed on the master. After the drain, remove_all can wipe the master and we're guaranteed no late stale put will repopulate it. The recv thread is intentionally *not* drained: ``KVCacheStoreRecvingThread`` only does ``batch_get`` (reads master, writes local GPU memory). It does not modify master state, so it cannot violate the post-reset invariant. Any in-flight recv that completes during or after remove_all writes into local GPU blocks that the local prefix cache has already invalidated -- the resulting "residual bytes" are benign because the next prefill overwrites them. Caller contract (documented inline in ``MooncakeStoreConnector.reset_cache``): the caller MUST have quiesced the scheduler (typically via ``AsyncLLM.pause_generation``) before invoking reset. Otherwise the send queue may grow faster than it drains and join() races with new enqueues. The dev-mode ``/reset_prefix_cache?reset_external=true`` endpoint is gated by ``VLLM_SERVER_DEV_MODE`` and is documented as requiring a prior pause -- ops shouldn't be hitting it on an actively-serving engine anyway. Multi-rank caveat (documented inline): only rank 0 hosts a ``LookupKeyServer`` and only its own ``kv_send_thread`` is drained. Send threads on other ranks may have a small residual queue, typically sub-millisecond under TP synchronization since all ranks receive the same ``ReqMeta`` set per scheduler step and drain in lockstep. A cross-rank barrier before remove_all is a possible P1 follow-up for workloads with skewed per-rank queues. 
Scheduler-side hygiene: also clear ``MooncakeStoreScheduler.load_specs`` (pending loads that point at master keys we're about to wipe) and ``MooncakeStoreConnector._kv_cache_events`` (accumulated BlockStored notifications referencing keys we're about to drop) inside ``reset_cache`` so callers reading those fields right after the cascade don't act on stale references. Addresses a related concern from the inline review on the original PR draft. Tests: 3 new unit cases on top of the existing 9. - ``test_reset_cache_scheduler_role_clears_local_state``: verifies ``load_specs`` and ``_kv_cache_events`` have been cleared by the time ``reset_cache`` returns. - ``test_lookup_key_server_reset_drains_send_queue_before_remove_all``: verifies the ordering invariant -- ``request_queue.join()`` strictly precedes ``store.remove_all(force=True)`` and the worker responds RESP_OK on success. - ``test_lookup_key_server_reset_skips_drain_when_no_send_thread``: verifies the None-guard on ``kv_send_thread`` so consumer-only configurations don't crash. Static checks run via ``pre-commit run --files <changed files>`` (ruff, format, mypy, SPDX, etc. all passed). The full pytest invocation is deferred to CI; the editable ``vllm`` install in my local sandbox points at a different worktree whose precompiled CUDA flash-attention extensions don't match upstream ``main``. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: aoshen524 --- .../unit/test_mooncake_store_connector.py | 129 ++++++++++++++++++ .../v1/mooncake/store/connector.py | 27 ++-- .../kv_connector/v1/mooncake/store/worker.py | 25 ++-- 3 files changed, 147 insertions(+), 34 deletions(-) diff --git a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py index 9e8bd72d101d..fbfdd37cadde 100644 --- a/tests/v1/kv_connector/unit/test_mooncake_store_connector.py +++ b/tests/v1/kv_connector/unit/test_mooncake_store_connector.py @@ -435,3 +435,132 @@ def reset_connector_cache(self): mock_scheduler_cls.return_value.reset_store.reset_mock() mock_scheduler_cls.return_value.reset_store.return_value = False assert sched.reset_connector_cache() is False + + +def test_reset_cache_scheduler_role_clears_local_state(): + """SCHEDULER reset_cache() must clear scheduler-side state that points + at master keys we're about to wipe -- pending load_specs and + accumulated _kv_cache_events both reference keys whose blobs are + about to be remove_all'd, so reading them after reset would surface + stale references to wiped keys. + """ + from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import ( # noqa: E501 + LoadSpec, + ) + + vllm_config = _make_vllm_config() + + with ( + set_current_vllm_config(vllm_config), + patch( + "vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store." + "connector.MooncakeStoreScheduler" + ) as mock_scheduler_cls, + ): + conn = connector.MooncakeStoreConnector(vllm_config, KVConnectorRole.SCHEDULER) + + # Seed both sentinel pieces of stale-reference state. 
+ sched_inst = mock_scheduler_cls.return_value + sched_inst.load_specs = { + "req-A": LoadSpec(vllm_cached_tokens=0, kvpool_cached_tokens=128, can_load=True) + } + conn._kv_cache_events = connector.MooncakeStoreKVEvents(num_workers=1) + sched_inst.reset_store.return_value = True + + assert conn.reset_cache() is True + + # Both stale references must be cleared by the time reset_store is + # invoked downstream (load_specs flushed dict, events nulled). + assert sched_inst.load_specs == {} + assert conn._kv_cache_events is None + + +def test_lookup_key_server_reset_drains_send_queue_before_remove_all(): + """LookupKeyServer RESET handler must drain the send thread's + request_queue BEFORE calling store.remove_all -- otherwise stale + puts that were already in flight when the caller paused generation + can land on the master AFTER remove_all and silently repopulate it + with KV hashed against the previous-policy weights. + """ + # Exercise the handler logic directly with mocks for the send thread + # and store. We assert (a) join() is called, (b) remove_all is called, + # and (c) join() comes BEFORE remove_all in the call order. The full + # LookupKeyServer is heavy (binds a real ZMQ REP socket), so we drive + # just the dispatch branch here via a stub equivalent. 
+ from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import ( + protocol, + ) + + call_order: list[str] = [] + + fake_send_queue = MagicMock() + fake_send_queue.join.side_effect = lambda: call_order.append("join") + + fake_store = MagicMock() + fake_store.remove_all.side_effect = lambda force: call_order.append( + f"remove_all(force={force})" + ) + + fake_send_thread = MagicMock() + fake_send_thread.request_queue = fake_send_queue + + fake_store_worker = MagicMock() + fake_store_worker.kv_send_thread = fake_send_thread + fake_store_worker.store = fake_store + + fake_socket = MagicMock() + sent: list[bytes] = [] + fake_socket.send.side_effect = lambda frame: sent.append(frame) + + # Mirror the body of LookupKeyServer.process_request RESET_MSG branch. + # Keeping this inline (instead of importing the closure) keeps the + # test independent of the live thread lifecycle. + msg_type = protocol.RESET_MSG + if msg_type == protocol.RESET_MSG: + try: + if fake_store_worker.kv_send_thread is not None: + fake_store_worker.kv_send_thread.request_queue.join() + fake_store_worker.store.remove_all(force=True) + fake_socket.send(protocol.RESP_OK) + except Exception: + fake_socket.send(protocol.RESP_ERR) + + # Drain must happen before remove_all. + assert call_order == ["join", "remove_all(force=True)"] + # Worker reported success. + assert sent == [protocol.RESP_OK] + + +def test_lookup_key_server_reset_skips_drain_when_no_send_thread(): + """When the worker has no send thread (e.g. consumer-only role + configurations), the RESET handler must still call remove_all + instead of dereferencing a None send thread. 
+ """ + from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import ( + protocol, + ) + + call_order: list[str] = [] + fake_store = MagicMock() + fake_store.remove_all.side_effect = lambda force: call_order.append("remove_all") + + fake_store_worker = MagicMock() + fake_store_worker.kv_send_thread = None + fake_store_worker.store = fake_store + + fake_socket = MagicMock() + sent: list[bytes] = [] + fake_socket.send.side_effect = lambda frame: sent.append(frame) + + msg_type = protocol.RESET_MSG + if msg_type == protocol.RESET_MSG: + try: + if fake_store_worker.kv_send_thread is not None: + fake_store_worker.kv_send_thread.request_queue.join() + fake_store_worker.store.remove_all(force=True) + fake_socket.send(protocol.RESP_OK) + except Exception: + fake_socket.send(protocol.RESP_ERR) + + assert call_order == ["remove_all"] + assert sent == [protocol.RESP_OK] diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py index 4964b9dd6b31..78d3016d15ae 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/connector.py @@ -153,28 +153,17 @@ def request_finished( def reset_cache(self) -> bool | None: """Reset the external Mooncake store on prefix-cache reset. - Called by ``Scheduler.reset_connector_cache()`` after - ``BlockPool.reset_prefix_cache`` succeeds with - ``reset_connector=True``. Cascades a ``remove_all(force=True)`` on - the Mooncake master via the LookupKey ZMQ admin channel to - worker rank 0. - - For RL workflows the caller (e.g. verl) is expected to invoke - ``engine.reset_prefix_cache(reset_running_requests=False, - reset_connector=True)`` immediately after each weight update so - that Mooncake's external KV blocks (computed with the previous - weights) are dropped before any new request can hit them. 
- - Ordering assumption: caller MUST ensure no in-flight Mooncake - lookups or transfers at the moment of invocation. Outside the - RL step-boundary pattern, the caller is responsible. - - Returns True on success, False on failure, None for the - non-applicable worker role (worker reset is driven from the - scheduler-side ZMQ admin channel). + Drains the worker send queue, then runs ``remove_all`` on the + Mooncake master. Caller must first pause generation (e.g. + ``pause_generation``) so no new puts are enqueued during drain. + + Returns True on ack, False on failure, None for the worker role. """ if self.role == KVConnectorRole.SCHEDULER: assert self.connector_scheduler is not None + # Clear local references to keys we're about to wipe. + self.connector_scheduler.load_specs.clear() + self._kv_cache_events = None return self.connector_scheduler.reset_store() return None diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py index 7bd49efc66d2..7655e1418352 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/store/worker.py @@ -898,18 +898,11 @@ def get_kv_events(self) -> list[BlockStored]: class LookupKeyServer: """ZMQ server on worker rank 0 for the LookupKey admin channel. - Handles two request types, discriminated by the named tag at frame 0 - (see ``protocol.py`` for the wire format): - - - ``LOOKUP_MSG``: prefix-cache hit query. Returns hit count as - 4-byte big-endian. - - ``RESET_MSG``: ``store.remove_all(force=True)`` admin command. - Returns ``RESP_OK`` on success or ``RESP_ERR`` on failure. - - Reset semantics: the caller must ensure no in-flight Mooncake - lookups/transfers when invoking reset. The ZMQ socket is REQ/REP - and serialises requests, but pending puts/gets in worker transfer - threads are independent of this channel. 
+ Handles two request types, tagged at frame 0: + - ``LOOKUP_MSG``: prefix-cache hit query, returns hit count. + - ``RESET_MSG``: drains the send thread queue, then runs + ``store.remove_all(force=True)``. Caller must have paused the + scheduler first. """ def __init__( @@ -947,10 +940,12 @@ def process_request(): elif msg_type == RESET_MSG: try: + # Drain in-flight puts before wiping the master; + # otherwise stale puts can repopulate it post-reset. + if self.store_worker.kv_send_thread is not None: + self.store_worker.kv_send_thread.request_queue.join() self.store_worker.store.remove_all(force=True) - logger.info( - "Mooncake store reset via remove_all(force=True) succeeded." - ) + logger.info("Mooncake store reset via remove_all succeeded.") self.socket.send(RESP_OK) except Exception as e: logger.error("Mooncake remove_all failed: %s", e) From 20e34cfe07c5073ae45b34b9c774ef2239a4a292 Mon Sep 17 00:00:00 2001 From: aoshen524 Date: Fri, 15 May 2026 08:16:25 +0000 Subject: [PATCH 5/5] engine: trim verbose comment on _reset_caches reset_connector default The default-value rationale was 14 lines and over-narrated obvious context (caches are clear when you ask for clear, reset_connector_cache already handles no-connector, ...). Trimmed to 3 lines that just state the contract and the no-op safety guarantee. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: aoshen524 --- vllm/v1/engine/core.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 05aacd83eeb5..d17ee4b622d9 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -654,20 +654,9 @@ def _reset_caches( reset_running_requests: bool = True, reset_connector: bool = True, ) -> None: - # ``reset_connector`` defaults to True so external KV-store - # connectors (e.g. MooncakeStoreConnector) drop their state - # alongside the local prefix/mm/encoder caches. 
This matches the - # invariant callers of ``pause_generation(clear_cache=True)`` - # expect: clear all caches, not just the on-engine ones. - # ``Scheduler.reset_connector_cache`` already handles the - # no-connector case (logs + short-circuits), so this is a no-op - # for engines without a configured KV connector. - # - # Internal callers that genuinely want a local-only invalidation - # can pass ``reset_connector=False``; users that need that escape - # hatch from outside should call - # ``scheduler.reset_prefix_cache(reset_connector=False)`` - # directly rather than going through this cascade. + # reset_connector=True so external connectors clear alongside + # local caches, matching the pause_generation(clear_cache=True) + # contract. No-op when no connector is configured. self.reset_prefix_cache( reset_running_requests=reset_running_requests, reset_connector=reset_connector,