Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions tests/v1/kv_connector/unit/test_mooncake_store_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.data import (
MooncakeStoreConnectorMetadata,
)
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.metrics import (
MooncakeStoreConnectorStats,
)
from vllm.v1.kv_cache_interface import (
FullAttentionSpec,
KVCacheConfig,
Expand Down Expand Up @@ -129,6 +132,49 @@ def test_get_kv_connector_kv_cache_events_returns_none_when_empty():
assert connector.get_kv_connector_kv_cache_events() is None


def test_get_kv_connector_stats_delegates_to_worker():
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()
expected_stats = MooncakeStoreConnectorStats()
expected_stats.record_operation("save_put", 0.01, 2, num_bytes=1024)

with (
set_current_vllm_config(vllm_config),
patch(
"vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store."
"connector.MooncakeStoreWorker"
) as mock_worker_cls,
):
connector = mooncake_store_connector.MooncakeStoreConnector(
vllm_config, KVConnectorRole.WORKER, kv_cache_config
)

mock_worker_cls.return_value.get_kv_connector_stats.return_value = expected_stats
stats = connector.get_kv_connector_stats()

assert stats is expected_stats
mock_worker_cls.return_value.get_kv_connector_stats.assert_called_once_with()


def test_build_kv_connector_stats_reconstructs_mooncake_stats():
stats = mooncake_store_connector.MooncakeStoreConnector.build_kv_connector_stats(
{
"save_put": [
{
"duration_seconds": 0.02,
"num_keys": 4,
"num_bytes": 2048,
"status": "ok",
"num_failed_keys": 0,
}
]
}
)

assert isinstance(stats, MooncakeStoreConnectorStats)
assert stats.data["save_put"][0]["num_bytes"] == 2048


def test_get_kv_connector_kv_cache_events_wraps_worker_events():
vllm_config = _make_vllm_config()
kv_cache_config = _make_kv_cache_config()
Expand Down
102 changes: 102 additions & 0 deletions tests/v1/kv_connector/unit/test_mooncake_store_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
LoadSpec,
ReqMeta,
)
from vllm.distributed.kv_transfer.kv_connector.v1.mooncake.store.metrics import (
MooncakeStoreConnectorStats,
)


def _default_send_coord() -> mooncake_store_worker.MooncakeStoreCoordinator:
Expand Down Expand Up @@ -417,6 +420,24 @@ def test_store_sending_thread_skips_request_during_cpu_pressure():
assert store.batch_put_from_multi_buffers.call_count == 3


def test_store_sending_thread_records_mooncake_metrics():
store = MagicMock()
store.batch_is_exist.return_value = [0, 0]
store.batch_put_from_multi_buffers.return_value = [256, 256]
thread = _make_store_sending_thread(store)
stats = MooncakeStoreConnectorStats()
thread._record_operation_cb = stats.record_operation

thread.add_stored_request("req-a")
thread._handle_request(_make_store_req("req-a", [b"a0", b"a1"]))

assert len(stats.data["save_exists"]) == 1
assert stats.data["save_exists"][0]["num_keys"] == 2
assert len(stats.data["save_put"]) == 1
assert stats.data["save_put"][0]["num_bytes"] == 512
assert stats.data["save_put"][0]["status"] == "ok"


def test_store_sending_thread_only_skips_on_no_available_handle():
store = MagicMock()
store.batch_is_exist.side_effect = lambda keys: [0] * len(keys)
Expand Down Expand Up @@ -548,6 +569,29 @@ def test_recv_thread_logs_tier_summary_when_enabled(monkeypatch, caplog_vllm):
)


def test_recv_thread_records_partial_failure_metrics(monkeypatch):
monkeypatch.delenv("VLLM_MOONCAKE_STORE_TIER_LOG", raising=False)
store = MagicMock()
store.batch_get_into_multi_buffers.return_value = [256, -10]
thread = _make_store_recving_thread(store, disk_offload_buffer_budget_bytes=None)
stats = MooncakeStoreConnectorStats()
thread._record_operation_cb = stats.record_operation

req = _make_load_req(
"req-a",
[b"a0", b"a1"],
token_len=32,
)

thread._handle_request(req)

