[Metrics] [KVConnector] Add Offloading Connector metrics #27942
markmc merged 51 commits into vllm-project:main
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default; only a limited set of checks runs automatically. You can ask your reviewers to trigger select CI tests on top of those. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

If you have any questions, please reach out to us on Slack at https://slack.vllm.ai. 🚀
Code Review
This pull request adds metrics for the Offloading Connector, including queries, hits, and timing for store/load operations. The changes are generally good, but I've found a critical bug in a type hint that would cause a NameError, and a flaw in the timing metric calculation that would lead to inaccurate results. I've provided suggestions to fix these issues. I also suggested a small improvement for robustness in the stats reset logic.
```diff
-        # req_id -> (job_id, store)
-        self._jobs: dict[int, tuple[ReqId, bool]] = {}
+        # req_id -> (job_id, store, start_time, num_blocks)
+        self._jobs: dict[int, tuple[ReqId, bool, float, num_blocks]] = {}
```
```python
class OffloadTiming:
    data: dict[str, Any] = field(default_factory=dict)
    num_stores: int = 0
    num_loads: int = 0

    def __post_init__(self):
        if not self.data:
            # Empty container init, no data is passed in.
            self.reset()

    def reset(self):
        self.data: dict[str, float] = {
            "total_load_time": 0,
            "total_store_time": 0
        }
        self.num_stores = 0
        self.num_loads = 0

    # Time is already normalized by the number of blocks.
    def record_time(self, time: float, is_store: bool):
        if is_store:
            self.data["total_store_time"] += time
            self.num_stores += 1
        else:
            self.data["total_load_time"] += time
            self.num_loads += 1
```
The current implementation for calculating average load/store time is flawed. It computes the average of per-operation rates (sum(duration_i / num_blocks_i) / num_ops), which is not mathematically equivalent to the true average rate (sum(duration_i) / sum(num_blocks_i)) when num_blocks_i varies across operations. This can lead to inaccurate metrics.
To fix this, OffloadTiming should accumulate the total duration and total number of blocks for all operations, and then the average time per token can be correctly calculated. This comment refactors OffloadTiming to support this. Subsequent comments will adjust the call sites.
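A tiny self-contained check (hypothetical durations and block counts, not numbers from the PR) makes the discrepancy concrete:

```python
# (duration_seconds, num_blocks) for two hypothetical transfer operations.
ops = [(1.0, 1), (10.0, 100)]

# Flawed metric: average of per-operation rates.
avg_of_rates = sum(d / n for d, n in ops) / len(ops)

# True average rate: total duration over total blocks.
true_rate = sum(d for d, _ in ops) / sum(n for _, n in ops)

print(avg_of_rates)  # 0.55 s/block, dominated by the small transfer
print(true_rate)     # ~0.109 s/block
```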
```python
@dataclass
class OffloadTiming:
    data: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.data:
            # Empty container init, no data is passed in.
            self.reset()

    def reset(self):
        self.data: dict[str, float] = {
            "total_load_duration": 0.0,
            "total_store_duration": 0.0,
            "total_loaded_blocks": 0,
            "total_stored_blocks": 0,
        }

    def record_op(self, duration: float, num_blocks: int, is_store: bool):
        if is_store:
            self.data["total_store_duration"] += duration
            self.data["total_stored_blocks"] += num_blocks
        else:
            self.data["total_load_duration"] += duration
            self.data["total_loaded_blocks"] += num_blocks
```

```python
def reset(self):
    self.data: dict[str, float] = {
        "total_queries" : 0,
        "total_hits": 0,
    }
```
It's good practice to have the reset method initialize all keys that will be accessed later. The reduce method accesses avg_load_time and avg_store_time, but they are not initialized here. While the current code flow seems to ensure they are set before reduce is called, adding them here makes the code more robust against future changes.
```diff
 def reset(self):
     self.data: dict[str, float] = {
         "total_queries" : 0,
         "total_hits": 0,
+        "avg_load_time": 0.0,
+        "avg_store_time": 0.0,
     }
```
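As a stand-alone sketch of the failure mode being guarded against (a plain dict standing in for the stats object, not the PR's actual classes):

```python
# Simulate the stats dict as reset() initializes it today.
stats = {"total_queries": 0, "total_hits": 0}

# A reader that expects the timing keys before aggregate_time_data()
# has populated them would hit a KeyError.
try:
    _ = stats["avg_load_time"]
    found = True
except KeyError:
    found = False

print(found)  # False -- the key does not exist yet
```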
```python
def aggregate_time_data(self, offload_timing: OffloadTiming):
    # Avoid division by zero:
    if offload_timing.num_loads == 0 or self.offloaded_block_size == 0:
        self.data["avg_load_time"] = 0
    else:
        self.data["avg_load_time"] = offload_timing.data["total_load_time"] / (offload_timing.num_loads * self.offloaded_block_size)
    if offload_timing.num_stores == 0 or self.gpu_block_size == 0:
        self.data["avg_store_time"] = 0
    else:
        self.data["avg_store_time"] = offload_timing.data["total_store_time"] / (offload_timing.num_stores * self.gpu_block_size)
```
Following the refactoring of OffloadTiming, this method needs to be updated to correctly calculate the average load and store times. The average time per token should be total_duration / total_tokens.
```diff
 def aggregate_time_data(self, offload_timing: OffloadTiming):
     # Avoid division by zero:
-    if offload_timing.num_loads == 0 or self.offloaded_block_size == 0:
-        self.data["avg_load_time"] = 0
-    else:
-        self.data["avg_load_time"] = offload_timing.data["total_load_time"] / (offload_timing.num_loads * self.offloaded_block_size)
-    if offload_timing.num_stores == 0 or self.gpu_block_size == 0:
-        self.data["avg_store_time"] = 0
-    else:
-        self.data["avg_store_time"] = offload_timing.data["total_store_time"] / (offload_timing.num_stores * self.gpu_block_size)
+    total_loaded_tokens = offload_timing.data["total_loaded_blocks"] * self.offloaded_block_size
+    if total_loaded_tokens > 0:
+        self.data["avg_load_time"] = offload_timing.data["total_load_duration"] / total_loaded_tokens
+    else:
+        self.data["avg_load_time"] = 0.0
+    total_stored_tokens = offload_timing.data["total_stored_blocks"] * self.gpu_block_size
+    if total_stored_tokens > 0:
+        self.data["avg_store_time"] = offload_timing.data["total_store_duration"] / total_stored_tokens
+    else:
+        self.data["avg_store_time"] = 0.0
```
```diff
@@ -450,7 +575,8 @@ def get_finished(self, finished_req_ids: set[str]) -> tuple[set[str], set[str]]:
         for job_id, success in self.worker.get_finished():
             # we currently do not support job failures
             assert success
-            req_id, store = self._jobs.pop(job_id)
+            req_id, store, start_time, num_blocks = self._jobs.pop(job_id)
+            self._timing_stats.record_time((time.perf_counter() - start_time) / num_blocks, store)
```
Following the refactoring of OffloadTiming, this needs to be updated to call the new record_op method with the total duration and number of blocks for the finished operation. This also fixes a potential ZeroDivisionError if num_blocks is 0.
```diff
-            self._timing_stats.record_time((time.perf_counter() - start_time) / num_blocks, store)
+            duration = time.perf_counter() - start_time
+            if num_blocks > 0:
+                self._timing_stats.record_op(duration, num_blocks, store)
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
def get_kv_connector_stats(self) -> KVConnectorStats | None:
    # if self.connector_worker is None:
    #     return None
    if self.connector_worker:
        self.kv_connector_stats.aggregate_time_data(self.connector_worker._timing_stats)
    return self.kv_connector_stats
```
Reset KV offload metrics before logging
The connector counts cache queries/hits in get_num_new_matched_tokens and stores them on self.kv_connector_stats, but get_kv_connector_stats just returns the same instance without cloning or clearing it. The Prometheus logger treats the returned dict as a delta and calls counter_offload_kv_connector_*.inc(...) on every iteration. Because the same cumulative totals are reported over and over, the counters grow faster than the real number of queries/hits (e.g., the second call increments by the whole lifetime total instead of just the new activity). The stats object should be reset or snapshotted before being handed to the logger to avoid double-counting.
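The double counting is easy to reproduce in isolation with a toy counter standing in for the Prometheus one (an illustrative sketch, not vLLM code):

```python
class ToyCounter:
    """Stand-in for a Prometheus counter: inc() is meant to add a delta."""

    def __init__(self) -> None:
        self.value = 0

    def inc(self, amount: int) -> None:
        self.value += amount


counter = ToyCounter()
cumulative_queries = 0

# Two logging iterations; 5 new queries arrive before each one.
for _ in range(2):
    cumulative_queries += 5
    # Bug: passing the cumulative total where a per-iteration delta is expected.
    counter.inc(cumulative_queries)

print(counter.value)  # 15, although only 10 queries actually happened
```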
Signed-off-by: omerpaz95 <omerpaz95@gmail.com>
vllm/v1/metrics/loggers.py (Outdated)

```python
)

#
# Offloading connector
```
Connectors can now add their own metrics via the build_prom_metrics() added in #26811
Hi @omerpaz95, the pre-commit checks have failed. Please run:

```shell
uv pip install pre-commit
pre-commit install
pre-commit run --all-files
```

Then, commit the changes and push to your branch.
```python
for k, v in self.data.items():
    assert isinstance(v, list)
    for op in v:
        for stat, value in [
            ("_total_bytes", op["op_size"]),
            ("_total_time", op["op_time"]),
        ]:
            log_key = k + stat
            if log_key not in return_dict:
                return_dict[log_key] = value
            else:
                return_dict[log_key] += value
```
nit:
```diff
-for k, v in self.data.items():
-    assert isinstance(v, list)
-    for op in v:
-        for stat, value in [
-            ("_total_bytes", op["op_size"]),
-            ("_total_time", op["op_time"]),
-        ]:
-            log_key = k + stat
-            if log_key not in return_dict:
-                return_dict[log_key] = value
-            else:
-                return_dict[log_key] += value
+for transfer_type, ops_list in self.data.items():
+    assert isinstance(ops_list, list)
+    return_dict[f"{transfer_type}_total_bytes"] = sum(op["op_size"] for op in ops_list)
+    return_dict[f"{transfer_type}_total_time"] = sum(op["op_time"] for op in ops_list)
```
```python
    return return_dict

def is_empty(self) -> bool:
    return len(self.data.items()) == 0
```
nit:
```diff
-    return len(self.data.items()) == 0
+    return not self.data
```
```python
if kv_cache_config is None:
    raise ValueError("kv_cache_config cannot be None for WORKER role")
```
Why do we need this addition?
Deleted, no idea how it got here. Sorry.
```python
if self.connector_worker is None:
    return None
```
```diff
-if self.connector_worker is None:
-    return None
+assert self.connector_worker is not None
```
my bad, I just noticed that get_kv_connector_stats is used by both the scheduler and worker.
We just emit from the worker.
So can you just add a comment above the return None?
e.g. # We only emit stats from the worker-side
```python
if self.kv_connector_stats.is_empty():
    return None
# Clear stats for next iteration
return self.kv_connector_stats.clone_and_reset()
```
This triggers a copy of the old KVConnectorStats.
I see NixlConnector also does it, but it seems to me that a simpler and more efficient way is to:
```diff
-return self.kv_connector_stats.clone_and_reset()
+kv_connector_stats = self.kv_connector_stats
+self.kv_connector_stats = OffloadingConnectorStats()
+return kv_connector_stats
```
@markmc your thoughts?
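For comparison, a minimal sketch of both approaches (class and method names here are illustrative, not the actual vLLM API):

```python
import copy


class Stats:
    def __init__(self) -> None:
        self.data: dict[str, int] = {}

    def clone_and_reset(self) -> "Stats":
        # Deep-copies the accumulated data, then clears it in place.
        snapshot = copy.deepcopy(self)
        self.data = {}
        return snapshot


class Connector:
    def __init__(self) -> None:
        self.stats = Stats()

    def get_stats_by_swap(self) -> Stats:
        # Swap: hand out the old object and start accumulating into a
        # fresh one -- no copy of the collected data is made.
        stats, self.stats = self.stats, Stats()
        return stats


conn = Connector()
conn.stats.data["total_queries"] = 7
snapshot = conn.get_stats_by_swap()
print(snapshot.data)    # {'total_queries': 7}
print(conn.stats.data)  # {}
```

Either way, the logger receives a snapshot whose counters are deltas since the previous call, while the connector keeps accumulating into an empty object.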
```python
def __getitem__(self, key):
    return getattr(self, key)
```
I added this so that we can access the op's fields in a dict-like manner from reduce(), to conform with how we access them in observe(). Otherwise, it fails pre-commit.
I just tested it, and it seems that after de-serialization, type(op) is actually a dict.
So I think we can remove the code here, and instead assert isinstance(op, dict) in the observe function.
```python
    self.reset()
    return old

def aggregate(self, other: KVConnectorStats) -> KVConnectorStats:
```
see cursor's comment below
I already fixed the issue (to my understanding).
```python
transfer_size=None,  # For now - just to get the test working
transfer_time=None,  # For now - just to get the test working
transfer_type=None,  # For now - just to get the test working
```
nit: I would remove the comments
```python
class TestOffloadingConnectorStats:
    """Tests for MultiConnector stats reconstruction and operations."""
```
| """Tests for MultiConnector stats reconstruction and operations.""" | |
| """Tests for OffloadingConnector stats reconstruction and operations.""" |
vllm/v1/kv_offload/worker/cpu_gpu.py (Outdated)

```python
stream=stream,
start_event=start_event,
end_event=end_event,
num_bytes=dst_sub_block_count * self.block_size_in_bytes,
```
I think self.block_size_in_bytes is a list.
Can you define self.total_block_size_in_bytes which is the sum of this list and use it instead?
```python
    == handler.block_size_in_bytes
    * handler.dst_block_size_factor
    * len(dst_blocks)
)
```
can you also assert 0 < finished[0].time < (time.time() - start_time)?
Added queries and hits metrics for the Offloading Connector.
Also added timing metrics for store and load operations, which track the average per-token time to load and store.
The metrics are available from Prometheus and from the StatLogger.
Purpose
Allows collection of timing metrics for the Offloading Connector, which is essential for future development.
@orozery please review.
Test Plan
Test Result
Note
Introduces metrics and instrumentation for KV offloading transfers.
- OffloadingConnectorStats and OffloadPromMetrics with histograms, gauges, and counters for CPU_to_GPU and GPU_to_CPU transfer size and throughput
- TransferResult now includes job id, success, num blocks, duration, and transfer type; CUDA start/end events capture timings; results are aggregated per request to bytes via a computed bytes_per_block
- LoadStoreSpec.num_blocks and its implementation in BlockIDsLoadStoreSpec; OffloadingConnector now requires kv_cache_config for WORKER, computes bytes_per_block, exposes get_kv_connector_stats, and adds builders for stats/Prom metrics
- Validates BlockIDsLoadStoreSpec usage and records transfer types; scheduler/worker paths are unchanged functionally aside from stats collection

Written by Cursor Bugbot for commit 1981e15. This will update automatically on new commits.
OffloadingConnectorStatsandOffloadPromMetricswith histograms, gauges, and counters forCPU_to_GPU/GPU_to_CPUtransfer size and throughput; exposesget_kv_connector_statsplus builders on the connectorTransferResultwith job id, success, block count, duration, and transfer type; aggregate to bytes via computedbytes_per_block; validateBlockIDsLoadStoreSpecusageLoadStoreSpec.num_blocksand implementation inBlockIDsLoadStoreSpec;OffloadingConnectorWORKER now requireskv_cache_configto computebytes_per_blockWritten by Cursor Bugbot for commit 4a67a80. This will update automatically on new commits. Configure here.