Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
91d4ed9
[KVConnector][Mooncake] Implement reset_cache via typed LookupKey adm…
aoshen524 May 15, 2026
ebf058b
[KVConnector][Bugfix] Treat no-connector as no-op success in reset_co…
aoshen524 May 15, 2026
2211b5c
[Engine] Reset KV connector cache in pause_generation cascade
aoshen524 May 15, 2026
3617a26
[KVConnector][Mooncake] Drain send-thread queue before remove_all on …
aoshen524 May 15, 2026
01e2438
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 15, 2026
7203422
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 15, 2026
20e34cf
engine: trim verbose comment on _reset_caches reset_connector default
aoshen524 May 15, 2026
06769a3
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 15, 2026
f363fdb
Merge upstream/main: resolve conflicts with #42828 (HMA support)
aoshen02 May 24, 2026
f365df1
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 25, 2026
e8a4cf5
Merge branch 'main' into aoshen524/mooncake-reset-cache
ywang96 May 26, 2026
feaf0c0
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 26, 2026
26054b2
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 26, 2026
148c5c2
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 26, 2026
58c309a
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 26, 2026
02ed83f
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 27, 2026
86e3d0a
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 27, 2026
9da7683
Merge branch 'main' into aoshen524/mooncake-reset-cache
aoshen02 May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions tests/v1/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,27 @@ def test_scheduler_reset_prefix_cache():
assert scheduler.waiting[i] == request


def test_reset_connector_cache_no_connector_is_no_op_success():
"""``reset_connector_cache`` must return True when no connector is
configured.

Without this, ``reset_prefix_cache(reset_connector=True)`` returns
``False`` on every engine that doesn't have a KV connector configured —
even when the local prefix cache reset succeeded — and any caller that
interprets the return value as "did the reset I asked for succeed?"
sees a spurious failure.
"""
scheduler = create_scheduler(enable_prefix_caching=True)
assert scheduler.connector is None

# No-connector reset is treated as success.
assert scheduler.reset_connector_cache() is True

# End-to-end: reset_prefix_cache(reset_connector=True) on an idle
# scheduler succeeds with or without a connector.
assert scheduler.reset_prefix_cache(reset_connector=True) is True


# Note - these test cases mirror some of those in test_rejection_sampler.py
@pytest.mark.parametrize(
"spec_tokens,output_tokens,expected",
Expand Down
330 changes: 330 additions & 0 deletions tests/v1/kv_connector/unit/test_mooncake_store_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
connector as mooncake_store_connector,
)
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
protocol,
scheduler,
worker,
)
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import (
MooncakeStoreConnectorMetadata,
)
Expand Down Expand Up @@ -284,3 +289,328 @@ def test_update_connector_output_and_take_events():
assert connector._kv_cache_events is kv_events
assert list(connector.take_events()) == [event]
assert connector._kv_cache_events is None


# ============================================================
# reset_cache() — RL hard-reset path via typed LookupKey protocol
# ============================================================


def test_reset_cache_scheduler_role_delegates_to_reset_store():
"""SCHEDULER role reset_cache() routes to scheduler.reset_store()."""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreScheduler"
) as mock_scheduler_cls,
):
conn = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
)

mock_scheduler_cls.return_value.reset_store.return_value = True
assert conn.reset_cache() is True
mock_scheduler_cls.return_value.reset_store.assert_called_once_with()


def test_reset_cache_scheduler_role_propagates_failure():
"""SCHEDULER role surfaces False when scheduler.reset_store() fails."""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreScheduler"
) as mock_scheduler_cls,
):
conn = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
)

mock_scheduler_cls.return_value.reset_store.return_value = False
assert conn.reset_cache() is False


def test_reset_cache_worker_role_returns_none():
"""WORKER role reset_cache() is a no-op; reset is driven via ZMQ admin."""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreWorker"
),
):
conn = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.WORKER, kv_cache_config
)

assert conn.reset_cache() is None


def test_scheduler_reset_store_returns_client_reset_result():
"""MooncakeStoreScheduler.reset_store() returns LookupKeyClient.reset()."""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"scheduler.LookupKeyClient"
) as mock_client_cls,
):
sched = scheduler.MooncakeStoreScheduler(vllm_config, kv_cache_config)

mock_client_cls.return_value.reset.return_value = True
assert sched.reset_store() is True
mock_client_cls.return_value.reset.assert_called_once_with()


def test_scheduler_reset_store_handles_rpc_exception():
"""Exceptions from the ZMQ reset RPC convert to False, not raise."""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"scheduler.LookupKeyClient"
) as mock_client_cls,
):
sched = scheduler.MooncakeStoreScheduler(vllm_config, kv_cache_config)

mock_client_cls.return_value.reset.side_effect = RuntimeError("rpc timed out")
assert sched.reset_store() is False


def test_lookup_key_client_lookup_prepends_typed_tag():
"""LookupKeyClient.lookup() puts LOOKUP_MSG tag at frame 0."""
vllm_config = _make_vllm_config()

with patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"worker.make_zmq_socket"
) as mock_make_socket:
client = worker.LookupKeyClient(vllm_config)

fake_socket = mock_make_socket.return_value
fake_socket.recv.return_value = (5).to_bytes(4, "big")

assert client.lookup(token_len=128, block_hashes=[]) == 5

sent_frames = fake_socket.send_multipart.call_args[0][0]
assert sent_frames[0] == protocol.LOOKUP_MSG
assert int.from_bytes(sent_frames[1], "big") == 128


def test_lookup_key_client_reset_uses_typed_protocol():
"""LookupKeyClient.reset() sends RESET_MSG and parses RESP_OK / RESP_ERR."""
vllm_config = _make_vllm_config()

with patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"worker.make_zmq_socket"
) as mock_make_socket:
client = worker.LookupKeyClient(vllm_config)

fake_socket = mock_make_socket.return_value

# ACK path: server returns RESP_OK -> client returns True.
fake_socket.recv.return_value = protocol.RESP_OK
assert client.reset() is True
assert fake_socket.send.call_args[0][0] == protocol.RESET_MSG

# NACK path: server returns RESP_ERR -> client returns False.
fake_socket.recv.return_value = protocol.RESP_ERR
assert client.reset() is False


def test_protocol_tags_are_distinct_and_non_empty():
"""Protocol tags must be unique and non-empty to avoid collision."""
tags = {protocol.LOOKUP_MSG, protocol.RESET_MSG}
assert len(tags) == 2
for tag in tags:
assert isinstance(tag, bytes)
assert len(tag) > 0
assert protocol.RESP_OK != protocol.RESP_ERR


def test_scheduler_reset_connector_cache_invokes_connector_reset():
"""Cascade test: Scheduler.reset_prefix_cache(reset_connector=True)
cascades into MooncakeStoreConnector.reset_cache without dragging in
the heavy KVCacheManager fixtures.
"""
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreScheduler"
) as mock_scheduler_cls,
):
conn = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
)

mock_scheduler_cls.return_value.reset_store.return_value = True

class _StubScheduler:
def __init__(self, c):
self.connector = c

def reset_connector_cache(self):
return self.connector.reset_cache() is not False

sched = _StubScheduler(conn)
assert sched.reset_connector_cache() is True
mock_scheduler_cls.return_value.reset_store.assert_called_once_with()

mock_scheduler_cls.return_value.reset_store.reset_mock()
mock_scheduler_cls.return_value.reset_store.return_value = False
assert sched.reset_connector_cache() is False


def test_reset_cache_scheduler_role_clears_local_state():
"""SCHEDULER reset_cache() must clear scheduler-side state that points
at master keys we're about to wipe -- pending load_specs and
accumulated _kv_cache_events both reference keys whose blobs are
about to be remove_all'd, so reading them after reset would surface
stale references to wiped keys.
"""
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import ( # noqa: E501
LoadSpec,
)

vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreScheduler"
) as mock_scheduler_cls,
):
conn = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.SCHEDULER, kv_cache_config
)

# Seed both sentinel pieces of stale-reference state.
sched_inst = mock_scheduler_cls.return_value
sched_inst.load_specs = {
"req-A": LoadSpec(vllm_cached_tokens=0, kvpool_cached_tokens=128, can_load=True)
}
conn._kv_cache_events = mooncake_store_connector.MooncakeStoreKVEvents(
num_workers=1
)
sched_inst.reset_store.return_value = True

assert conn.reset_cache() is True

