diff --git a/tests/v1/kv_connector/unit/test_offloading_connector.py b/tests/v1/kv_connector/unit/test_offloading_connector.py index fea9ff09b567..1805f009db0e 100644 --- a/tests/v1/kv_connector/unit/test_offloading_connector.py +++ b/tests/v1/kv_connector/unit/test_offloading_connector.py @@ -16,6 +16,7 @@ from vllm.distributed.kv_transfer.kv_connector.v1.offloading_connector import ( OffloadingConnector, OffloadingConnectorMetadata, + OffloadingConnectorStats, ) from vllm.forward_context import ForwardContext from vllm.utils.hashing import sha256 @@ -86,7 +87,14 @@ def complete_jobs(self, job_ids: set[int]) -> None: if job_id in self.waiting_jobs: self.waiting_jobs.remove(job_id) self.completed_jobs.append(job_id) - self.completed_transfers.append((job_id, True)) + result = TransferResult( + job_id=job_id, + success=True, + transfer_size=None, + transfer_time=None, + transfer_type=None, + ) + self.completed_transfers.append(result) def wait(self, job_ids: set[int]) -> None: self.flushed_jobs |= job_ids @@ -720,3 +728,144 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner): # second request will use the GPU prefix cache assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) + + +class TestOffloadingConnectorStats: + """Tests for OffloadingConnector stats reconstruction and operations.""" + + def test_build_kv_connector_stats_with_none(self): + """Test that build_kv_connector_stats returns empty stats when given None.""" + stats = OffloadingConnector.build_kv_connector_stats(data=None) + + assert stats is not None + assert isinstance(stats, OffloadingConnectorStats) + assert len(stats.data) == 0 + assert stats.is_empty() + + def test_build_kv_connector_stats_with_empty_dict(self): + """Test that build_kv_connector_stats returns empty stats with empty dict.""" + stats = OffloadingConnector.build_kv_connector_stats(data={}) + + assert stats is not None + assert isinstance(stats, OffloadingConnectorStats) + assert len(stats.data) == 
0 + assert stats.is_empty() + + def test_build_kv_connector_stats_reconstructs_offload_stats(self): + """Test that OffloadingConnector stats are properly reconstructed with + correct data.""" + serialized_data = { + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ], + } + + stats = OffloadingConnector.build_kv_connector_stats(data=serialized_data) + + offload_connector_stats = stats + assert isinstance(offload_connector_stats, OffloadingConnectorStats) + assert offload_connector_stats.data["CPU_to_GPU"] == [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ] + assert offload_connector_stats.data["GPU_to_CPU"] == [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ] + + def test_aggregate_same_connector(self): + """Test aggregating stats from the same connector type.""" + stats1 = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + ], + } + ) + + stats2 = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], + } + ) + + result = stats1.aggregate(stats2) + + assert result is stats1 # Should return self + offload_connector_stats = result + assert offload_connector_stats.data["CPU_to_GPU"] == [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ] + assert offload_connector_stats.data["GPU_to_CPU"] == [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + {"op_size": 16, "op_time": 2}, + ] + + def test_reduce(self): + """Test that reduce() correctly reduces all nested connector stats.""" + stats = 
OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 16, "op_time": 1.0}, + {"op_size": 8, "op_time": 0.5}, + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [ + {"op_size": 1, "op_time": 0.1}, + {"op_size": 2, "op_time": 0.2}, + {"op_size": 16, "op_time": 2}, + ], + } + ) + + reduced = stats.reduce() + + assert isinstance(reduced, dict) + # Check that the stats were reduced (should have aggregated values) + assert "CPU_to_GPU_total_bytes" in reduced + assert "CPU_to_GPU_total_time" in reduced + assert "GPU_to_CPU_total_bytes" in reduced + assert "GPU_to_CPU_total_time" in reduced + assert reduced["CPU_to_GPU_total_bytes"] == 34 + assert reduced["CPU_to_GPU_total_time"] == 2.6 + assert reduced["GPU_to_CPU_total_time"] == 2.3 + assert reduced["GPU_to_CPU_total_bytes"] == 19 + + def test_reset(self): + """Test that reset() resets all nested connector stats.""" + offload_connector_stats = OffloadingConnectorStats( + data={ + "CPU_to_GPU": [ + {"op_size": 3, "op_time": 0.2}, + {"op_size": 7, "op_time": 0.9}, + ], + "GPU_to_CPU": [{"op_size": 16, "op_time": 2}], + } + ) + + assert not offload_connector_stats.is_empty() + + offload_connector_stats.reset() + + # After reset, stats should be empty + assert offload_connector_stats.is_empty() + assert len(offload_connector_stats.data) == 0 diff --git a/tests/v1/kv_offload/test_cpu_gpu.py b/tests/v1/kv_offload/test_cpu_gpu.py index 4a2801cefc42..9d14e3cff89e 100644 --- a/tests/v1/kv_offload/test_cpu_gpu.py +++ b/tests/v1/kv_offload/test_cpu_gpu.py @@ -168,15 +168,30 @@ def test_transfer( orig_dst_caches = [x.clone() for x in handler.dst_tensors] # call transfer function + start_time = time.time() assert handler.transfer_async(1, (src_spec, dst_spec)) - assert set({x[0] for x in handler._transfers}) == {1} + assert set({x.job_id for x in handler._transfers}) == {1} # wait for transfer to complete end_time = time.time() + 10 while time.time() < end_time: finished = 
handler.get_finished() if finished: - assert finished == [(1, True)] + assert finished[0].job_id == 1 + assert finished[0].success + assert finished[0].transfer_type == ( + ("GPU", "CPU") + if gpu_to_cpu + else ("CPU", "GPU") + ) + assert ( + finished[0].transfer_size + == handler.total_block_size_in_bytes + * handler.dst_block_size_factor + * len(dst_blocks) + ) + assert finished[0].transfer_time > 0 + assert finished[0].transfer_time < (time.time() - start_time) break time.sleep(0.1) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py index 28aad71ab48f..db77d41c487f 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/metrics.py @@ -124,7 +124,7 @@ def __init__( self._counter_cls = metric_types[Counter] self._histogram_cls = metric_types[Histogram] self._labelnames = labelnames - self._per_engine_labelvalues = per_engine_labelvalues + self.per_engine_labelvalues = per_engine_labelvalues def make_per_engine(self, metric: PromMetric) -> dict[int, PromMetric]: """ @@ -134,7 +134,7 @@ def make_per_engine(self, metric: PromMetric) -> dict[int, PromMetric]: """ return { idx: metric.labels(*labelvalues) - for idx, labelvalues in self._per_engine_labelvalues.items() + for idx, labelvalues in self.per_engine_labelvalues.items() } def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py index 5667f1d3f248..708a7f12d307 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/offloading_connector.py @@ -17,6 +17,12 @@ KVConnectorRole, ) from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorMetadata +from vllm.distributed.kv_transfer.kv_connector.v1.metrics import ( + 
KVConnectorPromMetrics, + KVConnectorStats, + PromMetric, + PromMetricT, +) from vllm.forward_context import ForwardContext from vllm.logger import init_logger from vllm.v1.attention.backend import AttentionBackend, AttentionMetadata @@ -28,7 +34,11 @@ from vllm.v1.kv_offload.factory import OffloadingSpecFactory from vllm.v1.kv_offload.mediums import GPULoadStoreSpec from vllm.v1.kv_offload.spec import OffloadingSpec -from vllm.v1.kv_offload.worker.worker import OffloadingWorker, TransferSpec +from vllm.v1.kv_offload.worker.worker import ( + OffloadingWorker, + TransferSpec, + TransferType, +) from vllm.v1.outputs import KVConnectorOutput from vllm.v1.request import Request @@ -37,6 +47,66 @@ logger = init_logger(__name__) +@dataclass +class OffloadingOperationMetrics: + op_size: int + op_time: float + + +@dataclass +class OffloadingConnectorStats(KVConnectorStats): + def __post_init__(self): + if not self.data: + # Empty container init, no data is passed in. + self.reset() + + def reset(self): + self.data: dict[str, list[OffloadingOperationMetrics]] = {} + + def aggregate(self, other: KVConnectorStats) -> KVConnectorStats: + if not other.is_empty(): + for k, v in other.data.items(): + if k not in self.data: + self.data[k] = v + else: + accumulator = self.data[k] + assert isinstance(accumulator, list) + accumulator.extend(v) + return self + + def reduce(self) -> dict[str, int | float]: + """ + Reduce the observations collected during a time interval to one or + more representative values (eg avg/median/sum of the series). + This is meant to be called by the logger to produce a summary of the + stats for the last time interval. 
+ """ + return_dict: dict[str, int | float] = {} + for transfer_type, ops_list in self.data.items(): + assert isinstance(ops_list, list) + total_bytes = 0 + total_time = 0 + for op in ops_list: + assert isinstance(op, dict) + total_bytes += op["op_size"] + total_time += op["op_time"] + return_dict[f"{transfer_type}_total_bytes"] = total_bytes + return_dict[f"{transfer_type}_total_time"] = total_time + return return_dict + + def is_empty(self) -> bool: + return not self.data + + def record_transfer(self, num_bytes: int, time: float, transfer_type: TransferType): + src, dst = transfer_type + transfer_type_key = src + "_to_" + dst + op = OffloadingOperationMetrics(num_bytes, time) + if transfer_type_key in self.data: + self.data[transfer_type_key].append(op) + else: + self.data[transfer_type_key] = [op] + + @dataclass class OffloadingConnectorMetadata(KVConnectorMetadata): reqs_to_load: dict[ReqId, TransferSpec] @@ -143,6 +213,33 @@ def take_events(self) -> Iterable[KVCacheEvent]: assert self.connector_scheduler is not None return self.connector_scheduler.take_events() + def get_kv_connector_stats(self) -> KVConnectorStats | None: + if self.connector_worker is None: + return None # We only emit stats from the worker-side + return self.connector_worker.get_kv_connector_stats() + + @classmethod + def build_kv_connector_stats( + cls, data: dict[str, Any] | None = None + ) -> KVConnectorStats | None: + return ( + OffloadingConnectorStats(data=data) + if data is not None + else OffloadingConnectorStats() + ) + + @classmethod + def build_prom_metrics( + cls, + vllm_config: VllmConfig, + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[object]], + ) -> KVConnectorPromMetrics: + return OffloadPromMetrics( + vllm_config, metric_types, labelnames, per_engine_labelvalues + ) + class OffloadingConnectorScheduler: """Implementation of Scheduler side methods""" @@ -467,7 +564,8 @@ def __init__(self, spec: 
OffloadingSpec): self._job_counter = 0 - # job_id -> (req_id, store) + self.kv_connector_stats = OffloadingConnectorStats() + # job_id -> (req_id, store) self._jobs: dict[int, tuple[ReqId, bool]] = {} # req_id -> active job IDs self._load_job: dict[ReqId, int] = {} @@ -559,10 +657,21 @@ def get_finished(self, finished_req_ids: set[str]) -> tuple[set[str], set[str]]: """ finished_sending = set() finished_recving = set() - for job_id, success in self.worker.get_finished(): + for transfer_result in self.worker.get_finished(): # we currently do not support job failures - assert success + job_id = transfer_result.job_id + assert transfer_result.success req_id, store = self._jobs.pop(job_id) + if ( + transfer_result.transfer_time + and transfer_result.transfer_size is not None + and transfer_result.transfer_type is not None + ): + self.kv_connector_stats.record_transfer( + num_bytes=transfer_result.transfer_size, + time=transfer_result.transfer_time, + transfer_type=transfer_result.transfer_type, + ) if store: req_jobs = self._store_jobs[req_id] req_jobs.remove(job_id) @@ -588,3 +697,104 @@ def get_finished(self, finished_req_ids: set[str]) -> tuple[set[str], set[str]]: del self._store_jobs[req_id] return finished_sending, finished_recving + + def get_kv_connector_stats(self) -> KVConnectorStats | None: + """ + Get the KV transfer stats for the connector. 
+ """ + + if self.kv_connector_stats.is_empty(): + return None + # Clear stats for next iteration + kv_connector_stats = self.kv_connector_stats + self.kv_connector_stats = OffloadingConnectorStats() + return kv_connector_stats + + +class OffloadPromMetrics(KVConnectorPromMetrics): + def __init__( + self, + vllm_config: VllmConfig, + metric_types: dict[type[PromMetric], type[PromMetricT]], + labelnames: list[str], + per_engine_labelvalues: dict[int, list[object]], + ): + super().__init__(vllm_config, metric_types, labelnames, per_engine_labelvalues) + # (engine_idx, transfer_tupe) -> (metric with bounded labels) + self.histogram_transfer_size: dict[tuple[int, str], PromMetricT] = {} + self.counter_kv_bytes: dict[tuple[int, str], PromMetricT] = {} + self.counter_kv_transfer_time: dict[tuple[int, str], PromMetricT] = {} + buckets = [ # In bytes + 1e6, + 5e6, + 10e6, + 20e6, + 40e6, + 60e6, + 80e6, + 100e6, + 150e6, + 200e6, + ] + + self._counter_kv_bytes = self._counter_cls( + name="vllm:kv_offload_total_bytes", + documentation="Number of bytes offloaded by KV connector", + labelnames=labelnames + ["transfer_type"], + ) + + self._counter_kv_transfer_time = self._counter_cls( + name="vllm:kv_offload_total_time", + documentation="Total time measured by all KV offloading operations", + labelnames=labelnames + ["transfer_type"], + ) + + self._histogram_transfer_size = self._histogram_cls( + name="vllm:kv_offload_size", + documentation="Histogram of KV offload transfer size, in bytes.", + buckets=buckets[:], + labelnames=labelnames + ["transfer_type"], + ) + + def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): + """ + Observe transfer statistics from the new data structure. 
transfer_stats_data is expected to be a dict where: + - keys are transfer type strings (e.g., "CPU_to_GPU", "GPU_to_CPU") + - values are lists of serialized OffloadingOperationMetrics dicts + """ + + for transfer_type, ops in transfer_stats_data.items(): + # Cache: + if (engine_idx, transfer_type) not in self.histogram_transfer_size: + self.histogram_transfer_size[(engine_idx, transfer_type)] = ( + self._histogram_transfer_size.labels( + *(self.per_engine_labelvalues[engine_idx] + [transfer_type]) + ) + ) + self.counter_kv_bytes[(engine_idx, transfer_type)] = ( + self._counter_kv_bytes.labels( + *(self.per_engine_labelvalues[engine_idx] + [transfer_type]) + ) + ) + self.counter_kv_transfer_time[(engine_idx, transfer_type)] = ( + self._counter_kv_transfer_time.labels( + *(self.per_engine_labelvalues[engine_idx] + [transfer_type]) + ) + ) + + # Process ops: + assert isinstance(ops, list) + for op in ops: # ops is a list of serialized OffloadingOperationMetrics + assert isinstance(op, dict) + # Observe size histogram + self.histogram_transfer_size[(engine_idx, transfer_type)].observe( + op["op_size"] + ) + + # Increment byte and time counters + self.counter_kv_bytes[(engine_idx, transfer_type)].inc(op["op_size"]) + + self.counter_kv_transfer_time[(engine_idx, transfer_type)].inc( + op["op_time"] + ) diff --git a/vllm/v1/kv_offload/worker/cpu_gpu.py b/vllm/v1/kv_offload/worker/cpu_gpu.py index b6caf3c4197d..a5abae51ef03 100644 --- a/vllm/v1/kv_offload/worker/cpu_gpu.py +++ b/vllm/v1/kv_offload/worker/cpu_gpu.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from collections import deque +from dataclasses import dataclass import numpy as np import torch @@ -19,6 +20,15 @@ logger = init_logger(__name__) +@dataclass +class Transfer: + job_id: int + stream: torch.cuda.Stream + start_event: torch.Event + end_event: torch.Event + num_bytes: int + + def expand_block_ids( block_ids: np.ndarray, 
block_size_factor: int, @@ -92,14 +102,15 @@ def __init__( tensor.element_size() * tensor.stride(0) * min_block_size_factor for tensor in src_tensors ] + self.total_block_size_in_bytes = sum(self.block_size_in_bytes) assert len(src_tensors) > 0 self.gpu_to_cpu: bool = self.src_tensors[0].is_cuda - + self.transfer_type = ("GPU", "CPU") if self.gpu_to_cpu else ("CPU", "GPU") # job_id -> event self._transfer_events: dict[int, torch.Event] = {} # queue of transfers (job_id, stream, event) - self._transfers: deque[tuple[int, torch.cuda.Stream, torch.Event]] = deque() + self._transfers: deque[Transfer] = deque() # list of CUDA streams available for re-use self._stream_pool: list[torch.cuda.Stream] = [] # list of CUDA events available for re-use @@ -132,16 +143,27 @@ def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool: src_to_dst_tensor = torch.from_numpy(src_to_dst) stream = self._stream_pool.pop() if self._stream_pool else torch.cuda.Stream() - event = self._event_pool.pop() if self._event_pool else torch.Event() + start_event = ( + self._event_pool.pop() + if self._event_pool + else torch.Event(enable_timing=True) + ) + end_event = ( + self._event_pool.pop() + if self._event_pool + else torch.Event(enable_timing=True) + ) if self.gpu_to_cpu: # wait for model computation to finish before offloading stream.wait_stream(torch.cuda.current_stream()) if self._transfers: - _, _, last_event = self._transfers[-1] + last_transfer: Transfer = self._transfers[-1] + last_event = last_transfer.end_event # assure job will start only after the previous one completes stream.wait_event(last_event) with torch.cuda.stream(stream): + start_event.record(stream) for src_tensor, dst_tensor, block_size_in_bytes in zip( self.src_tensors, self.dst_tensors, @@ -153,22 +175,42 @@ def transfer_async(self, job_id: int, transfer_spec: TransferSpec) -> bool: block_size_in_bytes, src_to_dst_tensor, ) - event.record(stream) - - self._transfer_events[job_id] = event - 
self._transfers.append((job_id, stream, event)) + end_event.record(stream) + + self._transfer_events[job_id] = end_event + self._transfers.append( + Transfer( + job_id=job_id, + stream=stream, + start_event=start_event, + end_event=end_event, + num_bytes=dst_sub_block_count * self.total_block_size_in_bytes, + ) + ) # success return True def get_finished(self) -> list[TransferResult]: results: list[TransferResult] = [] - while self._transfers and self._transfers[0][2].query(): - job_id, stream, event = self._transfers.popleft() - results.append((job_id, True)) - self._stream_pool.append(stream) - self._event_pool.append(event) - del self._transfer_events[job_id] + while self._transfers and self._transfers[0].end_event.query(): + transfer = self._transfers.popleft() + transfer_time = ( + transfer.start_event.elapsed_time(transfer.end_event) * 1e-3 + ) # elapsed_time is in milliseconds + result = TransferResult( + job_id=transfer.job_id, + success=True, + transfer_size=transfer.num_bytes, + transfer_time=transfer_time, + transfer_type=self.transfer_type, + ) + + results.append(result) + self._stream_pool.append(transfer.stream) + self._event_pool.append(transfer.end_event) + self._event_pool.append(transfer.start_event) + del self._transfer_events[transfer.job_id] return results def wait(self, job_ids: set[int]): diff --git a/vllm/v1/kv_offload/worker/worker.py b/vllm/v1/kv_offload/worker/worker.py index d332f2e156a8..efb31c2a0ec7 100644 --- a/vllm/v1/kv_offload/worker/worker.py +++ b/vllm/v1/kv_offload/worker/worker.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project from abc import ABC, abstractmethod +from dataclasses import dataclass from vllm.logger import init_logger from vllm.v1.kv_offload.abstract import LoadStoreSpec @@ -9,12 +10,19 @@ TransferSpec = tuple[LoadStoreSpec, LoadStoreSpec] # transfers are forwarded to workers by (src_medium, dst_medium) TransferType = tuple[str, str] -# 
transfer result (job_id, success) -TransferResult = tuple[int, bool] logger = init_logger(__name__) +@dataclass +class TransferResult: + job_id: int + success: bool + transfer_size: int | None = None # Size in bytes + transfer_time: float | None = None + transfer_type: TransferType | None = None + + class OffloadingHandler(ABC): """ OffloadingHandler class for managing asynchronous KV data transfers @@ -57,7 +65,6 @@ def get_finished(self) -> list[TransferResult]: def wait(self, job_ids: set[int]) -> None: """ Wait for jobs to finish (blocking). - Args: job_ids: The set of job IDs to wait for. """ @@ -120,7 +127,6 @@ def transfer_async(self, job_id: int, spec: TransferSpec) -> bool: transfer_type = (src.medium(), dst.medium()) handler = self.transfer_type_to_handler.get(transfer_type) assert handler is not None - try: success = handler.transfer_async(job_id, spec) except Exception as e: @@ -137,7 +143,6 @@ def transfer_async(self, job_id: int, spec: TransferSpec) -> bool: logger.warning("Failed to submit %r transfer %d", transfer_type, job_id) else: logger.debug("Submitted %r transfer %d: %r", transfer_type, job_id, spec) - return success def get_finished(self) -> list[TransferResult]: @@ -145,7 +150,7 @@ def get_finished(self) -> list[TransferResult]: Get transfers finished since last call. Returns: - A list of (job_id, success) of transfers. + A list of TransferResults """ finished = [] for handler in self.handlers: