8 changes: 4 additions & 4 deletions fastdeploy/cache_manager/cache_metrics.py
@@ -55,10 +55,10 @@ def _update_history_hit_metrics(self):
self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num
self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num

main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)

logger.info(
f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"
19 changes: 13 additions & 6 deletions fastdeploy/cache_manager/prefix_cache_manager.py
@@ -111,6 +111,11 @@ def __init__(
+ f"{self.num_cpu_blocks}, bytes_per_layer_per_block {self.cache_config.bytes_per_layer_per_block}"
)

main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

@property
def available_gpu_resource(self):
return len(self.gpu_free_block_list) / self.num_gpu_blocks if self.num_gpu_blocks > 0 else 0.0
@@ -316,8 +321,10 @@ def update_cache_config(self, cache_config):
heapq.heapify(self.gpu_free_block_list)
self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

def _enable_cpu_cache(self):
"""
@@ -359,8 +366,8 @@ def allocate_gpu_blocks(self, num_blocks):
logger.info(
f"allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
return allocated_block_ids

def recycle_gpu_blocks(self, gpu_block_ids):
@@ -375,8 +382,8 @@ def recycle_gpu_blocks(self, gpu_block_ids):
heapq.heappush(self.gpu_free_block_list, gpu_block_id)
else:
heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

def allocate_cpu_blocks(self, num_blocks):
"""
8 changes: 4 additions & 4 deletions fastdeploy/engine/common_engine.py
@@ -562,8 +562,8 @@ def _insert_task_to_worker(self):
else:
continue

main_process_metrics.num_requests_waiting.dec(len(tasks))
main_process_metrics.num_requests_running.inc(len(tasks))
main_process_metrics.dec_value("num_requests_waiting", len(tasks))
main_process_metrics.inc_value("num_requests_running", len(tasks))
except Exception as e:
err_msg = f"Error happend while insert task to engine: {e}, {traceback.format_exc()!s}."
self.llm_logger.error(err_msg)
@@ -744,7 +744,7 @@ def _insert_zmq_task_to_scheduler(self):
try:
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
main_process_metrics.requests_number.inc()
main_process_metrics.inc_value("requests_number", 1)
self.llm_logger.debug(f"Receive request: {request}")
except Exception as e:
self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
@@ -775,7 +775,7 @@ def _insert_zmq_task_to_scheduler(self):
added_requests.pop(request_id)

if failed is None:
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.inc_value("num_requests_waiting", 1)
continue

error_result = RequestOutput(
21 changes: 11 additions & 10 deletions fastdeploy/engine/resource_manager.py
@@ -59,7 +59,8 @@ def __init__(
# current batch status of the engine
self.real_bsz = 0
llm_logger.info(f"{self.info()}")
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)
main_process_metrics.set_value("available_batch_size", self.available_batch())

def reset_cache_config(self, cfg):
"""
@@ -169,7 +170,7 @@ def _recycle_block_tables(self, task):
ori_number = self.available_block_num()
self.cache_manager.recycle_gpu_blocks(block_tables)
cur_number = self.available_block_num()
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")

def available_batch(self):
@@ -311,15 +312,15 @@ def allocate_resources_for_new_tasks(self, tasks):
break

# record batch size here
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(
f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
)
llm_logger.info(f"{self.info()}")
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())

return processed_tasks

@@ -347,9 +348,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
task.cache_info = (cache_block_num, no_cache_block_num)

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)

cached_len = len(common_block_ids) * self.cfg.block_size
task.block_tables = common_block_ids + unique_block_ids
31 changes: 16 additions & 15 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -91,7 +91,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
self.lock = threading.Lock()
self.to_be_rescheduled_request_id_set = set()
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)

self.using_extend_tables_req_id = set()

@@ -144,13 +144,12 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
if preempted_req.request_id in self.req_dict:
del self.req_dict[preempted_req.request_id]
self._free_blocks(preempted_req)
main_process_metrics.num_requests_running.dec(1)
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
else:
self._free_blocks(preempted_req)
preempted_req.cached_block_num = 0
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.num_requests_running.dec(1)
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
if preempted_req == request:
@@ -414,8 +413,6 @@ def schedule(self):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
if self.config.scheduler_config.splitwise_role == "mixed":
allocated_position = self.get_available_position()
request.idx = allocated_position
@@ -460,8 +457,6 @@ def schedule(self):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
else:
if self.config.cache_config.enable_prefix_caching:
self._free_blocks(request)
@@ -520,11 +515,17 @@ def schedule(self):
continue

if scheduled_reqs:
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")

# Update metrics
num_tasks = sum([1 if task else 0 for task in self.tasks_list])
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("num_requests_running", len(self.running))
main_process_metrics.set_value("num_requests_waiting", num_tasks - len(self.running))

return scheduled_reqs

def get_available_position(self) -> int:
Expand Down Expand Up @@ -566,9 +567,9 @@ def get_prefix_cached_blocks(self, request: Request):
request.skip_allocate = False

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(matched_token_num)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", matched_token_num)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.cpu_cache_token_num)

if matched_token_num == request.need_prefill_tokens:
request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size
6 changes: 3 additions & 3 deletions fastdeploy/entrypoints/engine_client.py
@@ -158,9 +158,9 @@ async def add_requests(self, task):
if "messages" in task:
del task["messages"]
api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
work_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
work_process_metrics.prompt_tokens_total.inc(input_ids_len)
work_process_metrics.request_prompt_tokens.observe(input_ids_len)
work_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
work_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
work_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
except Exception as e:
api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)
6 changes: 2 additions & 4 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -338,9 +338,7 @@ async def chat_completion_stream_generator(
)
if res["finished"]:
num_choices -= 1
work_process_metrics.e2e_request_latency.observe(
time.time() - res["metrics"]["request_start_time"]
)
work_process_metrics.obs_value("e2e_request_latency", time.time() - res["metrics"]["request_start_time"])
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
max_tokens = request.max_completion_tokens or request.max_tokens
if has_no_token_limit or previous_num_tokens != max_tokens:
@@ -533,7 +531,7 @@ async def chat_completion_full_generator(
total_tokens=num_prompt_tokens + num_generated_tokens,
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
)
work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
work_process_metrics.obs_value("e2e_request_latency", time.time() - final_res["metrics"]["request_start_time"])
res = ChatCompletionResponse(
id=request_id,
created=created_time,
6 changes: 6 additions & 0 deletions fastdeploy/envs.py
@@ -116,6 +116,12 @@
# Max pre-fetch requests number in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
"FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
# Whether to clear cpu cache when clearing model weights.
"FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
# Whether to use labels in metrics.
"FD_ENABLE_METRIC_LABELS": lambda: bool(int(os.getenv("FD_ENABLE_METRIC_LABELS", "0"))),
# Default label values in metrics.
"FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"),
}


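For reference, a minimal sketch of how the two new switches fit together, under the assumption shown in the comments (the label names below are placeholders chosen for illustration, not labels defined by this change). `FD_ENABLE_METRIC_LABELS` is read via `bool(int(...))`, and `FD_DEFAULT_METRIC_LABEL_VALUES` stays a raw JSON string that the new metrics interface parses on use.

```python
import json
import os

# Illustrative only: the label names below are placeholders, not labels defined by this change.
os.environ["FD_ENABLE_METRIC_LABELS"] = "1"  # read via bool(int(...)) -> True
os.environ["FD_DEFAULT_METRIC_LABEL_VALUES"] = '{"model_name": "my-model", "instance": "gpu-0"}'

# envs.py keeps the value as a raw JSON string; the metrics interface parses it when a metric is updated.
default_labels = json.loads(os.environ["FD_DEFAULT_METRIC_LABEL_VALUES"])
print(default_labels["instance"])  # -> gpu-0
```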
79 changes: 79 additions & 0 deletions fastdeploy/metrics/interface.py
@@ -0,0 +1,79 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""


import json

from prometheus_client import (
    Counter,
    Gauge,
    Histogram,
)

from fastdeploy import envs


class MetricsManagerInterface:
    """Name-based update helper for Prometheus metrics.

    Metrics are looked up as attributes on the manager instance. When
    FD_ENABLE_METRIC_LABELS is enabled, label values are resolved from the
    caller-supplied ``labelvalues``, then from FD_DEFAULT_METRIC_LABEL_VALUES,
    and finally default to an empty string.
    """

    def set_value(self, name, value, labelvalues=None):
        """Set the Gauge called ``name`` to ``value``; other names are ignored."""
        metric = getattr(self, name, None)
        if isinstance(metric, Gauge):
            if envs.FD_ENABLE_METRIC_LABELS:
                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
                if labelvalues is None:
                    labelvalues = {}
                # Fill every declared label name, falling back to the defaults, then "".
                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
                metric.labels(**labelvalues).set(value)
            else:
                metric.set(value)
        return

    def inc_value(self, name, value=1, labelvalues=None):
        """Increment the Gauge or Counter called ``name`` by ``value``."""
        metric = getattr(self, name, None)
        if isinstance(metric, (Gauge, Counter)):
            if envs.FD_ENABLE_METRIC_LABELS:
                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
                if labelvalues is None:
                    labelvalues = {}
                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
                metric.labels(**labelvalues).inc(value)
            else:
                metric.inc(value)
        return

    def dec_value(self, name, value=1, labelvalues=None):
        """Decrement the Gauge called ``name`` by ``value``."""
        metric = getattr(self, name, None)
        if isinstance(metric, Gauge):
            if envs.FD_ENABLE_METRIC_LABELS:
                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
                if labelvalues is None:
                    labelvalues = {}
                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
                metric.labels(**labelvalues).dec(value)
            else:
                metric.dec(value)
        return

    def obs_value(self, name, value, labelvalues=None):
        """Record ``value`` as an observation on the Histogram called ``name``."""
        metric = getattr(self, name, None)
        if isinstance(metric, Histogram):
            if envs.FD_ENABLE_METRIC_LABELS:
                default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
                if labelvalues is None:
                    labelvalues = {}
                labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
                metric.labels(**labelvalues).observe(value)
            else:
                metric.observe(value)
        return

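To illustrate how call sites use this interface, here is a minimal, self-contained sketch. It assumes `prometheus_client` is installed, FastDeploy with this change is importable, and `FD_ENABLE_METRIC_LABELS` is left at its default of `0`; the metric names are placeholders, not the ones FastDeploy's real metrics managers define. The point of the name-based dispatch is that call sites stay identical whether labels are enabled or not.

```python
# Minimal sketch of the new name-based interface (labels disabled, the default).
# Assumes prometheus_client is installed; the metric names are placeholders, not
# the metrics FastDeploy's real managers define.
from prometheus_client import Gauge, Histogram

from fastdeploy.metrics.interface import MetricsManagerInterface


class DemoMetrics(MetricsManagerInterface):
    def __init__(self):
        self.free_gpu_block_num = Gauge("demo_free_gpu_block_num", "Free GPU blocks")
        self.e2e_request_latency = Histogram("demo_e2e_request_latency", "End-to-end latency (s)")


metrics = DemoMetrics()
metrics.set_value("free_gpu_block_num", 1024)   # Gauge.set(1024)
metrics.inc_value("free_gpu_block_num", 8)      # Gauge.inc(8)
metrics.dec_value("free_gpu_block_num", 2)      # Gauge.dec(2)
metrics.obs_value("e2e_request_latency", 0.42)  # Histogram.observe(0.42)
metrics.set_value("no_such_metric", 1)          # no matching attribute -> silently ignored
```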