From 29c85610e946de166c2229bcb641599a324a8fe7 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Fri, 17 Oct 2025 14:09:45 +0800
Subject: [PATCH 1/2] [metric] support model_id as metric labels by redefining metric update interface

---
 fastdeploy/cache_manager/cache_metrics.py     |  8 +--
 .../cache_manager/prefix_cache_manager.py     | 19 +++--
 fastdeploy/engine/common_engine.py            |  8 +--
 fastdeploy/engine/resource_manager.py         | 21 +++---
 .../engine/sched/resource_manager_v1.py       | 31 ++++----
 fastdeploy/entrypoints/engine_client.py       |  6 +-
 fastdeploy/entrypoints/openai/serving_chat.py |  6 +-
 fastdeploy/envs.py                            |  4 ++
 fastdeploy/metrics/interface.py               | 70 +++++++++++++++++++
 fastdeploy/metrics/metrics.py                 | 19 ++++-
 fastdeploy/metrics/work_metrics.py            | 14 +++-
 fastdeploy/output/token_processor.py          | 36 +++++-----
 .../splitwise/internal_adapter_utils.py       |  4 +-
 fastdeploy/splitwise/splitwise_connector.py   |  2 +-
 14 files changed, 173 insertions(+), 75 deletions(-)
 create mode 100644 fastdeploy/metrics/interface.py

diff --git a/fastdeploy/cache_manager/cache_metrics.py b/fastdeploy/cache_manager/cache_metrics.py
index 3946357c833..51501f53284 100644
--- a/fastdeploy/cache_manager/cache_metrics.py
+++ b/fastdeploy/cache_manager/cache_metrics.py
@@ -55,10 +55,10 @@ def _update_history_hit_metrics(self):
         self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num
         self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num
 
-        main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
-        main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
-        main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
-        main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
+        main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
+        main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
+        main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
+        main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)
 
         logger.info(
             f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"
diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index 8b3c8798e00..245fd4049c2 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -111,6 +111,11 @@ def __init__(
             + f"{self.num_cpu_blocks}, bytes_per_layer_per_block {self.cache_config.bytes_per_layer_per_block}"
         )
 
+        main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_resource", 1.0)
+
     @property
     def available_gpu_resource(self):
         return len(self.gpu_free_block_list) / self.num_gpu_blocks if self.num_gpu_blocks > 0 else 0.0
@@ -316,8 +321,10 @@ def update_cache_config(self, cache_config):
         heapq.heapify(self.gpu_free_block_list)
         self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))
 
-        main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
-        main_process_metrics.available_gpu_resource.set(1.0)
+        main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
+        main_process_metrics.set_value("available_gpu_resource", 1.0)
 
     def _enable_cpu_cache(self):
         """
@@ -359,8 +366,8 @@ def allocate_gpu_blocks(self, num_blocks):
         logger.info(
             f"allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
         )
-        main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
-        main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
+        main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
+        main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
         return allocated_block_ids
 
     def recycle_gpu_blocks(self, gpu_block_ids):
         """
@@ -375,8 +382,8 @@ def recycle_gpu_blocks(self, gpu_block_ids):
                 heapq.heappush(self.gpu_free_block_list, gpu_block_id)
         else:
             heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
-        main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
-        main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
+        main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
+        main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
 
     def allocate_cpu_blocks(self, num_blocks):
         """
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 4dabc0c89e3..c9a5db2abbd 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -562,8 +562,8 @@ def _insert_task_to_worker(self):
                 else:
                     continue
 
-                main_process_metrics.num_requests_waiting.dec(len(tasks))
-                main_process_metrics.num_requests_running.inc(len(tasks))
+                main_process_metrics.dec_value("num_requests_waiting", len(tasks))
+                main_process_metrics.inc_value("num_requests_running", len(tasks))
             except Exception as e:
                 err_msg = f"Error happend while insert task to engine: {e}, {traceback.format_exc()!s}."
                 self.llm_logger.error(err_msg)
@@ -744,7 +744,7 @@ def _insert_zmq_task_to_scheduler(self):
                 try:
                     request = Request.from_dict(data)
                     start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
-                    main_process_metrics.requests_number.inc()
+                    main_process_metrics.inc_value("requests_number", 1)
                     self.llm_logger.debug(f"Receive request: {request}")
                 except Exception as e:
                     self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
@@ -775,7 +775,7 @@ def _insert_zmq_task_to_scheduler(self):
                 added_requests.pop(request_id)
 
                 if failed is None:
-                    main_process_metrics.num_requests_waiting.inc(1)
+                    main_process_metrics.inc_value("num_requests_waiting", 1)
                     continue
 
                 error_result = RequestOutput(
diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py
index 39f6a80e4d4..1d728f65b7b 100644
--- a/fastdeploy/engine/resource_manager.py
+++ b/fastdeploy/engine/resource_manager.py
@@ -59,7 +59,8 @@ def __init__(
         # current batch status of the engine
         self.real_bsz = 0
         llm_logger.info(f"{self.info()}")
-        main_process_metrics.max_batch_size.set(max_num_seqs)
+        main_process_metrics.set_value("max_batch_size", max_num_seqs)
+        main_process_metrics.set_value("available_batch_size", self.available_batch())
 
     def reset_cache_config(self, cfg):
         """
@@ -169,7 +170,7 @@ def _recycle_block_tables(self, task):
             ori_number = self.available_block_num()
             self.cache_manager.recycle_gpu_blocks(block_tables)
             cur_number = self.available_block_num()
-            main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+            main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
             llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")
 
     def available_batch(self):
@@ -311,15 +312,15 @@ def allocate_resources_for_new_tasks(self, tasks):
                 break
 
         # record batch size here
-        task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
-        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+        main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
+        main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
+        main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
         llm_logger.info(
             f"Number of allocated requests: {len(tasks)}, number of "
             f"running requests in worker: {self.real_bsz}"
         )
         llm_logger.info(f"{self.info()}")
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
 
         return processed_tasks
 
@@ -347,9 +348,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
         task.cache_info = (cache_block_num, no_cache_block_num)
 
         # Report the number of cached tokens to Prometheus metrics
-        main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
-        main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
-        main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
+        main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)
 
         cached_len = len(common_block_ids) * self.cfg.block_size
         task.block_tables = common_block_ids + unique_block_ids
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index 6291c8f3a5d..8ee1a4e3b22 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -91,7 +91,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
         self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
         self.lock = threading.Lock()
         self.to_be_rescheduled_request_id_set = set()
-        main_process_metrics.max_batch_size.set(max_num_seqs)
+        main_process_metrics.set_value("max_batch_size", max_num_seqs)
 
         self.using_extend_tables_req_id = set()
 
@@ -144,13 +144,12 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
                 if preempted_req.request_id in self.req_dict:
                     del self.req_dict[preempted_req.request_id]
                 self._free_blocks(preempted_req)
-                main_process_metrics.num_requests_running.dec(1)
+                llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
             else:
                 self._free_blocks(preempted_req)
                 preempted_req.cached_block_num = 0
                 self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
-                main_process_metrics.num_requests_waiting.inc(1)
-                main_process_metrics.num_requests_running.dec(1)
+                llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
             preempted_reqs.append(preempted_req)
             scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
             if preempted_req == request:
@@ -414,8 +413,6 @@ def schedule(self):
                             request, self.config.cache_config.block_size, request.num_computed_tokens
                         )
                         request.status = RequestStatus.RUNNING
-                        main_process_metrics.num_requests_waiting.dec(1)
-                        main_process_metrics.num_requests_running.inc(1)
                         if self.config.scheduler_config.splitwise_role == "mixed":
                             allocated_position = self.get_available_position()
                             request.idx = allocated_position
@@ -460,8 +457,6 @@ def schedule(self):
                             request, self.config.cache_config.block_size, request.num_computed_tokens
                         )
                         request.status = RequestStatus.RUNNING
-                        main_process_metrics.num_requests_waiting.dec(1)
-                        main_process_metrics.num_requests_running.inc(1)
                     else:
                         if self.config.cache_config.enable_prefix_caching:
                             self._free_blocks(request)
@@ -520,11 +515,17 @@ def schedule(self):
                 continue
 
         if scheduled_reqs:
-            task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-            main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
-            main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-            main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
             llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
+
+            # Update metrics
+            num_tasks = sum([1 if task else 0 for task in self.tasks_list])
+            num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+            main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
+            main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
+            main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
+            main_process_metrics.set_value("num_requests_running", len(self.running))
+            main_process_metrics.set_value("num_requests_waiting", num_tasks - len(self.running))
+
         return scheduled_reqs
 
     def get_available_position(self) -> int:
@@ -566,9 +567,9 @@ def get_prefix_cached_blocks(self, request: Request):
            request.skip_allocate = False
 
         # Report the number of cached tokens to Prometheus metrics
-        main_process_metrics.prefix_cache_token_num.inc(matched_token_num)
-        main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
-        main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cache_token_num", matched_token_num)
+        main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.gpu_cache_token_num)
+        main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.cpu_cache_token_num)
 
         if matched_token_num == request.need_prefill_tokens:
             request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index 239ff595053..9987fce89c5 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -158,9 +158,9 @@ async def add_requests(self, task):
             if "messages" in task:
                 del task["messages"]
             api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
-            work_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
-            work_process_metrics.prompt_tokens_total.inc(input_ids_len)
-            work_process_metrics.request_prompt_tokens.observe(input_ids_len)
+            work_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
+            work_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
+            work_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
         except Exception as e:
             api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
             raise EngineError(str(e), error_code=400)
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index 388a81dd9c3..0faf75b7dbb 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -338,9 +338,7 @@ async def chat_completion_stream_generator(
                     )
                 if res["finished"]:
                     num_choices -= 1
-                    work_process_metrics.e2e_request_latency.observe(
-                        time.time() - res["metrics"]["request_start_time"]
-                    )
+                    work_process_metrics.obs_value("e2e_request_latency", time.time() - res["metrics"]["request_start_time"])
                     has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
                     max_tokens = request.max_completion_tokens or request.max_tokens
                     if has_no_token_limit or previous_num_tokens != max_tokens:
@@ -533,7 +531,7 @@ async def chat_completion_full_generator(
             total_tokens=num_prompt_tokens + num_generated_tokens,
             prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
         )
-        work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
+        work_process_metrics.obs_value("e2e_request_latency", time.time() - final_res["metrics"]["request_start_time"])
         res = ChatCompletionResponse(
             id=request_id,
             created=created_time,
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 2e87049845a..480718d3759 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -116,6 +116,10 @@
     # Max pre-fetch requests number in PD
     "FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
     "FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
+    # Whether to clear cpu cache when clearing model weights.
+    "FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
+    # Whether to use labels in metrics.
+    "FD_ENABLE_METRIC_LABELS": lambda: bool(int(os.getenv("FD_ENABLE_METRIC_LABELS", "0"))),
 }
diff --git a/fastdeploy/metrics/interface.py b/fastdeploy/metrics/interface.py
new file mode 100644
index 00000000000..b1e64114805
--- /dev/null
+++ b/fastdeploy/metrics/interface.py
@@ -0,0 +1,70 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
+
+from prometheus_client import (
+    Counter,
+    Gauge,
+    Histogram,
+)
+from fastdeploy import envs
+
+class MetricsManagerInterface:
+
+    def set_value(self, name, value, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                if labelvalues is None:
+                    labelvalues = {ln: "" for ln in metric._labelnames}
+                metric.labels(**labelvalues).set(value)
+            else:
+                metric.set(value)
+        return
+
+    def inc_value(self, name, value=1, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge) or isinstance(metric, Counter):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                if labelvalues is None:
+                    labelvalues = {ln: "" for ln in metric._labelnames}
+                metric.labels(**labelvalues).inc(value)
+            else:
+                metric.inc(value)
+        return
+
+    def dec_value(self, name, value=1, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Gauge):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                if labelvalues is None:
+                    labelvalues = {ln: "" for ln in metric._labelnames}
+                metric.labels(**labelvalues).dec(value)
+            else:
+                metric.dec(value)
+        return
+
+    def obs_value(self, name, value, labelvalues=None):
+        metric = getattr(self, name, None)
+        if isinstance(metric, Histogram):
+            if envs.FD_ENABLE_METRIC_LABELS:
+                if labelvalues is None:
+                    labelvalues = {ln: "" for ln in metric._labelnames}
+                metric.labels(**labelvalues).observe(value)
+            else:
+                metric.observe(value)
+        return
+
diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py
index c6a6bbba752..29068fc3f89 100644
--- a/fastdeploy/metrics/metrics.py
+++ b/fastdeploy/metrics/metrics.py
@@ -34,6 +34,8 @@
 from fastdeploy.metrics import build_1_2_5_buckets
 from fastdeploy.metrics.work_metrics import work_process_metrics
+from fastdeploy.metrics.interface import MetricsManagerInterface
+from fastdeploy import envs
 
 
 def cleanup_prometheus_files(is_main: bool, instance_id: str = None):
@@ -51,7 +53,6 @@ def cleanup_prometheus_files(is_main: bool, instance_id: str = None):
 
     return prom_dir
 
-
 class SimpleCollector(Collector):
     """
     A custom Prometheus collector that filters out specific metrics by name.
@@ -89,13 +90,19 @@ def get_filtered_metrics(exclude_names: Set[str], extra_register_func=None) -> s
     :param exclude_names: metric.name set to be excluded
     :param extra_register_func: optional, main process custom metric registration method
     :return: filtered metric text (str)
+
+    generate_latest(filtered_registry) <- filtered_registry.collect <- SimpleCollector.collect
+                                       <- base_registry.collect <- MultiProcessCollector.collect
     """
+    # Register a MultiProcessCollector to base registry
+    # When a MultiProcessCollector collects, it reads metrics from *.db files in PROMETHEUS_MULTIPROC_DIR
     base_registry = CollectorRegistry()
     multiprocess.MultiProcessCollector(base_registry)
 
     filtered_registry = CollectorRegistry()
     filtered_registry.register(SimpleCollector(base_registry, exclude_names))
 
+    # extra_register_func is used to register custom metrics to filtered_registry
     if extra_register_func:
         extra_register_func(filtered_registry)
@@ -127,7 +134,7 @@ def get_filtered_metrics(exclude_names: Set[str], extra_register_func=None) -> s
 ]
 
 
