feat(observability): add OpenTelemetry tracing for speculative decoding #19545
RichardoMrMu wants to merge 2 commits into sgl-project:main
Conversation
if tracing_enabled:
    draft_start_ts = convert_time_to_realtime_ns(time.perf_counter())
    for req in batch.reqs:
        req.time_stats.trace_ctx.trace_slice_start(
You can add a set_draft_start_time() method to SchedulerTimeStats, similar to the previous implementation, and then call set_time_batch(batch.reqs, "set_draft_start_time") here. This will make it easier to extend to metrics and logging in the future, if necessary.
The timestamp retrieval can also be placed inside the set_*_time functions. In fact, there is no need to obtain the timestamp manually at all; the trace_ctx interfaces capture the current timestamp automatically.
You can add a default bool parameter trace_only to set_time_batch, and if the call is only used for tracing, return early by checking tracing_enabled.
The same applies below.
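A minimal sketch of what such a batch helper could look like, assuming the set_*_time methods capture their own timestamp when called without ts; the actual set_time_batch in req_time_stats.py is not shown in this PR and may differ:

# Hypothetical sketch only; not the real implementation in req_time_stats.py.
from sglang.srt.observability.trace import get_global_tracing_enabled

def set_time_batch(reqs, method_name: str, trace_only: bool = False, **kwargs):
    # When the data is only consumed by tracing, skip all work if tracing is off.
    if trace_only and not get_global_tracing_enabled():
        return
    for req in reqs:
        # Each set_*_time method captures its own timestamp when ts is omitted.
        getattr(req.time_stats, method_name)(**kwargs)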
req.time_stats.trace_ctx.trace_slice_end(
    "spec_verify", 2, verify_end_ts
)
req.time_stats.trace_ctx.trace_event(
The event should be placed before the trace end; otherwise, the event might be lost or attached to the next span.
Could we consider attaching these attributes to the span instead of creating a separate event?
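For instance, assuming trace_slice_end accepts an optional attributes dict (as in the follow-up patch below), the per-request acceptance count could be recorded on the verify span directly:

# Attach accepted_tokens to the spec_verify span itself instead of emitting
# a separate spec_accept event; "accepted" here stands for the per-request
# value taken from the verify output, as in the surrounding worker code.
req.time_stats.trace_ctx.trace_slice_end(
    "spec_verify", 2, verify_end_ts, {"accepted_tokens": accepted}
)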
I made some changes to the code, but I don't have write access to your repository, so I'm posting the patch here. Please rebase onto the latest upstream main branch before applying the patch.

From 054f2b3ecf9e3c7be8c86c067c61c85774a3b3c9 Mon Sep 17 00:00:00 2001
From: Feng Su <sufeng@linux.alibaba.com>
Date: Mon, 23 Mar 2026 13:46:40 +0800
Subject: [PATCH] Update the code to minimize the creation of half-open spans
---
.../srt/observability/req_time_stats.py | 49 ++++++++++++-------
python/sglang/srt/speculative/eagle_worker.py | 23 +++------
python/sglang/srt/speculative/ngram_worker.py | 16 ++----
3 files changed, 42 insertions(+), 46 deletions(-)
diff --git a/python/sglang/srt/observability/req_time_stats.py b/python/sglang/srt/observability/req_time_stats.py
index 11ecaf2ad..7798169cd 100644
--- a/python/sglang/srt/observability/req_time_stats.py
+++ b/python/sglang/srt/observability/req_time_stats.py
@@ -184,6 +184,22 @@ class RequestStage:
metrics_is_observed=True,
)
+ # speculative decode
+ SPEC_DRAFT = RequestStageConfig(
+ "spec_draft",
+ level=2,
+ )
+
+ SPEC_VERIFY = RequestStageConfig(
+ "spec_verify",
+ level=2,
+ )
+
+ SPEC_DRAFT_EXTEND = RequestStageConfig(
+ "spec_draft_extend",
+ level=3,
+ )
+
# other
ANONYMOUS = RequestStageConfig("")
@@ -548,6 +564,11 @@ class SchedulerReqTimeStats(ReqTimeStatsBase):
last_forward_entry_time: float = 0.0
last_prefill_finished_time: float = 0.0
+ # speculative decoding
+ spec_draft_start_time: float = 0.0
+ spec_verify_start_time: float = 0.0
+ spec_draft_extend_start_time: float = 0.0
+
# other
transfer_speed_gb_s: float = 0.0
transfer_total_mb: float = 0.0
@@ -578,45 +599,39 @@ class SchedulerReqTimeStats(ReqTimeStatsBase):
def set_spec_draft_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
- self.trace_ctx.trace_slice_start(
- "spec_draft", 2, convert_time_to_realtime_ns(ts)
- )
+ self.spec_draft_start_time = ts
def set_spec_draft_end_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
- self.trace_ctx.trace_slice_end(
- "spec_draft", 2, convert_time_to_realtime_ns(ts)
- )
+
+ stage = RequestStage.SPEC_DRAFT
+ self.trace_slice(stage, self.spec_draft_start_time, ts)
def set_spec_verify_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
- self.trace_ctx.trace_slice_start(
- "spec_verify", 2, convert_time_to_realtime_ns(ts)
- )
+ self.spec_verify_start_time = ts
def set_spec_verify_end_time(self, ts=None, accepted_tokens: int = 0):
if ts is None:
ts = time.perf_counter()
realtime_ns = convert_time_to_realtime_ns(ts)
- self.trace_ctx.trace_slice_end(
- "spec_verify", 2, realtime_ns, {"accepted_tokens": accepted_tokens}
+ stage = RequestStage.SPEC_VERIFY
+ self.trace_slice(
+ stage, self.spec_verify_start_time, ts, {"accepted_tokens": accepted_tokens}
)
def set_spec_draft_extend_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
- self.trace_ctx.trace_slice_start(
- "spec_draft_extend", 3, convert_time_to_realtime_ns(ts)
- )
+ self.spec_draft_extend_start_time = ts
def set_spec_draft_extend_end_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
- self.trace_ctx.trace_slice_end(
- "spec_draft_extend", 3, convert_time_to_realtime_ns(ts)
- )
+ stage = RequestStage.SPEC_DRAFT_EXTEND
+ self.trace_slice(stage, self.spec_draft_extend_start_time, ts)
def set_retract_time(self, ts=None):
if ts is None:
diff --git a/python/sglang/srt/speculative/eagle_worker.py b/python/sglang/srt/speculative/eagle_worker.py
index 04f9b498f..333ed0038 100644
--- a/python/sglang/srt/speculative/eagle_worker.py
+++ b/python/sglang/srt/speculative/eagle_worker.py
@@ -4,9 +4,6 @@ from typing import List, Optional, Tuple
import torch
-from sglang.srt.observability.req_time_stats import set_time_batch
-from sglang.srt.observability.trace import get_global_tracing_enabled
-
from sglang.srt.distributed import get_tp_group
from sglang.srt.hardware_backend.npu.graph_runner.eagle_draft_npu_graph_runner import (
EAGLEDraftNpuGraphRunner,
@@ -32,6 +29,8 @@ from sglang.srt.model_executor.forward_batch_info import (
ForwardBatch,
ForwardMode,
)
+from sglang.srt.observability.req_time_stats import set_time_batch
+from sglang.srt.observability.trace import get_global_tracing_enabled
from sglang.srt.server_args import ServerArgs
from sglang.srt.speculative.draft_utils import DraftBackendFactory
from sglang.srt.speculative.eagle_draft_cuda_graph_runner import (
@@ -315,34 +314,24 @@ class EAGLEWorker(TpModelWorker):
can_run_cuda_graph=can_run_cuda_graph,
)
else:
- set_time_batch(
- batch.reqs, "set_spec_draft_start_time", trace_only=True
- )
+ set_time_batch(batch.reqs, "set_spec_draft_start_time", trace_only=True)
with self.draft_tp_context(
self.draft_model_runner.tp_group
), speculative_moe_backend_context(), speculative_moe_a2a_backend_context():
spec_info = self.draft(batch)
- set_time_batch(
- batch.reqs, "set_spec_draft_end_time", trace_only=True
- )
-
- set_time_batch(
- batch.reqs, "set_spec_verify_start_time", trace_only=True
- )
+ set_time_batch(batch.reqs, "set_spec_draft_end_time", trace_only=True)
+ set_time_batch(batch.reqs, "set_spec_verify_start_time", trace_only=True)
logits_output, verify_output, model_worker_batch, can_run_cuda_graph = (
self.verify(batch, spec_info)
)
if get_global_tracing_enabled():
- ts = time.perf_counter()
for idx, req in enumerate(batch.reqs):
accepted = verify_output.accept_length_per_req_cpu[idx]
- req.time_stats.set_spec_verify_end_time(
- ts, accepted_tokens=accepted
- )
+ req.time_stats.set_spec_verify_end_time(accepted_tokens=accepted)
set_time_batch(
batch.reqs, "set_spec_draft_extend_start_time", trace_only=True
diff --git a/python/sglang/srt/speculative/ngram_worker.py b/python/sglang/srt/speculative/ngram_worker.py
index d1c6d7b1a..d27d800b3 100644
--- a/python/sglang/srt/speculative/ngram_worker.py
+++ b/python/sglang/srt/speculative/ngram_worker.py
@@ -1,20 +1,17 @@
import logging
-import time
from typing import List, Optional
import numpy as np
import torch
from sgl_kernel.speculative import reconstruct_indices_from_tree_mask
-from sglang.srt.observability.req_time_stats import set_time_batch
-from sglang.srt.observability.trace import get_global_tracing_enabled
-
-
from sglang.srt.layers.utils.logprob import add_output_logprobs_for_spec_v1
from sglang.srt.managers.schedule_batch import ScheduleBatch
from sglang.srt.managers.scheduler import GenerationBatchResult
from sglang.srt.managers.tp_worker import TpModelWorker
from sglang.srt.model_executor.forward_batch_info import ForwardMode
+from sglang.srt.observability.req_time_stats import set_time_batch
+from sglang.srt.observability.trace import get_global_tracing_enabled
from sglang.srt.server_args import ServerArgs
from sglang.srt.speculative.cpp_ngram.ngram_corpus import NgramCorpus
from sglang.srt.speculative.ngram_info import NgramVerifyInput
@@ -239,9 +236,7 @@ class NGRAMWorker:
spec_info.retrive_next_token.shape
).cpu()
- set_time_batch(
- batch.reqs, "set_spec_verify_start_time", trace_only=True
- )
+ set_time_batch(batch.reqs, "set_spec_verify_start_time", trace_only=True)
batch_result = self.target_worker.forward_batch_generation(
model_worker_batch, is_verify=True
@@ -277,16 +272,13 @@ class NGRAMWorker:
)
if get_global_tracing_enabled():
- ts = time.perf_counter()
for idx, req in enumerate(batch.reqs):
accepted = (
verify_input.accept_length[idx].item()
if verify_input.accept_length is not None
else 0
)
- req.time_stats.set_spec_verify_end_time(
- ts, accepted_tokens=accepted
- )
+ req.time_stats.set_spec_verify_end_time(accepted_tokens=accepted)
# Store accept_lens for per-request metrics
accept_lens = verify_input.accept_length
--
2.43.0
Add trace spans for the speculative decoding pipeline in both EAGLE and NGRAM workers, covering the draft, verify, and accept phases.

EAGLE worker (eagle_worker.py):
- spec_draft (level 2): wraps the draft() call
- spec_verify (level 2): wraps the verify() call
- spec_accept (level 2 event): records accepted_tokens per request
- spec_draft_extend (level 3): wraps forward_draft_extend_after_decode()

NGRAM worker (ngram_worker.py):
- spec_draft (level 2): wraps _prepare_for_speculative_decoding()
- spec_verify (level 2): wraps the target verify forward + verify()
- spec_accept (level 2 event): records accepted_tokens per request

All tracing is gated behind get_global_tracing_enabled() to avoid any overhead when tracing is disabled. Uses the existing TraceReqContext / TraceNullContext pattern from req_time_stats.py.

Part of the tracing roadmap: sgl-project#13511
Force-pushed from 0213d17 to d528560.
def set_spec_verify_end_time(self, ts=None, accepted_tokens: int = 0):
    if ts is None:
        ts = time.perf_counter()
    realtime_ns = convert_time_to_realtime_ns(ts)
Sorry, this leftover line needs to be removed; after the patch above, realtime_ns is no longer used, so it may fail the lint check.
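For reference, after dropping that line the function would look roughly like this (a sketch following the posted patch; the actual code may differ slightly):

# Sketch based on the patch above: SPEC_VERIFY and trace_slice come from
# req_time_stats.py as modified by that patch.
def set_spec_verify_end_time(self, ts=None, accepted_tokens: int = 0):
    if ts is None:
        ts = time.perf_counter()
    stage = RequestStage.SPEC_VERIFY
    self.trace_slice(
        stage, self.spec_verify_start_time, ts, {"accepted_tokens": accepted_tokens}
    )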
Force-pushed from d528560 to 4ba35a2.
/tag-and-rerun-ci
Motivation
This PR adds OpenTelemetry tracing support for the speculative decoding pipeline, as part of the tracing roadmap (#13511).
Currently, the speculative decoding phases (draft, verify, accept) are not instrumented, making it difficult to diagnose performance bottlenecks in the speculative decoding pipeline. This PR fills that gap.
Changes
EAGLE Worker (
eagle_worker.py)spec_draft(level 2): trace span wrapping thedraft()callspec_verify(level 2): trace span wrapping theverify()callspec_accept(level 2 event): recordsaccepted_tokensper request after verificationspec_draft_extend(level 3): trace span wrappingforward_draft_extend_after_decode()NGRAM Worker (
ngram_worker.py)spec_draft(level 2): trace span wrapping_prepare_for_speculative_decoding()spec_verify(level 2): trace span wrapping the target verify forward +verify()spec_accept(level 2 event): recordsaccepted_tokensper request after verificationDesign Decisions
get_global_tracing_enabled()to avoid any overhead when tracing is disabledTraceReqContext/TraceNullContextpattern fromreq_time_stats.pybatch.reqsto emit spans for each request individuallyconvert_time_to_realtime_ns(time.perf_counter())consistent with existing tracing code
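As an illustration of the gating and per-request pattern described above, the draft-phase instrumentation looks roughly like this (a sketch based on the trace_slice_start/trace_slice_end calls quoted in the review; the exact code lives in eagle_worker.py):

# Sketch only; mirrors the calls shown in the review comments, not the
# exact worker code.
if get_global_tracing_enabled():
    draft_start_ts = convert_time_to_realtime_ns(time.perf_counter())
    for req in batch.reqs:
        req.time_stats.trace_ctx.trace_slice_start("spec_draft", 2, draft_start_ts)

spec_info = self.draft(batch)  # the actual draft phase

if get_global_tracing_enabled():
    draft_end_ts = convert_time_to_realtime_ns(time.perf_counter())
    for req in batch.reqs:
        req.time_stats.trace_ctx.trace_slice_end("spec_draft", 2, draft_end_ts)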