Refactor omni metrics into stats → aggregator → logger pipeline#1

Closed
Copilot wants to merge 7 commits into main from copilot/refactor-metrics-for-vllm-omni

Conversation

Copilot AI commented Jan 12, 2026

Refines vllm-omni metrics to mirror vLLM’s three-layer design (Stats → Aggregator → Loggers/Exporters) with per-transfer, per-stage, and e2e summaries plus optional Prometheus export.

  • Architecture & data models
    • Added vllm_omni.metrics package with StageRequestStats, TransferEdgeStats, RequestE2EStats, RunSummary, and OrchestratorAggregator (keeps OrchestratorMetrics alias for compatibility).
    • Centralized transfer/stage logging helpers and safe rate calculations.
  • Logging/Exporters
    • Introduced OmniStatLoggerBase, OmniLoggingStatLogger, OmniPrometheusStatLogger, and OmniStatLoggerManager for periodic interval logging/export.
  • Wiring
    • Hooked Omni and AsyncOmni to the new aggregator/manager, passing final-stage mapping for e2e stats and avoiding duplicate summary computation.
    • entrypoints/log_utils.py now re-exports the new metrics API for backward compatibility.
  • Tests
    • Added unit coverage for aggregator summary building and logger manager emission triggers.

Example:

from vllm_omni.metrics import OrchestratorMetrics, OmniLoggingStatLogger, OmniStatLoggerManager

agg = OrchestratorMetrics(num_stages=2, enable_debug_events=False, wall_start_ts=0.0)
mgr = OmniStatLoggerManager(aggregator=agg, loggers=[OmniLoggingStatLogger(interval_s=5)])

agg.on_forward(0, 1, "req-1", size_bytes=1024, tx_ms=5.0, used_shm=False)
agg.on_stage_metrics(0, "req-1", {"num_tokens_out": 10, "stage_gen_time_ms": 30.0, "batch_id": 1, "batch_size": 1})
agg.on_finalize_request(1, "req-1", req_start_ts=0.0)
summary = mgr.force_log().to_dict()

(APIServer pid=3672142) ERROR 03-20 15:33:03 [async_omni_engine.py:514] pydantic_core._pydantic_core.ValidationError: 1 validation error for OmniModelConfig
(APIServer pid=3672142) ERROR 03-20 15:33:03 [async_omni_engine.py:514]   Value error, User-specified max_model_len (18192) is greater than the derived max_model_len (max_position_embeddings=8192.0 or model_max_length=None in model's config.json). To allow overriding this maximum, set the env var VLLM_ALLOW_LONG_MAX_MODEL_LEN=1. VLLM_ALLOW_LONG_MAX_MODEL_LEN must be used with extreme caution. If the model uses relative position encoding (RoPE), positions exceeding derived_max_model_len lead to nan. If the model uses absolute position encoding, positions exceeding derived_max_model_len will cause a CUDA array out-of-bounds error. [type=value_error, input_value=ArgsKwargs((), {'model': ... 1}, 'task_type': None}), input_type=ArgsKwargs]


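Root cause 1: the persistent _proj_buf buffer is contaminated across requests

PR vllm-project#1758 used a persistent self._proj_buf, so multiple concurrent requests shared the same block of GPU memory. Embedding data written by request A was overwritten by request B, so later AR steps read the wrong historical embeddings. This is the main cause of the intermittent failures in the concurrency tests.

Fix: allocate the buffer locally on every forward() call: proj_buf = torch.zeros(...).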
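
The aliasing problem can be reproduced without a GPU. The sketch below is a pure-Python analogy, not the code from the PR: the two projector classes and the list-based buffers are hypothetical stand-ins for the module and its tensor buffer. It only shows why a buffer owned by the object leaks data between calls while a buffer allocated inside the call does not.

```python
class SharedBufferProjector:
    """Buggy pattern: one persistent buffer reused by every call (hypothetical class)."""

    def __init__(self, hidden: int) -> None:
        self._proj_buf = [0.0] * hidden  # allocated once, shared by all requests

    def forward(self, embedding: list) -> list:
        self._proj_buf[:] = embedding  # overwrites whatever the previous request stored
        return self._proj_buf          # every caller receives the same object


class LocalBufferProjector:
    """Fixed pattern: a fresh buffer for each call (hypothetical class)."""

    def forward(self, embedding: list) -> list:
        proj_buf = [0.0] * len(embedding)  # local allocation, private to this call
        proj_buf[:] = embedding
        return proj_buf


shared = SharedBufferProjector(hidden=2)
a = shared.forward([1.0, 1.0])  # request A
b = shared.forward([2.0, 2.0])  # request B overwrites A's data in place
shared_a_corrupted = a == [2.0, 2.0]

local = LocalBufferProjector()
a2 = local.forward([1.0, 1.0])
b2 = local.forward([2.0, 2.0])
local_a_intact = a2 == [1.0, 1.0]
```

The trade-off is one extra allocation per call in exchange for isolation between concurrent requests.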

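Root cause 2: broadcasting error between 3D summed_embeddings and 2D text_step

summed_embeddings has shape [B, S, H] (3D) and text_step has shape [B*S, H] (2D). When the two are added, PyTorch does not raise an error but silently broadcasts, and the result is completely wrong when B > 1.

Fix: call summed_embeddings.reshape(-1, H) so that both operands are 2D before adding.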
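
To see what shape the mismatched addition produces, the right-aligned broadcasting rule can be applied to the shapes alone. The helper below is a hypothetical pure-Python sketch of that rule (it does not call PyTorch), and it assumes S = 1, as in a single decode step; the source does not state the value of S. Under the same rule, B > 1 together with S > 1 would fail with a shape error rather than broadcast silently.

```python
from itertools import zip_longest


def broadcast_shape(a: tuple, b: tuple) -> tuple:
    """Return the broadcast result shape, aligning dimensions from the right."""
    out = []
    for x, y in zip_longest(reversed(a), reversed(b), fillvalue=1):
        if x != y and 1 not in (x, y):
            raise ValueError(f"shapes {a} and {b} do not broadcast")
        out.append(y if x == 1 else x)  # a size-1 dimension stretches to the other size
    return tuple(reversed(out))


B, S, H = 4, 1, 8  # illustrative sizes; S = 1 is an assumption

# Buggy addition: [B, S, H] + [B*S, H] silently becomes [B, B, H].
mixed = broadcast_shape((B, S, H), (B * S, H))

# Fixed addition: reshape(-1, H) first, so both operands are [B*S, H].
fixed = broadcast_shape((B * S, H), (B * S, H))
```

With these sizes the buggy sum has B times as many rows as intended, which is why every row except the diagonal pairs combines embeddings from different requests.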

Original prompt
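| Marker | Trigger location | Physical meaning | Metric computation |
| --- | --- | --- | --- |
| | Request arrives at the serve.py entry point | Request start | - |
| | Thinker output ends, just before Talker is called | Thinker done | |
| | Talker generates the first RVQ token | Talker first token | |
| | Talker finishes generating all RVQ tokens | Talker done | |
| | Code2Wav generates the first audio frame/packet | First audio packet | |
| | Full audio generation ends | Task complete | |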

Copilot AI and others added 6 commits January 12, 2026 11:49
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Co-authored-by: LJH-LBJ <98734602+LJH-LBJ@users.noreply.github.com>
Copilot AI changed the title from "[WIP] Refactor metrics architecture for vllm-omni" to "Refactor omni metrics into stats → aggregator → logger pipeline" on Jan 12, 2026
Copilot AI requested a review from LJH-LBJ January 12, 2026 12:06
Owner

LJH-LBJ commented Jan 13, 2026

@copilot How can I obtain the vision encoder time, audio encoder time, thinker prefill time, thinker decode time, talker prefill time, and talker decode time?

Author

Copilot AI commented Jan 13, 2026

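@copilot How can I obtain the vision encoder time, audio encoder time, thinker prefill time, thinker decode time, talker prefill time, and talker decode time?

The current framework does not yet collect these fine-grained timings (vision encoder, audio encoder, thinker prefill and decode, talker prefill and decode). To obtain them, instrument the engine or output-processing code of the corresponding stage and write the timings into the metrics passed to the orchestrator (for example, as custom fields on StageRequestStats/StageRequestMetrics); the aggregator and loggers then output them uniformly. The existing code does not generate these timings automatically.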

Owner

LJH-LBJ commented Jan 13, 2026

@copilot The vision encoder and audio encoder run in _process_image_input and _process_audio_input, called from embed_multimodal in vllm_omni\model_executor\models\qwen3_omni\qwen3_omni_moe_thinker.py. For thinker and talker prefill and decode, _run_output_handler in vllm_omni\entrypoints\async_omni_llm.py calls logger_manager.record to record the data. I think these functions are invoked on the vLLM side, and modifying vLLM's code is inconvenient. Is passing the values through prometheus_client the only option?

Author

Copilot AI commented Jan 13, 2026

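@copilot The vision encoder and audio encoder run in _process_image_input and _process_audio_input, called from embed_multimodal in vllm_omni\model_executor\models\qwen3_omni\qwen3_omni_moe_thinker.py. For thinker prefill&decod...

These timings still have to be instrumented at the call sites:

  • Inside _process_image_input/_process_audio_input of qwen3_omni_moe_thinker.embed_multimodal, measure the time with time.perf_counter, write it into the returned metrics field (or attach it to StageRequestStats under a custom key such as vision_encoder_ms/audio_encoder_ms), and return it to the orchestrator along with the request.
  • In async_omni_llm._run_output_handler, capture the thinker/talker prefill and decode timestamps and write them into the same metrics structure (for example thinker_prefill_ms/thinker_decode_ms/talker_prefill_ms/talker_decode_ms); logger_manager.record passes them out along with the stats.
  • Prometheus is only an output sink and cannot produce these timings on its own, so they must first be recorded at the locations above. To expose timing metrics directly, you can also report them through a prometheus_client Gauge/Histogram at the instrumentation point.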
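
One way to keep such instrumentation small is a context manager that adds the elapsed time to a metrics dict. This is a minimal sketch under assumptions: `timed` and `process_image_input_stub` are hypothetical names, and the key `vision_encoder_ms` is the custom key suggested above, not an existing field. For GPU work, `perf_counter` only measures host-side time unless the device is synchronized before the timer stops.

```python
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def timed(metrics: dict, key: str) -> Iterator[None]:
    """Add the wall time of the with-block, in milliseconds, to metrics[key]."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        # Accumulate so several calls in one request (e.g. several images) add up.
        metrics[key] = metrics.get(key, 0.0) + (time.perf_counter() - t0) * 1000.0


def process_image_input_stub() -> None:
    """Hypothetical stand-in for the real encoder call."""
    time.sleep(0.01)


stage_metrics: dict = {}
with timed(stage_metrics, "vision_encoder_ms"):
    process_image_input_stub()
with timed(stage_metrics, "vision_encoder_ms"):  # a second image accumulates
    process_image_input_stub()
```

The `finally` clause records the time even when the wrapped call raises, so failed requests still show up in the timings.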

Owner

LJH-LBJ commented Jan 13, 2026

@copilot In async_omni_llm._run_output_handler, capture the thinker/talker prefill/decode timestamps and write them into the same metrics structure (for example thinker_prefill_ms/thinker_decode_ms/talker_prefill_ms/talker_decode_ms); logger_manager.record passes them out along with the stats.
Write me an example.

Author

Copilot AI commented Jan 13, 2026

@copilot In async_omni_llm._run_output_handler, capture the thinker/talker prefill/decode timestamps and write them into the same metrics structure (for example thinker_prefill_ms/thinker_decode_ms/talker_prefill_ms/talke...

Once _run_output_handler has the per-phase timestamps, you can write them straight into the metrics dict. Example (illustrative code):

import time

# Inside the loop that handles one request
start_prefill = time.perf_counter()
# ... call thinker/talker prefill ...
end_prefill = time.perf_counter()

start_decode = time.perf_counter()
# ... call thinker/talker decode ...
end_decode = time.perf_counter()

m = result.get("metrics") or {}
# thinker
m["thinker_prefill_ms"] = (end_prefill - start_prefill) * 1000
m["thinker_decode_ms"] = (end_decode - start_decode) * 1000
# For talker, write talker_prefill_ms / talker_decode_ms the same way

# Pass to the orchestrator
metrics.on_stage_metrics(stage_id, req_id, m)

Key points:

  1. Record the duration at the call site with perf_counter.
  2. Write the milliseconds into the metrics dict (or a custom StageRequestStats field).
  3. Keep using metrics.on_stage_metrics and the aggregator for output, or export through Prometheus.

Owner

LJH-LBJ commented Jan 13, 2026

@copilot How do I change the code?

Author

Copilot AI commented Jan 13, 2026

@copilot How do I change the code?

You can instrument directly inside the loop of _run_output_handler (illustrative code):

# Assume we are in the branch that handles result
import time

start_prefill = time.perf_counter()
# Call the thinker/talker prefill logic
end_prefill = time.perf_counter()

start_decode = time.perf_counter()
# Call the thinker/talker decode logic
end_decode = time.perf_counter()

m = result.get("metrics") or {}
m["thinker_prefill_ms"] = (end_prefill - start_prefill) * 1000
m["thinker_decode_ms"] = (end_decode - start_decode) * 1000
# For talker, write talker_prefill_ms / talker_decode_ms the same way

metrics.on_stage_metrics(stage_id, req_id, m)

The core idea is to record the duration with perf_counter and pass the milliseconds to metrics.on_stage_metrics in the metrics dict; the aggregator/loggers or Prometheus can then output these fields.

Owner

LJH-LBJ commented Jan 13, 2026

@copilot How do I record the duration with perf_counter and write the milliseconds into the metrics dict? Give me code I can use directly.

Author

Copilot AI commented Jan 13, 2026

@copilot How do I record the duration with perf_counter and write the milliseconds into the metrics dict? Give me code I can use directly.

This example can go straight into the branch of _run_output_handler that handles result (nothing else in the framework changes):

import time

# ... obtain result, stage_id, req_id, and the metrics object (OrchestratorMetrics) ...

# Thinker prefill
prefill_t0 = time.perf_counter()
# Call the thinker prefill logic
prefill_t1 = time.perf_counter()

# Thinker decode
decode_t0 = time.perf_counter()
# Call the thinker decode logic
decode_t1 = time.perf_counter()

m = result.get("metrics") or {}
# Write milliseconds
m["thinker_prefill_ms"] = (prefill_t1 - prefill_t0) * 1000
m["thinker_decode_ms"] = (decode_t1 - decode_t0) * 1000

# For talker, record talker_prefill_ms / talker_decode_ms the same way

# Hand over to the aggregator
metrics.on_stage_metrics(stage_id, req_id, m)

Key point: record the time before and after with perf_counter(), multiply the difference by 1000 to get milliseconds, write it into the metrics dict, and keep calling the existing metrics.on_stage_metrics; the loggers and Prometheus can then output these fields.
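
The snippet above assumes the handler can wrap the prefill and decode calls itself. If those calls happen inside the engine, as raised earlier in this thread, an output handler only sees outputs arrive. The sketch below is a swapped-in approximation, not the PR's method: `PhaseTimer` is a hypothetical helper that treats time to first output as prefill (which also includes any queueing delay) and first-to-last output as decode.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PhaseTimer:
    """Approximates prefill/decode time for one request from output arrival times."""

    start_ts: float = field(default_factory=time.perf_counter)
    first_output_ts: Optional[float] = None
    last_output_ts: Optional[float] = None

    def on_output(self, now: Optional[float] = None) -> None:
        """Call once per output chunk; `now` can be injected for testing."""
        now = time.perf_counter() if now is None else now
        if self.first_output_ts is None:
            self.first_output_ts = now
        self.last_output_ts = now

    def to_metrics(self, prefix: str) -> dict:
        """Return {prefix}_prefill_ms and {prefix}_decode_ms, or {} if no output arrived."""
        if self.first_output_ts is None or self.last_output_ts is None:
            return {}
        return {
            f"{prefix}_prefill_ms": (self.first_output_ts - self.start_ts) * 1000.0,
            f"{prefix}_decode_ms": (self.last_output_ts - self.first_output_ts) * 1000.0,
        }


# Deterministic timestamps (seconds) instead of real clock readings.
timer = PhaseTimer(start_ts=10.0)
timer.on_output(now=10.25)  # first token
timer.on_output(now=10.5)
timer.on_output(now=11.0)   # last token
m = timer.to_metrics("thinker")
```

The resulting dict can be merged into the metrics dict that is passed to on_stage_metrics, using the key names proposed above.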

Owner

LJH-LBJ commented Jan 13, 2026

@copilot Explain what StageRequestStats, TransferEdgeStats, RequestE2EStats, RunSummary, OrchestratorAggregator, OmniStatLoggerBase, and OmniLoggingStatLogger do.

Owner

LJH-LBJ commented Jan 15, 2026

@copilot I want to write a merge function that computes and outputs averages over the events in StageRequestStats, TransferEdgeStats, and RequestE2EStats.

Author

Copilot AI commented Jan 15, 2026

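@copilot I want to write a merge function that computes and outputs averages over the events in StageRequestStats, TransferEdgeStats, and RequestE2EStats.

You can aggregate the existing objects directly. Example (place it wherever you need it; the inputs are the three event lists):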

from statistics import mean
from vllm_omni.metrics import RunSummary, StageRequestStats, TransferEdgeStats, RequestE2EStats

def merge_and_avg(stage_events: list[StageRequestStats],
                  transfer_events: list[TransferEdgeStats],
                  request_events: list[RequestE2EStats]) -> dict:
    out: dict[str, float] = {}

    if stage_events:
        out["stage_tokens_out_avg"] = mean(ev.num_tokens_out for ev in stage_events)
        out["stage_latency_ms_avg"] = mean(ev.stage_gen_time_ms for ev in stage_events)
    if transfer_events:
        out["transfer_size_bytes_avg"] = mean(ev.size_bytes for ev in transfer_events)
        out["transfer_total_ms_avg"] = mean(ev.total_time_ms for ev in transfer_events)
    if request_events:
        out["e2e_time_ms_avg"] = mean(ev.e2e_time_ms for ev in request_events)
        out["e2e_tokens_avg"] = mean(ev.e2e_total_tokens for ev in request_events)

    return out

If you already use OrchestratorAggregator, it has stage_events/transfer_events/request_events (accumulated when enable_debug_events is on). Pass these three lists to the function above to get the dict of averages.
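
If listing every field by hand becomes tedious, the same averaging can be done generically. The sketch below assumes the stats classes are dataclasses, which this thread does not confirm; `FakeStageStats` is a hypothetical stand-in that only borrows field names from the example above, and `average_numeric_fields` is not part of vllm_omni.

```python
from dataclasses import dataclass, fields
from statistics import mean


@dataclass
class FakeStageStats:
    """Hypothetical stand-in, not the real StageRequestStats."""

    stage_id: int
    num_tokens_out: int
    stage_gen_time_ms: float


def average_numeric_fields(events: list, skip: frozenset = frozenset()) -> dict:
    """Average every int/float field across a list of same-typed dataclass events."""
    if not events:
        return {}
    out = {}
    for f in fields(events[0]):
        if f.name in skip:
            continue
        values = [getattr(ev, f.name) for ev in events]
        # bool is a subclass of int, so exclude it explicitly.
        if all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in values):
            out[f"{f.name}_avg"] = mean(values)
    return out


events = [FakeStageStats(0, 10, 30.0), FakeStageStats(0, 20, 50.0)]
# Identifiers such as stage_id are numeric but meaningless to average, so skip them.
averages = average_numeric_fields(events, skip=frozenset({"stage_id"}))
```

The explicit version above stays the safer choice when only a few fields matter, since a generic pass will also average any numeric field added later.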

Owner

LJH-LBJ commented Jan 18, 2026

@copilot Lay out the event flow.

Author

Copilot AI commented Jan 18, 2026

@copilot Lay out the event flow.

Event data flow (with enable_debug_events on):

  1. Inside a stage:
    • on the sender side, the connector calls metrics.on_forward when sending, to record a TX event;
    • on the receiver side, when a result arrives, metrics.on_stage_metrics(stage_id, req_id, metrics_dict_or_stats) is called to record each request's stage latency and throughput.
  2. Orchestrator aggregator (OrchestratorAggregator/OrchestratorMetrics):
    • accumulates stage events into stage_events (num_tokens_in/out, stage_gen_time_ms, and so on);
    • accumulates transfer events into transfer_events (from/to, size_bytes, tx_ms, rx_decode_time_ms, in_flight_time_ms);
    • in on_finalize_request, computes the E2E summary and stores it in request_events (e2e_time_ms, e2e_total_tokens, transfers_*).
  3. Periodic output:
    • OmniStatLoggerManager.maybe_log/force_log triggers the loggers;
    • OmniLoggingStatLogger prints the aggregated RunSummary, and OmniPrometheusStatLogger writes Prometheus metrics.
  4. Final summary:
    • build_summary returns RunSummary.to_dict(), containing the stages/transfers/e2e summaries;
    • if averages are needed, aggregate stage_events/transfer_events/request_events further yourself (as in the earlier merge_and_avg example).
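
The flow above can be traced end to end with a toy model. Everything in this sketch is hypothetical: `ToyAggregator` and `ToyLoggerManager` only mimic the method names mentioned in this thread, their signatures and summary layout are simplified assumptions, and timestamps are passed in explicitly so the run is deterministic.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAggregator:
    """Hypothetical miniature of the aggregator layer; not the real class."""

    stage_events: list = field(default_factory=list)
    transfer_events: list = field(default_factory=list)
    request_events: list = field(default_factory=list)

    def on_forward(self, src: int, dst: int, req_id: str, size_bytes: int, tx_ms: float) -> None:
        self.transfer_events.append(
            {"from": src, "to": dst, "req_id": req_id, "size_bytes": size_bytes, "tx_ms": tx_ms}
        )

    def on_stage_metrics(self, stage_id: int, req_id: str, m: dict) -> None:
        self.stage_events.append({"stage_id": stage_id, "req_id": req_id, **m})

    def on_finalize_request(self, req_id: str, start_ts: float, end_ts: float) -> None:
        tokens = sum(
            e.get("num_tokens_out", 0) for e in self.stage_events if e["req_id"] == req_id
        )
        self.request_events.append(
            {"req_id": req_id, "e2e_time_ms": (end_ts - start_ts) * 1000.0, "e2e_total_tokens": tokens}
        )

    def build_summary(self) -> dict:
        return {
            "stages": len(self.stage_events),
            "transfers": len(self.transfer_events),
            "e2e": list(self.request_events),
        }


class ToyLoggerManager:
    """Emits a summary at most once per interval, or on demand."""

    def __init__(self, aggregator: ToyAggregator, interval_s: float) -> None:
        self.aggregator = aggregator
        self.interval_s = interval_s
        self._last_log_ts = 0.0
        self.emitted: list = []

    def maybe_log(self, now: float) -> bool:
        if now - self._last_log_ts < self.interval_s:
            return False  # interval not elapsed yet
        self.force_log(now)
        return True

    def force_log(self, now: float) -> dict:
        self._last_log_ts = now
        summary = self.aggregator.build_summary()
        self.emitted.append(summary)
        return summary


agg = ToyAggregator()
mgr = ToyLoggerManager(agg, interval_s=5.0)
agg.on_forward(0, 1, "req-1", size_bytes=1024, tx_ms=5.0)
agg.on_stage_metrics(0, "req-1", {"num_tokens_out": 10, "stage_gen_time_ms": 30.0})
agg.on_stage_metrics(1, "req-1", {"num_tokens_out": 4, "stage_gen_time_ms": 12.0})
agg.on_finalize_request("req-1", start_ts=0.0, end_ts=0.5)
logged_early = mgr.maybe_log(now=1.0)  # skipped: less than 5 s since the last log
logged_later = mgr.maybe_log(now=6.0)  # emitted
summary = mgr.emitted[-1]
```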

Owner

LJH-LBJ commented Jan 19, 2026

@copilot I have a problem: I received the stage0 information in one period and stage1 and stage2 in the next, so the final stage statistics are missing stage0. What should I do?

Author

Copilot AI commented Jan 19, 2026

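@copilot I have a problem: I received the stage0 information in one period and stage1 and stage2 in the next, so the final stage statistics are missing stage0. What should I do?

Two ways to avoid dropping stage0:

  1. Do not clear the aggregator state every period; use cumulative data (OrchestratorAggregator is cumulative by default) and call build_summary() periodically. Even if stage1/2 arrive in a later period, the earlier stage0 data is still retained.
  2. If you must output per-period windows, cache the previous period's stage_summary and, before emitting the next period, merge the cached stage0 entry with the current period's stage1/2 entries (merging on key=stage_id). The essence is not to drop data just because the rest has not arrived yet: either use the cumulative summary or do a stage-level merge across windows.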
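
The second option can be sketched as a small merge keyed on stage_id. This is an illustration under assumptions: `merge_stage_summaries` is a hypothetical helper, and the entry fields `requests` and `avg_ms` are made up for the example; only `stage_id` comes from the discussion above.

```python
def merge_stage_summaries(cached: dict, current: list) -> list:
    """Merge this window's per-stage entries into the cache, keyed by stage_id.

    A stage present in the current window replaces its cached entry; a stage
    absent from the current window keeps the entry cached from an earlier one.
    """
    for entry in current:
        cached[entry["stage_id"]] = entry
    return [cached[stage_id] for stage_id in sorted(cached)]


cache: dict = {}

# Window 1: only stage 0 has reported.
window_1 = merge_stage_summaries(cache, [{"stage_id": 0, "requests": 3, "avg_ms": 30.0}])

# Window 2: stages 1 and 2 report; stage 0 is carried over from the cache.
window_2 = merge_stage_summaries(
    cache,
    [
        {"stage_id": 1, "requests": 3, "avg_ms": 12.0},
        {"stage_id": 2, "requests": 3, "avg_ms": 45.0},
    ],
)
```

Replacing an entry is the simplest policy. If a stage can report partial results in more than one window, the entries would have to be combined (for example by summing counts) rather than overwritten.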