-class MetricsManager:
+class MetricsManager(MetricsManagerInterface):
     """Prometheus Metrics Manager handles all metric updates"""
 
     _instance = None
@@ -392,11 +399,17 @@ class MetricsManager:
             "kwargs": {},
         },
     }
+    SPECULATIVE_METRICS = {}
 
     def __init__(self):
         """Initializes the Prometheus metrics and starts the HTTP server if not already initialized."""
-        # 动态创建所有指标
+        # Add labels to existing metrics: model_id
+        if envs.FD_ENABLE_METRIC_LABELS:
+            for metric in self.METRICS:
+                self.METRICS[metric]["kwargs"]["labelnames"] = ["model_id"]
+
+        # Create metrics dynamically
         for metric_name, config in self.METRICS.items():
             setattr(
                 self,
diff --git a/fastdeploy/metrics/work_metrics.py b/fastdeploy/metrics/work_metrics.py
index 190940ff6af..2d211c08797 100644
--- a/fastdeploy/metrics/work_metrics.py
+++ b/fastdeploy/metrics/work_metrics.py
@@ -21,9 +21,10 @@
 from prometheus_client import Counter, Histogram
 
 from fastdeploy.metrics.metrics import build_1_2_5_buckets
+from fastdeploy.metrics.interface import MetricsManagerInterface
+from fastdeploy import envs
 
-
-class WorkMetricsManager:
+class WorkMetricsManager(MetricsManagerInterface):
     """Prometheus Metrics Manager handles all metric updates"""
 
     _initialized = False
@@ -34,6 +35,9 @@ def __init__(self):
         if self._initialized:
             return
 
+        # Add labels to existing metrics: model_id
+        LABEL_NAMES = ["model_id"] if envs.FD_ENABLE_METRIC_LABELS else []
+
         self.e2e_request_latency = Histogram(
             "fastdeploy:e2e_request_latency_seconds",
             "End-to-end request latency (from request arrival to final response)",
@@ -60,22 +64,26 @@ def __init__(self):
                 1920.0,
                 7680.0,
             ],
+            labelnames=LABEL_NAMES,
         )
 
         self.request_params_max_tokens = Histogram(
             name="fastdeploy:request_params_max_tokens",
             documentation="Histogram of max_tokens parameter in request parameters",
             buckets=build_1_2_5_buckets(33792),
+            labelnames=LABEL_NAMES,
         )
 
         self.prompt_tokens_total = Counter(
             name="fastdeploy:prompt_tokens_total",
             documentation="Total number of prompt tokens processed",
+            labelnames=LABEL_NAMES,
         )
 
         self.request_prompt_tokens = Histogram(
             name="fastdeploy:request_prompt_tokens",
             documentation="Number of prefill tokens processed.",
             buckets=build_1_2_5_buckets(33792),
+            labelnames=LABEL_NAMES,
         )