assert len(stats.data["load_get"]) == 1
assert stats.data["load_get"][0]["num_keys"] == 2
assert stats.data["load_get"][0]["num_bytes"] == 512
assert stats.data["load_get"][0]["status"] == "partial_failure"
assert stats.data["load_get"][0]["num_failed_keys"] == 1


def test_recv_thread_uses_ratio_scaled_budget_for_first_pass_split():
store = MagicMock()
store.batch_get_into_multi_buffers.side_effect = [
Expand Down Expand Up @@ -1025,6 +1069,8 @@ def _make_bare_worker(

worker.disk_offload_buffer_budget_bytes = None
worker.store_replicate_config = SimpleNamespace()
worker._kv_connector_stats_lock = threading.Lock()
worker.kv_connector_stats = MooncakeStoreConnectorStats()

spec = FullAttentionSpec(
block_size=block_size, num_kv_heads=8, head_size=64, dtype=None
Expand Down Expand Up @@ -1373,3 +1419,59 @@ def test_topology_embedded_cpu_only(tmp_path, monkeypatch):
assert w.store_replicate_config.preferred_segment == ""
# No disk budget — enable_offload was absent (defaults to False).
assert w.disk_offload_buffer_budget_bytes is None


# ---------------------------------------------------------------------------
# Stats/metrics tests (PR-35 port)
# ---------------------------------------------------------------------------


def test_mooncake_store_stats_aggregate_reduce():
stats = MooncakeStoreConnectorStats()
stats.record_operation("save_put", 0.01, 2, num_bytes=128)
other = MooncakeStoreConnectorStats()
other.record_operation(
"save_put",
0.03,
1,
num_bytes=64,
status="error",
num_failed_keys=1,
)

reduced = stats.aggregate(other).reduce()

assert reduced["save_put_count"] == 2
assert reduced["save_put_total_keys"] == 3
assert reduced["save_put_total_bytes"] == 192
assert reduced["save_put_failed_keys"] == 1
assert reduced["save_put_error_count"] == 1


def test_worker_get_kv_connector_stats_resets_after_read():
worker = _make_bare_worker()
worker._record_kv_connector_operation(
"save_put",
0.01,
2,
num_bytes=128,
)

stats = worker.get_kv_connector_stats()

assert isinstance(stats, MooncakeStoreConnectorStats)
assert stats.data["save_put"][0]["num_bytes"] == 128
assert worker.get_kv_connector_stats() is None


def test_lookup_records_mooncake_metrics():
worker = _make_bare_worker()
worker.store.batch_is_exist.return_value = [1, 1]

result = worker.lookup(32, [b"a0", b"a1"])
stats = worker.get_kv_connector_stats()

assert result == 32
assert isinstance(stats, MooncakeStoreConnectorStats)
assert len(stats.data["lookup_exists"]) == 1
assert stats.data["lookup_exists"][0]["num_keys"] == 2
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
KVConnectorRole,
SupportsHMA,
)
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorPromMetrics,
KVConnectorStats,
PromMetric,
PromMetricT,
)
from vllm.forward_context import ForwardContext
from vllm.logger import init_logger
from vllm.v1.attention.backend import AttentionMetadata
Expand All @@ -38,6 +44,7 @@
from vllm.v1.request import Request

from .data import MooncakeStoreConnectorMetadata
from .metrics import MooncakeStoreConnectorStats, MooncakeStorePromMetrics
from .scheduler import MooncakeStoreScheduler
from .worker import MooncakeStoreWorker

Expand Down Expand Up @@ -274,3 +281,30 @@ def get_kv_connector_kv_cache_events(
kv_events = MooncakeStoreKVEvents(num_workers=1)
kv_events.add_events(events)
return kv_events

def get_kv_connector_stats(self) -> KVConnectorStats | None:
if self.connector_worker is None:
return None
return self.connector_worker.get_kv_connector_stats()

@classmethod
def build_kv_connector_stats(
cls, data: dict[str, Any] | None = None
) -> KVConnectorStats | None:
return (
MooncakeStoreConnectorStats(data=data)
if data is not None
else MooncakeStoreConnectorStats()
)

@classmethod
def build_prom_metrics(
cls,
vllm_config: VllmConfig,
metric_types: dict[type[PromMetric], type[PromMetricT]],
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
) -> KVConnectorPromMetrics:
return MooncakeStorePromMetrics(
vllm_config, metric_types, labelnames, per_engine_labelvalues
)
Loading
Loading