
Commit cae669c

[Metrics] Move LoRA request counts to SchedulerStats

SchedulerStats is really the right place for these, just like the regular running/waiting counts. Make sure to call LoRARequestStates.update_scheduler_stats() even when there are no engine core outputs.

Signed-off-by: Mark McLoughlin <[email protected]>
1 parent aa24324 commit cae669c

File tree: 6 files changed, +85 −50 lines changed
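Before the per-file diffs, here is the shape of the new flow in one self-contained sketch. The class bodies below are reduced stand-ins for the real vLLM types (the genuine EngineCoreOutputs, SchedulerStats, and OutputProcessor carry far more state); the point is the call order: update_scheduler_stats() now runs on every engine step, even one that produced no request outputs.

```python
from dataclasses import dataclass, field

# Reduced stand-ins for the vLLM types this commit touches. Only the
# pieces needed to show the new call order are modeled here.

@dataclass
class SchedulerStats:
    waiting_lora_adapters: dict[str, int] = field(default_factory=dict)
    running_lora_adapters: dict[str, int] = field(default_factory=dict)

@dataclass
class EngineCoreOutputs:
    outputs: list
    scheduler_stats: SchedulerStats | None

class OutputProcessor:
    def __init__(self) -> None:
        # Stand-in for LoRARequestStates bookkeeping:
        # adapter name -> (waiting count, running count).
        self.lora_counts: dict[str, tuple[int, int]] = {"lora-1": (1, 0)}

    def process_outputs(self, outputs: list) -> None:
        # Detokenization etc.; after this commit it no longer folds
        # LoRA counts into IterationStats.
        pass

    def update_scheduler_stats(self, stats: SchedulerStats | None) -> None:
        if stats is None:  # tolerated instead of asserted
            return
        for name, (waiting, running) in self.lora_counts.items():
            stats.waiting_lora_adapters[name] = waiting
            stats.running_lora_adapters[name] = running

processor = OutputProcessor()
for step in (
    EngineCoreOutputs(outputs=[], scheduler_stats=SchedulerStats()),       # idle step
    EngineCoreOutputs(outputs=["tok"], scheduler_stats=SchedulerStats()),  # busy step
):
    processor.process_outputs(step.outputs)
    processor.update_scheduler_stats(step.scheduler_stats)
    # Both steps publish the counts, which is exactly what the commit
    # message asks for ("even when there are no engine core outputs").
    assert step.scheduler_stats.waiting_lora_adapters == {"lora-1": 1}
```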

tests/v1/engine/test_output_processor.py

Lines changed: 52 additions & 26 deletions

```diff
@@ -22,11 +22,12 @@
 from vllm.v1.engine import (
     EngineCoreEvent,
     EngineCoreEventType,
+    EngineCoreOutputs,
     EngineCoreRequest,
     FinishReason,
 )
 from vllm.v1.engine.output_processor import OutputProcessor, RequestOutputCollector
-from vllm.v1.metrics.stats import IterationStats
+from vllm.v1.metrics.stats import IterationStats, SchedulerStats


 def _ref_convert_id_to_token(
@@ -940,21 +941,26 @@ def test_lora_request_tracking(log_stats: bool, dummy_test_vectors):
         output_processor.add_request(request, None)

     # First iteration: process outputs with QUEUED events
-    outputs = engine_core.get_outputs()
-    for output in outputs:
+    outputs = EngineCoreOutputs(
+        outputs=engine_core.get_outputs(), scheduler_stats=SchedulerStats()
+    )
+    for output in outputs.outputs:
         output.events = [
             EngineCoreEvent.new_event(EngineCoreEventType.QUEUED, engine_core_timestamp)
         ]

     iteration_stats = IterationStats() if log_stats else None
-    output_processor.process_outputs(outputs, engine_core_timestamp, iteration_stats)
+    output_processor.process_outputs(
+        outputs.outputs, engine_core_timestamp, iteration_stats
+    )
+    output_processor.update_scheduler_stats(outputs.scheduler_stats)

     if log_stats:
         # Verify waiting counts
-        assert iteration_stats.waiting_lora_adapters.get("lora-1") == 1
-        assert iteration_stats.waiting_lora_adapters.get("lora-2") == 1
-        assert iteration_stats.running_lora_adapters.get("lora-1") == 0
-        assert iteration_stats.running_lora_adapters.get("lora-2") == 0
+        assert outputs.scheduler_stats.waiting_lora_adapters.get("lora-1") == 1
+        assert outputs.scheduler_stats.waiting_lora_adapters.get("lora-2") == 1
+        assert outputs.scheduler_stats.running_lora_adapters.get("lora-1") == 0
+        assert outputs.scheduler_stats.running_lora_adapters.get("lora-2") == 0
         # Verify internal state
         assert len(output_processor.lora_states.requests) == 2
         assert "lora-1" in output_processor.lora_states.requests
@@ -965,76 +971,96 @@ def test_lora_request_tracking(log_stats: bool, dummy_test_vectors):
         assert len(output_processor.lora_states.requests) == 0

     # Second iteration: process outputs with SCHEDULED events
-    outputs = engine_core.get_outputs()
-    for output in outputs:
+    outputs = EngineCoreOutputs(
+        outputs=engine_core.get_outputs(), scheduler_stats=SchedulerStats()
+    )
+    for output in outputs.outputs:
         output.events = [
             EngineCoreEvent.new_event(
                 EngineCoreEventType.SCHEDULED, engine_core_timestamp
             )
         ]

     iteration_stats = IterationStats() if log_stats else None
-    output_processor.process_outputs(outputs, engine_core_timestamp, iteration_stats)
+    output_processor.process_outputs(
+        outputs.outputs, engine_core_timestamp, iteration_stats
+    )
+    output_processor.update_scheduler_stats(outputs.scheduler_stats)

     if log_stats:
         # Verify running counts
-        assert iteration_stats.waiting_lora_adapters.get("lora-1") == 0
-        assert iteration_stats.waiting_lora_adapters.get("lora-2") == 0
-        assert iteration_stats.running_lora_adapters.get("lora-1") == 1
-        assert iteration_stats.running_lora_adapters.get("lora-2") == 1
+        assert outputs.scheduler_stats.waiting_lora_adapters.get("lora-1") == 0
+        assert outputs.scheduler_stats.waiting_lora_adapters.get("lora-2") == 0
+        assert outputs.scheduler_stats.running_lora_adapters.get("lora-1") == 1
+        assert outputs.scheduler_stats.running_lora_adapters.get("lora-2") == 1
     else:
         assert iteration_stats is None
         assert len(output_processor.lora_states.requests) == 0

     # Third iteration: finish request-0 (lora-1)
-    outputs = engine_core.get_outputs()
+    outputs = EngineCoreOutputs(
+        outputs=engine_core.get_outputs(), scheduler_stats=SchedulerStats()
+    )
     # Find and mark request-0 as finished (it uses lora-1)
-    for output in outputs:
+    for output in outputs.outputs:
         if output.request_id == "request-0":
             output.finish_reason = FinishReason.LENGTH
             break

     iteration_stats = IterationStats() if log_stats else None
-    output_processor.process_outputs(outputs, engine_core_timestamp, iteration_stats)
+    output_processor.process_outputs(
+        outputs.outputs, engine_core_timestamp, iteration_stats
+    )
+    output_processor.update_scheduler_stats(outputs.scheduler_stats)

     if log_stats:
         # lora-1 should be removed since no requests remain
         assert "lora-1" not in output_processor.lora_states.requests
         # lora-2 should still be running
-        assert iteration_stats.running_lora_adapters.get("lora-2") == 1
+        assert outputs.scheduler_stats.running_lora_adapters.get("lora-2") == 1
         assert len(output_processor.lora_states.requests) == 1
     else:
         assert len(output_processor.lora_states.requests) == 0

     # Fourth iteration: finish request-1 (lora-2)
-    outputs = engine_core.get_outputs()
+    outputs = EngineCoreOutputs(
+        outputs=engine_core.get_outputs(), scheduler_stats=SchedulerStats()
+    )
     # Find and mark request-1 as finished (it uses lora-2)
-    for output in outputs:
+    for output in outputs.outputs:
         if output.request_id == "request-1":
             output.finish_reason = FinishReason.LENGTH
             break

     iteration_stats = IterationStats() if log_stats else None
-    output_processor.process_outputs(outputs, engine_core_timestamp, iteration_stats)
+    output_processor.process_outputs(
+        outputs.outputs, engine_core_timestamp, iteration_stats
+    )
+    output_processor.update_scheduler_stats(outputs.scheduler_stats)

     if log_stats:
         # lora-2 should be removed since no requests remain
         assert "lora-2" not in output_processor.lora_states.requests
-        assert len(iteration_stats.running_lora_adapters) == 0
+        assert len(outputs.scheduler_stats.running_lora_adapters) == 0
         assert len(output_processor.lora_states.requests) == 0
     else:
         assert len(output_processor.lora_states.requests) == 0

     # Finish the last request (no LoRA)
-    outputs = engine_core.get_outputs()
+    outputs = EngineCoreOutputs(
+        outputs=engine_core.get_outputs(), scheduler_stats=SchedulerStats()
+    )
     # Find and mark request-2 as finished (it has no LoRA)
-    for output in outputs:
+    for output in outputs.outputs:
         if output.request_id == "request-2":
             output.finish_reason = FinishReason.LENGTH
             break

     iteration_stats = IterationStats() if log_stats else None
-    output_processor.process_outputs(outputs, engine_core_timestamp, iteration_stats)
+    output_processor.process_outputs(
+        outputs.outputs, engine_core_timestamp, iteration_stats
+    )
+    output_processor.update_scheduler_stats(outputs.scheduler_stats)

     # Verify all requests are finished
     assert output_processor.get_num_unfinished_requests() == 0
```
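For readers skimming the test changes: the lifecycle being asserted (queued raises the waiting count, scheduled moves it to running, finished drops the adapter entry entirely) can be reproduced standalone. The model below is a simplified stand-in for LoRARequestStates; the real class lives in vllm/v1/metrics/stats.py and differs in detail.

```python
from dataclasses import dataclass, field

@dataclass
class SchedulerStats:
    waiting_lora_adapters: dict[str, int] = field(default_factory=dict)
    running_lora_adapters: dict[str, int] = field(default_factory=dict)

@dataclass
class _AdapterState:
    waiting: set[str] = field(default_factory=set)
    running: set[str] = field(default_factory=set)

class LoRAStatesModel:
    """Simplified stand-in for LoRARequestStates."""

    def __init__(self) -> None:
        self.requests: dict[str, _AdapterState] = {}

    def request_queued(self, req_id: str, lora: str) -> None:
        self.requests.setdefault(lora, _AdapterState()).waiting.add(req_id)

    def request_running(self, req_id: str, lora: str) -> None:
        state = self.requests[lora]
        state.waiting.discard(req_id)
        state.running.add(req_id)

    def request_finished(self, req_id: str, lora: str) -> None:
        state = self.requests[lora]
        state.waiting.discard(req_id)
        state.running.discard(req_id)
        if not state.waiting and not state.running:
            # Mirrors the test: an idle adapter drops out of `requests`.
            del self.requests[lora]

    def update_scheduler_stats(self, stats: SchedulerStats | None) -> None:
        if stats is None:
            return
        for lora, state in self.requests.items():
            stats.waiting_lora_adapters[lora] = len(state.waiting)
            stats.running_lora_adapters[lora] = len(state.running)

states = LoRAStatesModel()
states.request_queued("request-0", "lora-1")
stats = SchedulerStats()
states.update_scheduler_stats(stats)
assert stats.waiting_lora_adapters == {"lora-1": 1}   # first iteration

states.request_running("request-0", "lora-1")
stats = SchedulerStats()
states.update_scheduler_stats(stats)
assert stats.running_lora_adapters == {"lora-1": 1}   # second iteration

states.request_finished("request-0", "lora-1")
assert "lora-1" not in states.requests                # third iteration
```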

vllm/v1/engine/async_llm.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -508,6 +508,8 @@ async def output_handler():
                    processed_outputs.reqs_to_abort
                )

+                output_processor.update_scheduler_stats(outputs.scheduler_stats)
+
                # 4) Logging.
                # TODO(rob): make into a coroutine and launch it in
                # background thread once Prometheus overhead is non-trivial.
```

vllm/v1/engine/llm_engine.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -289,6 +289,7 @@ def step(self) -> list[RequestOutput | PoolingRequestOutput]:
             engine_core_timestamp=outputs.timestamp,
             iteration_stats=iteration_stats,
         )
+        self.output_processor.update_scheduler_stats(outputs.scheduler_stats)

         # 3) Abort any reqs that finished due to stop strings.
         self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
```

vllm/v1/engine/output_processor.py

Lines changed: 9 additions & 2 deletions

```diff
@@ -22,7 +22,12 @@
 from vllm.v1.engine.detokenizer import IncrementalDetokenizer
 from vllm.v1.engine.logprobs import LogprobsProcessor
 from vllm.v1.engine.parallel_sampling import ParentRequest
-from vllm.v1.metrics.stats import IterationStats, LoRARequestStates, RequestStateStats
+from vllm.v1.metrics.stats import (
+    IterationStats,
+    LoRARequestStates,
+    RequestStateStats,
+    SchedulerStats,
+)


 class RequestOutputCollector:
@@ -483,13 +488,15 @@ def process_outputs(
             )
             if self.tracer:
                 self.do_tracing(engine_core_output, req_state, iteration_stats)
-        self.lora_states.update_iteration_stats(iteration_stats)

         return OutputProcessorOutput(
             request_outputs=request_outputs,
             reqs_to_abort=reqs_to_abort,
         )

+    def update_scheduler_stats(self, scheduler_stats: SchedulerStats | None):
+        self.lora_states.update_scheduler_stats(scheduler_stats)
+
     def do_tracing(
         self,
         engine_core_output: EngineCoreOutput,
```
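A note on the new hook's shape: it accepts SchedulerStats | None and delegates straight to LoRARequestStates, so the call sites in async_llm.py and llm_engine.py (the one-line additions above and below) never have to branch on whether the step carried stats. A minimal sketch of that pattern with stand-in types, using a plain dict as the stats object:

```python
class LoRAStates:
    def __init__(self) -> None:
        self.counts = {"lora-1": 2}

    def update_scheduler_stats(self, stats: dict | None) -> None:
        if stats is None:
            return  # None-tolerance lives here, not at every call site
        stats.update(self.counts)

class OutputProcessorSketch:
    def __init__(self) -> None:
        self.lora_states = LoRAStates()

    # Thin delegation, mirroring the diff's new method.
    def update_scheduler_stats(self, scheduler_stats: dict | None) -> None:
        self.lora_states.update_scheduler_stats(scheduler_stats)

proc = OutputProcessorSketch()
proc.update_scheduler_stats(None)   # harmless on stat-less steps
stats: dict = {}
proc.update_scheduler_stats(stats)
assert stats == {"lora-1": 2}
```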

vllm/v1/metrics/loggers.py

Lines changed: 14 additions & 14 deletions

```diff
@@ -989,6 +989,20 @@ def record(
                 scheduler_stats.kv_connector_stats, engine_idx
             )

+        if self.gauge_lora_info is not None:
+            running_lora_adapters = ",".join(
+                scheduler_stats.running_lora_adapters.keys()
+            )
+            waiting_lora_adapters = ",".join(
+                scheduler_stats.waiting_lora_adapters.keys()
+            )
+            lora_info_labels = {
+                self.labelname_running_lora_adapters: running_lora_adapters,
+                self.labelname_waiting_lora_adapters: waiting_lora_adapters,
+                self.labelname_max_lora: self.max_lora,
+            }
+            self.gauge_lora_info.labels(**lora_info_labels).set_to_current_time()
+
         if mm_cache_stats is not None:
             self.counter_mm_cache_queries[engine_idx].inc(mm_cache_stats.queries)
             self.counter_mm_cache_hits[engine_idx].inc(mm_cache_stats.hits)
@@ -1055,20 +1069,6 @@ def record(
                     finished_request.max_tokens_param
                 )

-        if self.gauge_lora_info is not None:
-            running_lora_adapters = ",".join(
-                iteration_stats.running_lora_adapters.keys()
-            )
-            waiting_lora_adapters = ",".join(
-                iteration_stats.waiting_lora_adapters.keys()
-            )
-            lora_info_labels = {
-                self.labelname_running_lora_adapters: running_lora_adapters,
-                self.labelname_waiting_lora_adapters: waiting_lora_adapters,
-                self.labelname_max_lora: self.max_lora,
-            }
-            self.gauge_lora_info.labels(**lora_info_labels).set_to_current_time()
-
     def record_sleep_state(self, sleep: int = 0, level: int = 0):
         awake = 1
         discard_all = 0
```
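The block that moved is an instance of the Prometheus "info gauge" idiom: encode strings as label values and set the gauge to the current time, so scrapers read the labels rather than the number. A standalone illustration with prometheus_client follows; the metric and label names here are approximations, and vLLM's own gauge wrapper and exact names live in loggers.py.

```python
from prometheus_client import Gauge

# Approximate metric/label names; see vllm/v1/metrics/loggers.py for the
# authoritative ones.
gauge_lora_info = Gauge(
    "vllm_lora_requests_info",
    "Running stats on LoRA requests.",
    labelnames=["running_lora_adapters", "waiting_lora_adapters", "max_lora"],
)

def record_lora_info(
    running: dict[str, int], waiting: dict[str, int], max_lora: int
) -> None:
    # Adapter names are comma-joined into label values; the gauge's value
    # is just a timestamp, so the freshest sample wins.
    gauge_lora_info.labels(
        running_lora_adapters=",".join(running),
        waiting_lora_adapters=",".join(waiting),
        max_lora=str(max_lora),
    ).set_to_current_time()

# After this commit, the dicts come from SchedulerStats rather than
# IterationStats:
record_lora_info(running={"lora-2": 1}, waiting={"lora-1": 1}, max_lora=8)
```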

vllm/v1/metrics/stats.py

Lines changed: 7 additions & 8 deletions

```diff
@@ -169,6 +169,9 @@ class SchedulerStats:
     spec_decoding_stats: SpecDecodingStats | None = None
     kv_connector_stats: dict[str, Any] | None = None

+    waiting_lora_adapters: dict[str, int] = field(default_factory=dict)
+    running_lora_adapters: dict[str, int] = field(default_factory=dict)
+

 @dataclass
 class RequestStateStats:
@@ -222,9 +225,6 @@ def __init__(self):
         self.n_params_iter: list[int] = []
         self.time_to_first_tokens_iter: list[float] = []
         self.inter_token_latencies_iter: list[float] = []
-        self.waiting_lora_adapters: dict[str, int] = {}
-        self.running_lora_adapters: dict[str, int] = {}
-        self.num_corrupted_reqs: int = 0

     def __repr__(self) -> str:
         field_to_value_str = ", ".join(f"{k}={v}" for k, v in vars(self).items())
@@ -411,10 +411,9 @@ def request_running(self, req_id: str, lora_name: str | None):
     def request_finished(self, req_id: str, lora_name: str | None):
         self._request_update(req_id, lora_name, waiting=False, running=False)

-    def update_iteration_stats(self, iteration_stats: IterationStats | None):
-        if not self.log_stats:
+    def update_scheduler_stats(self, scheduler_stats: SchedulerStats | None):
+        if not self.log_stats or scheduler_stats is None:
             return
-        assert iteration_stats is not None
         for lora_name, stats in self.requests.items():
-            iteration_stats.waiting_lora_adapters[lora_name] = len(stats.waiting)
-            iteration_stats.running_lora_adapters[lora_name] = len(stats.running)
+            scheduler_stats.waiting_lora_adapters[lora_name] = len(stats.waiting)
+            scheduler_stats.running_lora_adapters[lora_name] = len(stats.running)
```
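Two small details in the stats.py hunks are worth calling out. First, the new SchedulerStats fields use field(default_factory=dict) because dataclasses reject plain mutable defaults, and the factory guarantees each stats object gets fresh dicts. Second, update_scheduler_stats() replaces the old assert with an early return, since it is now called on every step and the stats object can legitimately be absent. A quick check of the first point, using a stand-in dataclass with the same field definitions as the diff:

```python
from dataclasses import dataclass, field

@dataclass
class SchedulerStats:
    waiting_lora_adapters: dict[str, int] = field(default_factory=dict)
    running_lora_adapters: dict[str, int] = field(default_factory=dict)

# Each instance gets its own dicts; a plain `= {}` default would raise
# ValueError at class-definition time (mutable default), and sharing one
# dict would leak counts across scheduler iterations.
a, b = SchedulerStats(), SchedulerStats()
a.waiting_lora_adapters["lora-1"] = 1
assert b.waiting_lora_adapters == {}
```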