-
+
         self._initialized = True
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 596b32ab44d..159899b232c 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -342,7 +342,7 @@ def process_metrics():
                 while current_index < len(self.prefill_time_signal.value):
                     prefill_time = self.prefill_time_signal.value[current_index]
                     if prefill_time > 0:
-                        main_process_metrics.request_prefill_time.observe(prefill_time)
+                        main_process_metrics.obs_value("request_prefill_time", prefill_time)
                         self.prefill_time_signal.value[current_index] = 0
                     current_index += 1
             except Exception as e:
@@ -399,14 +399,10 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
         if task_id in self.resource_manager.req_dict:
             del self.resource_manager.req_dict[task_id]
 
-        task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list])
-        main_process_metrics.available_gpu_block_num.set(
-            self.resource_manager.total_block_number() - task_used_block_num
-        )
-        main_process_metrics.batch_size.set(
-            self.resource_manager.max_num_seqs - self.resource_manager.available_batch()
-        )
-        main_process_metrics.available_batch_size.set(self.resource_manager.available_batch())
+        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list])
+        main_process_metrics.set_value("available_gpu_block_num", self.resource_manager.total_block_number() - num_blocks_used_by_tasks)
+        main_process_metrics.set_value("batch_size", self.resource_manager.max_num_seqs - self.resource_manager.available_batch())
+        main_process_metrics.set_value("available_batch_size", self.resource_manager.available_batch())
 
         if task_id in self.tokens_counter:
             del self.tokens_counter[task_id]
@@ -597,30 +593,30 @@ def _record_metrics(self, task, current_time, token_ids):
         """Record all metrics for a task"""
         if hasattr(task, "last_token_time") and task.last_token_time is not None:
             token_gen_time = current_time - task.last_token_time
-            main_process_metrics.time_per_output_token.observe(token_gen_time)
+            main_process_metrics.obs_value("time_per_output_token", token_gen_time)
         task.last_token_time = current_time
 
         # Record generation metrics
-        main_process_metrics.generation_tokens_total.inc(len(token_ids))
+        main_process_metrics.inc_value("generation_tokens_total", len(token_ids))
 
     def _record_first_token_metrics(self, task, current_time):
         """Record metrics for first token"""
         task.first_token_time = current_time
-        main_process_metrics.first_token_latency.set(current_time - task.inference_start_time)
-        main_process_metrics.time_to_first_token.observe(current_time - task.inference_start_time)
-        main_process_metrics.request_queue_time.observe(task.schedule_start_time - task.preprocess_end_time)
+        main_process_metrics.set_value("first_token_latency", current_time - task.inference_start_time)
+        main_process_metrics.obs_value("time_to_first_token", current_time - task.inference_start_time)
+        main_process_metrics.obs_value("request_queue_time", task.schedule_start_time - task.preprocess_end_time)
 
     def _record_completion_metrics(self, task, current_time):
         """Record metrics when request completes"""
         if hasattr(task, "first_token_time"):
             decode_time = current_time - task.first_token_time
-            main_process_metrics.request_decode_time.observe(decode_time)
+            main_process_metrics.obs_value("request_decode_time", decode_time)
 
-        main_process_metrics.num_requests_running.dec(1)
-        main_process_metrics.request_success_total.inc()
-        main_process_metrics.infer_latency.set(current_time - task.inference_start_time)
-        main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time)
-        main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id])
+        main_process_metrics.dec_value("num_requests_running", 1)
+        main_process_metrics.inc_value("request_success_total", 1)
+        main_process_metrics.set_value("infer_latency", current_time - task.inference_start_time)
+        main_process_metrics.obs_value("request_inference_time", current_time - task.inference_start_time)
+        main_process_metrics.obs_value("request_generation_tokens", self.tokens_counter[task.request_id])
 
     def _record_speculative_decoding_mertics(self, accept_num):
         """Record metrics of speculative decoding"""
diff --git a/fastdeploy/splitwise/internal_adapter_utils.py b/fastdeploy/splitwise/internal_adapter_utils.py
index 0e1ba449410..010df6e5b7a 100644
--- a/fastdeploy/splitwise/internal_adapter_utils.py
+++ b/fastdeploy/splitwise/internal_adapter_utils.py
@@ -22,7 +22,7 @@
 import zmq
 
 from fastdeploy.inter_communicator import ZmqTcpServer