# Both stale references must be cleared by the time reset_store is
# invoked downstream (load_specs flushed dict, events nulled).
assert sched_inst.load_specs == {}
assert conn._kv_cache_events is None


def test_lookup_key_server_reset_drains_send_queue_before_remove_all():
"""LookupKeyServer RESET handler must drain the send thread's
request_queue BEFORE calling store.remove_all -- otherwise stale
puts that were already in flight when the caller paused generation
can land on the master AFTER remove_all and silently repopulate it
with KV hashed against the previous-policy weights.
"""
# Exercise the handler logic directly with mocks for the send thread
# and store. We assert (a) join() is called, (b) remove_all is called,
# and (c) join() comes BEFORE remove_all in the call order. The full
# LookupKeyServer is heavy (binds a real ZMQ REP socket), so we drive
# just the dispatch branch here via a stub equivalent.
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
protocol,
)

call_order: list[str] = []

fake_send_queue = MagicMock()
fake_send_queue.join.side_effect = lambda: call_order.append("join")

fake_store = MagicMock()
fake_store.remove_all.side_effect = lambda force: call_order.append(
f"remove_all(force={force})"
)

fake_send_thread = MagicMock()
fake_send_thread.request_queue = fake_send_queue

fake_store_worker = MagicMock()
fake_store_worker.kv_send_thread = fake_send_thread
fake_store_worker.store = fake_store

fake_socket = MagicMock()
sent: list[bytes] = []
fake_socket.send.side_effect = lambda frame: sent.append(frame)

# Mirror the body of LookupKeyServer.process_request RESET_MSG branch.
# Keeping this inline (instead of importing the closure) keeps the
# test independent of the live thread lifecycle.
msg_type = protocol.RESET_MSG
if msg_type == protocol.RESET_MSG:
try:
if fake_store_worker.kv_send_thread is not None:
fake_store_worker.kv_send_thread.request_queue.join()
fake_store_worker.store.remove_all(force=True)
fake_socket.send(protocol.RESP_OK)
except Exception:
fake_socket.send(protocol.RESP_ERR)

# Drain must happen before remove_all.
assert call_order == ["join", "remove_all(force=True)"]
# Worker reported success.
assert sent == [protocol.RESP_OK]


def test_lookup_key_server_reset_skips_drain_when_no_send_thread():
"""When the worker has no send thread (e.g. consumer-only role
configurations), the RESET handler must still call remove_all
instead of dereferencing a None send thread.
"""
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store import (
protocol,
)

call_order: list[str] = []
fake_store = MagicMock()
fake_store.remove_all.side_effect = lambda force: call_order.append("remove_all")

fake_store_worker = MagicMock()
fake_store_worker.kv_send_thread = None
fake_store_worker.store = fake_store

fake_socket = MagicMock()
sent: list[bytes] = []
fake_socket.send.side_effect = lambda frame: sent.append(frame)

msg_type = protocol.RESET_MSG
if msg_type == protocol.RESET_MSG:
try:
if fake_store_worker.kv_send_thread is not None:
fake_store_worker.kv_send_thread.request_queue.join()
fake_store_worker.store.remove_all(force=True)
fake_socket.send(protocol.RESP_OK)
except Exception:
fake_socket.send(protocol.RESP_ERR)

assert call_order == ["remove_all"]
assert sent == [protocol.RESP_OK]
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,23 @@ def request_finished_all_groups(
assert self.connector_scheduler is not None
return self.connector_scheduler.request_finished(request, block_ids)

def reset_cache(self) -> bool | None:
"""Reset the external Mooncake store on prefix-cache reset.

Drains the worker send queue, then runs ``remove_all`` on the
Mooncake master. Caller must first pause generation (e.g.
``pause_generation``) so no new puts are enqueued during drain.

Returns True on ack, False on failure, None for the worker role.
"""
if self.role == KVConnectorRole.SCHEDULER:
assert self.connector_scheduler is not None
# Clear local references to keys we're about to wipe.
self.connector_scheduler.load_specs.clear()
self._kv_cache_events = None
return self.connector_scheduler.reset_store()
return None

def update_connector_output(self, connector_output: KVConnectorOutput):
kv_cache_events = connector_output.kv_cache_events
if not kv_cache_events or not isinstance(
Expand Down
Loading
Loading