
Commit 561a7eb

[Cherry-Pick] support model_id as metric labels by redefining metric update interface (#4480)

* [metric] support model_id as metric labels by redefining metric update interface
* [feat] support FD_DEFAULT_METRIC_LABEL_VALUES
1 parent b3225f9 commit 561a7eb

File tree

14 files changed (+184, -75 lines)
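Across the touched files the pattern is the same: call sites stop touching prometheus_client metric objects directly and instead go through name-based helpers (set_value, inc_value, dec_value, obs_value) on the metrics manager, so that label values such as model_id can be resolved in one place. For example, taken from the common_engine.py hunk below:

    # Before: direct attribute access on the prometheus_client objects.
    main_process_metrics.num_requests_waiting.dec(len(tasks))
    main_process_metrics.num_requests_running.inc(len(tasks))

    # After: name-based calls; the manager attaches label values
    # (e.g. model_id) when FD_ENABLE_METRIC_LABELS is enabled.
    main_process_metrics.dec_value("num_requests_waiting", len(tasks))
    main_process_metrics.inc_value("num_requests_running", len(tasks))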

fastdeploy/cache_manager/cache_metrics.py

Lines changed: 4 additions & 4 deletions
@@ -55,10 +55,10 @@ def _update_history_hit_metrics(self):
         self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num
         self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num

-        main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
-        main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
-        main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
-        main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
+        main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
+        main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
+        main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
+        main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)

         logger.info(
             f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"

fastdeploy/cache_manager/prefix_cache_manager.py

Lines changed: 13 additions & 6 deletions
@@ -111,6 +111,11 @@ def __init__(
             + f"{self.num_cpu_blocks}, bytes_per_layer_per_block {self.cache_config.bytes_per_layer_per_block}"
         )

+        main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_resource", 1.0)
+
     @property
     def available_gpu_resource(self):
         return len(self.gpu_free_block_list) / self.num_gpu_blocks if self.num_gpu_blocks > 0 else 0.0

@@ -316,8 +321,10 @@ def update_cache_config(self, cache_config):
         heapq.heapify(self.gpu_free_block_list)
         self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))

-        main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
-        main_process_metrics.available_gpu_resource.set(1.0)
+        main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_resource", 1.0)

     def _enable_cpu_cache(self):
         """

@@ -359,8 +366,8 @@ def allocate_gpu_blocks(self, num_blocks):
         logger.info(
             f"allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
         )
-        main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
-        main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
+        main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
+        main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
         return allocated_block_ids

     def recycle_gpu_blocks(self, gpu_block_ids):

@@ -375,8 +382,8 @@ def recycle_gpu_blocks(self, gpu_block_ids):
                 heapq.heappush(self.gpu_free_block_list, gpu_block_id)
         else:
             heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
-        main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
-        main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
+        main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
+        main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

     def allocate_cpu_blocks(self, num_blocks):
         """

fastdeploy/engine/common_engine.py

Lines changed: 4 additions & 4 deletions
@@ -562,8 +562,8 @@ def _insert_task_to_worker(self):
                 else:
                     continue

-                main_process_metrics.num_requests_waiting.dec(len(tasks))
-                main_process_metrics.num_requests_running.inc(len(tasks))
+                main_process_metrics.dec_value("num_requests_waiting", len(tasks))
+                main_process_metrics.inc_value("num_requests_running", len(tasks))
             except Exception as e:
                 err_msg = f"Error happend while insert task to engine: {e}, {traceback.format_exc()!s}."
                 self.llm_logger.error(err_msg)

@@ -744,7 +744,7 @@ def _insert_zmq_task_to_scheduler(self):
                 try:
                     request = Request.from_dict(data)
                     start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
-                    main_process_metrics.requests_number.inc()
+                    main_process_metrics.inc_value("requests_number", 1)
                     self.llm_logger.debug(f"Receive request: {request}")
                 except Exception as e:
                     self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")

@@ -775,7 +775,7 @@ def _insert_zmq_task_to_scheduler(self):
                 added_requests.pop(request_id)

                 if failed is None:
-                    main_process_metrics.num_requests_waiting.inc(1)
+                    main_process_metrics.inc_value("num_requests_waiting", 1)
                     continue

                 error_result = RequestOutput(

fastdeploy/engine/resource_manager.py

Lines changed: 11 additions & 10 deletions
@@ -59,7 +59,8 @@ def __init__(
         # current batch status of the engine
         self.real_bsz = 0
         llm_logger.info(f"{self.info()}")
-        main_process_metrics.max_batch_size.set(max_num_seqs)
+        main_process_metrics.set_value("max_batch_size", max_num_seqs)
+        main_process_metrics.set_value("available_batch_size", self.available_batch())

     def reset_cache_config(self, cfg):
         """

@@ -169,7 +170,7 @@ def _recycle_block_tables(self, task):
         ori_number = self.available_block_num()
         self.cache_manager.recycle_gpu_blocks(block_tables)
         cur_number = self.available_block_num()
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
         llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")

     def available_batch(self):

@@ -311,15 +312,15 @@ def allocate_resources_for_new_tasks(self, tasks):
                 break

         # record batch size here
-        task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
-        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+        main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
+        main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
+        main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
         llm_logger.info(
             f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
         )
         llm_logger.info(f"{self.info()}")
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())

         return processed_tasks

@@ -347,9 +348,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
         task.cache_info = (cache_block_num, no_cache_block_num)

         # Report the number of cached tokens to Prometheus metrics
-        main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
-        main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
-        main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
+        main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)

         cached_len = len(common_block_ids) * self.cfg.block_size
         task.block_tables = common_block_ids + unique_block_ids

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 16 additions & 15 deletions
@@ -91,7 +91,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
        self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
        self.lock = threading.Lock()
        self.to_be_rescheduled_request_id_set = set()
-       main_process_metrics.max_batch_size.set(max_num_seqs)
+       main_process_metrics.set_value("max_batch_size", max_num_seqs)

        self.using_extend_tables_req_id = set()

@@ -144,13 +144,12 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
            if preempted_req.request_id in self.req_dict:
                del self.req_dict[preempted_req.request_id]
                self._free_blocks(preempted_req)
-               main_process_metrics.num_requests_running.dec(1)
+               llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
            else:
                self._free_blocks(preempted_req)
                preempted_req.cached_block_num = 0
                self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
-               main_process_metrics.num_requests_waiting.inc(1)
-               main_process_metrics.num_requests_running.dec(1)
+               llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
            preempted_reqs.append(preempted_req)
            scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
            if preempted_req == request:

@@ -414,8 +413,6 @@ def schedule(self):
                        request, self.config.cache_config.block_size, request.num_computed_tokens
                    )
                    request.status = RequestStatus.RUNNING
-                   main_process_metrics.num_requests_waiting.dec(1)
-                   main_process_metrics.num_requests_running.inc(1)
                    if self.config.scheduler_config.splitwise_role == "mixed":
                        allocated_position = self.get_available_position()
                        request.idx = allocated_position

@@ -460,8 +457,6 @@ def schedule(self):
                        request, self.config.cache_config.block_size, request.num_computed_tokens
                    )
                    request.status = RequestStatus.RUNNING
-                   main_process_metrics.num_requests_waiting.dec(1)
-                   main_process_metrics.num_requests_running.inc(1)
                else:
                    if self.config.cache_config.enable_prefix_caching:
                        self._free_blocks(request)

@@ -520,11 +515,17 @@ def schedule(self):
                continue

        if scheduled_reqs:
-           task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-           main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
-           main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-           main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
            llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
+
+       # Update metrics
+       num_tasks = sum([1 if task else 0 for task in self.tasks_list])
+       num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+       main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
+       main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
+       main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
+       main_process_metrics.set_value("num_requests_running", len(self.running))
+       main_process_metrics.set_value("num_requests_waiting", num_tasks - len(self.running))
+
        return scheduled_reqs

    def get_available_position(self) -> int:

@@ -566,9 +567,9 @@ def get_prefix_cached_blocks(self, request: Request):
            request.skip_allocate = False

            # Report the number of cached tokens to Prometheus metrics
-           main_process_metrics.prefix_cache_token_num.inc(matched_token_num)
-           main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
-           main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
+           main_process_metrics.inc_value("prefix_cache_token_num", matched_token_num)
+           main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.gpu_cache_token_num)
+           main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.cpu_cache_token_num)

            if matched_token_num == request.need_prefill_tokens:
                request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size

fastdeploy/entrypoints/engine_client.py

Lines changed: 3 additions & 3 deletions
@@ -158,9 +158,9 @@ async def add_requests(self, task):
            if "messages" in task:
                del task["messages"]
            api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
-           work_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
-           work_process_metrics.prompt_tokens_total.inc(input_ids_len)
-           work_process_metrics.request_prompt_tokens.observe(input_ids_len)
+           work_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
+           work_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
+           work_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
        except Exception as e:
            api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
            raise EngineError(str(e), error_code=400)

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 2 additions & 4 deletions
@@ -338,9 +338,7 @@ async def chat_completion_stream_generator(
                    )
                    if res["finished"]:
                        num_choices -= 1
-                       work_process_metrics.e2e_request_latency.observe(
-                           time.time() - res["metrics"]["request_start_time"]
-                       )
+                       work_process_metrics.obs_value("e2e_request_latency", time.time() - res["metrics"]["request_start_time"])
                        has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
                        max_tokens = request.max_completion_tokens or request.max_tokens
                        if has_no_token_limit or previous_num_tokens != max_tokens:

@@ -533,7 +531,7 @@ async def chat_completion_full_generator(
            total_tokens=num_prompt_tokens + num_generated_tokens,
            prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
        )
-       work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
+       work_process_metrics.obs_value("e2e_request_latency", time.time() - final_res["metrics"]["request_start_time"])
        res = ChatCompletionResponse(
            id=request_id,
            created=created_time,

fastdeploy/envs.py

Lines changed: 6 additions & 0 deletions
@@ -116,6 +116,12 @@
    # Max pre-fetch requests number in PD
    "FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
    "FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
+   # Whether to clear cpu cache when clearing model weights.
+   "FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
+   # Whether to use labels in metrics.
+   "FD_ENABLE_METRIC_LABELS": lambda: bool(int(os.getenv("FD_ENABLE_METRIC_LABELS", "0"))),
+   # Default label values in metrics.
+   "FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"),
 }
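FD_DEFAULT_METRIC_LABEL_VALUES is kept as a raw string and parsed with json.loads in the new metrics interface (fastdeploy/metrics/interface.py below), so it is expected to hold a JSON object mapping label names to default label values. A minimal sketch of how a deployment might set the two new switches; the model_id key follows the commit title, and the value is purely illustrative:

    import os

    # Turn on labeled metrics and provide a default model_id, used whenever a
    # call site does not pass an explicit labelvalues dict.
    os.environ["FD_ENABLE_METRIC_LABELS"] = "1"
    os.environ["FD_DEFAULT_METRIC_LABEL_VALUES"] = '{"model_id": "my-model"}'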

fastdeploy/metrics/interface.py

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
+
+from prometheus_client import (
+    Counter,
+    Gauge,
+    Histogram,
+)
+from fastdeploy import envs
+import json
+
+class MetricsManagerInterface:
+
+    def set_value(self, name, value, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
+                if labelvalues is None:
+                    labelvalues = {}
+                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
+                metric.labels(**labelvalues).set(value)
+            else:
+                metric.set(value)
+        return
+
+    def inc_value(self, name, value=1, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge) or isinstance(metric, Counter):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
+                if labelvalues is None:
+                    labelvalues = {}
+                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
+                metric.labels(**labelvalues).inc(value)
+            else:
+                metric.inc(value)
+        return
+
+    def dec_value(self, name, value=1, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
+                if labelvalues is None:
+                    labelvalues = {}
+                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
+                metric.labels(**labelvalues).dec(value)
+            else:
+                metric.dec(value)
+        return
+
+    def obs_value(self, name, value, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Histogram):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
+                if labelvalues is None:
+                    labelvalues = {}
+                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
+                metric.labels(**labelvalues).observe(value)
+            else:
+                metric.observe(value)
+        return
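To illustrate how the interface resolves labels, here is a minimal sketch that is not part of the commit: a manager inheriting MetricsManagerInterface with one labeled Gauge. The metric name, label name, and import path are assumptions for the example, and it presumes fastdeploy.envs evaluates these variables lazily so the environment can be set first.

    import os

    # Assumption: set before the interface reads envs.FD_ENABLE_METRIC_LABELS.
    os.environ["FD_ENABLE_METRIC_LABELS"] = "1"
    os.environ["FD_DEFAULT_METRIC_LABEL_VALUES"] = '{"model_id": "default-model"}'

    from prometheus_client import Gauge

    from fastdeploy.metrics.interface import MetricsManagerInterface


    class DemoMetricsManager(MetricsManagerInterface):
        def __init__(self):
            # Hypothetical Gauge with a model_id label; FastDeploy's real
            # managers define their own metric set.
            self.num_requests_running = Gauge(
                "demo_num_requests_running", "Requests currently running", labelnames=["model_id"]
            )


    metrics = DemoMetricsManager()

    # Falls back to the default model_id from FD_DEFAULT_METRIC_LABEL_VALUES.
    metrics.set_value("num_requests_running", 3)

    # Passes an explicit label value, overriding the default for this sample.
    metrics.set_value("num_requests_running", 5, labelvalues={"model_id": "qwen2-7b"})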