-from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics
+from fastdeploy.metrics.metrics import EXCLUDE_LABELS, get_filtered_metrics, main_process_metrics
 from fastdeploy.utils import envs, get_logger
 
 logger = get_logger("internal_adapter_utils", "internal_adapter_utils.log")
@@ -89,7 +89,7 @@ def _recv_external_module_control_instruct(self):
                 elif task["cmd"] == "get_metrics":
                     metrics_text = get_filtered_metrics(
-                        [],
+                        EXCLUDE_LABELS,
                         extra_register_func=lambda reg: main_process_metrics.register_all(reg, workers=1),
                     )
                     result = {"task_id": task_id_str, "result": metrics_text}
diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py
index 62d33f433fb..ff49a5f271e 100644
--- a/fastdeploy/splitwise/splitwise_connector.py
+++ b/fastdeploy/splitwise/splitwise_connector.py
@@ -161,7 +161,7 @@ def _send_message(self, addr, msg_type: str, payload):
                     self.logger.warning(f"Send queue full for {addr}")
             except Exception as e:
                 self.logger.error(f"Send to {addr} failed: {e}, {str(traceback.format_exc())}")
-                main_process_metrics.send_cache_failed_num.inc()
+                main_process_metrics.inc_value("send_cache_failed_num")
                 self._close_connection(addr)
 
         except Exception as e:

From fa13f6e2e6747bc693a1fe11eefe485bbd05c6b8 Mon Sep 17 00:00:00 2001
From: liyonghua0910
Date: Fri, 17 Oct 2025 18:28:28 +0800
Subject: [PATCH 2/2] [feat] support FD_DEFAULT_METRIC_LABEL_VALUES

---
 fastdeploy/envs.py              |  2 ++
 fastdeploy/metrics/interface.py | 17 +++++++++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 480718d3759..5ef9be1f23d 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -120,6 +120,8 @@
     "FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
     # Whether to use labels in metrics.
     "FD_ENABLE_METRIC_LABELS": lambda: bool(int(os.getenv("FD_ENABLE_METRIC_LABELS", "0"))),
+    # Default label values in metrics.
+ "FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"), } diff --git a/fastdeploy/metrics/interface.py b/fastdeploy/metrics/interface.py index b1e64114805..d8766151387 100644 --- a/fastdeploy/metrics/interface.py +++ b/fastdeploy/metrics/interface.py @@ -21,6 +21,7 @@ Histogram, ) from fastdeploy import envs +import json class MetricsManagerInterface: @@ -28,8 +29,10 @@ def set_value(self, name, value, labelvalues=None): metric = getattr(self, name, None) if isinstance(metric, Gauge): if envs.FD_ENABLE_METRIC_LABELS: + default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) if labelvalues is None: - labelvalues = {ln: "" for ln in metric._labelnames} + labelvalues = {} + labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames} metric.labels(**labelvalues).set(value) else: metric.set(value) @@ -39,8 +42,10 @@ def inc_value(self, name, value=1, labelvalues=None): metric = getattr(self, name, None) if isinstance(metric, Gauge) or isinstance(metric, Counter): if envs.FD_ENABLE_METRIC_LABELS: + default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) if labelvalues is None: - labelvalues = {ln: "" for ln in metric._labelnames} + labelvalues = {} + labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames} metric.labels(**labelvalues).inc(value) else: metric.inc(value) @@ -50,8 +55,10 @@ def dec_value(self, name, value=1, labelvalues=None): metric = getattr(self, name, None) if isinstance(metric, Gauge): if envs.FD_ENABLE_METRIC_LABELS: + default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) if labelvalues is None: - labelvalues = {ln: "" for ln in metric._labelnames} + labelvalues = {} + labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames} metric.labels(**labelvalues).dec(value) else: metric.dec(value) @@ -61,8 +68,10 @@ def obs_value(self, name, value, labelvalues=None): metric = getattr(self, name, None) if isinstance(metric, Histogram): if envs.FD_ENABLE_METRIC_LABELS: + default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) if labelvalues is None: - labelvalues = {ln: "" for ln in metric._labelnames} + labelvalues = {} + labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames} metric.labels(**labelvalues).observe(value) else: metric.observe(value)