Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions components/src/dynamo/common/utils/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from functools import lru_cache
from typing import TYPE_CHECKING, Optional, Pattern

from prometheus_client import generate_latest

from dynamo._core import Endpoint

# Import CollectorRegistry only for type hints to avoid importing prometheus_client at module load time.
Expand Down Expand Up @@ -119,6 +117,11 @@ def get_prometheus_expfmt(
Collects all metrics from the registry and returns them in Prometheus text exposition format.
Optionally filters metrics by prefix, excludes certain prefixes, and adds a prefix.

IMPORTANT: prometheus_client is imported lazily here because it must be imported AFTER
set_prometheus_multiproc_dir() is called by SGLang's engine initialization. Importing
at module level causes prometheus_client to initialize in single-process mode before
PROMETHEUS_MULTIPROC_DIR is set, which breaks TokenizerMetricsCollector metrics.

Args:
registry: Prometheus registry to collect from.
Pass CollectorRegistry with MultiProcessCollector for SGLang.
Expand All @@ -138,6 +141,8 @@ def get_prometheus_expfmt(
# Filter out python_/process_ metrics and add trtllm_ prefix
get_prometheus_expfmt(registry, exclude_prefixes=["python_", "process_"], add_prefix="trtllm_")
"""
from prometheus_client import generate_latest

try:
# Generate metrics in Prometheus text format
metrics_text = generate_latest(registry).decode("utf-8")
Expand Down
15 changes: 12 additions & 3 deletions components/src/dynamo/sglang/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import asyncio
import json
import logging
from typing import List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple

import sglang as sgl
import zmq
import zmq.asyncio
from prometheus_client import CollectorRegistry, multiprocess
from sglang.srt.utils import get_local_ip_auto, get_zmq_socket, maybe_wrap_ipv6_address

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry

from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
ForwardPassMetrics,
Expand Down Expand Up @@ -224,21 +226,28 @@ def _record_values(

def setup_prometheus_registry(
engine: sgl.Engine, generate_endpoint: Endpoint
) -> CollectorRegistry:
) -> "CollectorRegistry":
"""Set up Prometheus registry for SGLang metrics collection.

SGLang uses multiprocess architecture where metrics are stored in shared memory.
MultiProcessCollector aggregates metrics from all worker processes. The Prometheus
registry collects sglang:* metrics which are exposed via the metrics server endpoint
(set DYN_SYSTEM_PORT to a positive value to enable, e.g., DYN_SYSTEM_PORT=8081).

IMPORTANT: prometheus_client must be imported AFTER sgl.Engine() has called
set_prometheus_multiproc_dir(). Importing at module level causes prometheus_client
to initialize in single-process mode before PROMETHEUS_MULTIPROC_DIR is set,
which breaks TokenizerMetricsCollector metrics (TTFT, ITL, e2e latency, etc.).

Args:
engine: The SGLang engine instance.
generate_endpoint: The Dynamo endpoint for generation requests.

Returns:
Configured CollectorRegistry with multiprocess support.
"""
from prometheus_client import CollectorRegistry, multiprocess

registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
register_engine_metrics_callback(
Expand Down
35 changes: 7 additions & 28 deletions components/src/dynamo/sglang/request_handlers/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import base64
import json
import logging
import random
import socket
Expand All @@ -12,7 +10,6 @@
from typing import Any, AsyncGenerator, Dict, Optional, Tuple

import sglang as sgl
from sglang.srt.tracing import trace as sglang_trace
from sglang.srt.utils import get_local_ip_auto

from dynamo._core import Component, Context
Expand Down Expand Up @@ -143,38 +140,20 @@ def _get_bootstrap_info(engine: sgl.Engine) -> Tuple[str, int]:

return bootstrap_host, bootstrap_port

def _propagate_trace_context_to_sglang(
self, context: Context, bootstrap_room: int = 0
):
"""Propagate Dynamo's trace context to SGLang for distributed tracing. SGLang expects a certain
format derived by looking at https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/tracing/trace.py
in the to_dict() method.
def _get_trace_header(self, context: Context) -> Optional[Dict[str, str]]:
"""Get trace header dict for passing to SGLang's external_trace_header parameter.

Args:
context: Dynamo Context object containing trace information.
bootstrap_room: Bootstrap room ID (0 for aggregated, actual room for disaggregated).

Returns:
Dict with traceparent header if trace context available, None otherwise.
"""
trace_id = context.trace_id
span_id = context.span_id
if not trace_id or not span_id:
return

# Build trace context for SGLang
trace_context = {
str(bootstrap_room): {
"root_span": {"traceparent": f"00-{trace_id}-{span_id}-01"},
"prev_span": {
"span_id": int(span_id, 16),
"trace_id": int(trace_id, 16),
},
}
}

# Encode and propagate
base64_context = base64.b64encode(
json.dumps(trace_context, ensure_ascii=False).encode("utf-8")
).decode("utf-8")
sglang_trace.trace_set_remote_propagate_context(base64_context)
return None
return {"traceparent": f"00-{trace_id}-{span_id}-01"}

async def _handle_cancellation(
self, request_id_future: asyncio.Future, context: Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ async def generate(
f"room={bootstrap_info['bootstrap_room']}"
)

if self.enable_trace:
self._propagate_trace_context_to_sglang(
context, bootstrap_info["bootstrap_room"]
)
trace_header = self._get_trace_header(context) if self.enable_trace else None

decode = await self.engine.async_generate(
**input_param,
Expand All @@ -131,6 +128,7 @@ async def generate(
bootstrap_host=bootstrap_info["bootstrap_host"],
bootstrap_port=bootstrap_info["bootstrap_port"],
bootstrap_room=bootstrap_info["bootstrap_room"],
external_trace_header=trace_header,
rid=trace_id,
)

Expand All @@ -141,13 +139,13 @@ async def generate(
async for out in self._process_text_stream(decode, context):
yield out
else:
if self.enable_trace:
self._propagate_trace_context_to_sglang(context)
trace_header = self._get_trace_header(context) if self.enable_trace else None

agg = await self.engine.async_generate(
**input_param,
sampling_params=sampling_params,
stream=True,
external_trace_header=trace_header,
rid=trace_id,
)
if self.skip_tokenizer_init:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ async def generate(

input_param = self._get_input_param(inner_request)

# Propagate trace context to SGLang
if self.enable_trace:
self._propagate_trace_context_to_sglang(context, bootstrap_room)
trace_header = self._get_trace_header(context) if self.enable_trace else None

results = await self.engine.async_generate(
**input_param,
Expand All @@ -124,6 +122,7 @@ async def generate(
bootstrap_host=self.bootstrap_host,
bootstrap_port=self.bootstrap_port,
bootstrap_room=bootstrap_room,
external_trace_header=trace_header,
rid=trace_id,
)

Expand Down
131 changes: 130 additions & 1 deletion lib/runtime/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
let version = format!("{:?}", req.version());
let trace_parent = TraceParent::from_headers(req.headers());

let otel_context = extract_otel_context_from_http_headers(req.headers());

let span = tracing::info_span!(
"http-request",
method = %method,
Expand All @@ -286,12 +288,51 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
);

if let Some(context) = otel_context {
let _ = span.set_parent(context);
}

span
}

/// Build an OpenTelemetry context from the trace headers of an HTTP request.
///
/// Returns `None` when the `traceparent` header is missing, cannot be read as a
/// string, is empty, or when the span context produced by the propagator is not
/// valid. Otherwise the extracted context is returned so the caller can attach
/// it as the parent of a span.
fn extract_otel_context_from_http_headers(
    headers: &http::HeaderMap,
) -> Option<opentelemetry::Context> {
    /// Header names reported by the extractor's `keys()` when present.
    const TRACE_HEADER_NAMES: [&str; 2] = ["traceparent", "tracestate"];

    /// Read-only adapter exposing an HTTP header map through `Extractor`.
    struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);

    impl<'a> Extractor for HttpHeaderExtractor<'a> {
        fn get(&self, key: &str) -> Option<&str> {
            let value = self.0.get(key)?;
            value.to_str().ok()
        }

        fn keys(&self) -> Vec<&str> {
            TRACE_HEADER_NAMES
                .iter()
                .copied()
                .filter(|name| self.0.contains_key(*name))
                .collect()
        }
    }

    // Bail out before running the propagator when there is nothing usable to parse.
    let traceparent = headers.get("traceparent")?.to_str().ok()?;
    if traceparent.is_empty() {
        return None;
    }

    // NOTE(review): `extract` may start from the ambient current context — confirm
    // that a malformed header cannot surface an unrelated, valid span here.
    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
    let extracted = propagator.extract(&HttpHeaderExtractor(headers));

    // Only hand back a context that actually carries a usable span context.
    let has_valid_span = extracted.span().span_context().is_valid();
    has_valid_span.then_some(extracted)
}

/// Create a handle_payload span from NATS headers with component context
pub fn make_handle_payload_span(
headers: &async_nats::HeaderMap,
Expand Down Expand Up @@ -335,6 +376,94 @@ pub fn make_handle_payload_span(
}
}

/// Create a `handle_payload` span from TCP (string map) headers plus component context.
///
/// The span always records the `x-request-id`, `x-dynamo-request-id` and
/// `tracestate` header values (when present) together with the component,
/// endpoint, namespace and instance id. When both a trace id and a parent span
/// id were obtained from the headers, they are recorded as `trace_id` and
/// `parent_id`, and the extracted OpenTelemetry context (if any) is set as the
/// span's parent.
pub fn make_handle_payload_span_from_tcp_headers(
    headers: &std::collections::HashMap<String, String>,
    component: &str,
    endpoint: &str,
    namespace: &str,
    instance_id: u64,
) -> Span {
    // Owned copy of an optional header value, for recording on the span.
    let header_value = |name: &str| headers.get(name).cloned();

    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
    let x_request_id = header_value("x-request-id");
    let x_dynamo_request_id = header_value("x-dynamo-request-id");
    let tracestate = header_value("tracestate");

    match (trace_id.as_deref(), parent_span_id.as_deref()) {
        // Both ids are known: record them and link to the remote parent when possible.
        (Some(trace_id), Some(parent_id)) => {
            let span = tracing::info_span!(
                "handle_payload",
                trace_id = trace_id,
                parent_id = parent_id,
                x_request_id = x_request_id,
                x_dynamo_request_id = x_dynamo_request_id,
                tracestate = tracestate,
                component = component,
                endpoint = endpoint,
                namespace = namespace,
                instance_id = instance_id,
            );

            if let Some(context) = otel_context {
                // The result of linking is deliberately ignored.
                let _ = span.set_parent(context);
            }

            span
        }
        // No usable trace identifiers: create the span without trace fields or parent.
        _ => tracing::info_span!(
            "handle_payload",
            x_request_id = x_request_id,
            x_dynamo_request_id = x_dynamo_request_id,
            tracestate = tracestate,
            component = component,
            endpoint = endpoint,
            namespace = namespace,
            instance_id = instance_id,
        ),
    }
}

/// Extract OpenTelemetry trace information from TCP (string map) headers.
///
/// Returns a tuple of:
/// 1. the extracted OpenTelemetry context, present only when its span context is valid;
/// 2. the trace id produced by `parse_traceparent`;
/// 3. the parent span id produced by `parse_traceparent`.
///
/// All three are `None` when the `traceparent` header is absent.
fn extract_otel_context_from_tcp_headers(
    headers: &std::collections::HashMap<String, String>,
) -> (
    Option<opentelemetry::Context>,
    Option<String>,
    Option<String>,
) {
    /// Header names reported by the extractor's `keys()` when present.
    const TRACE_HEADER_NAMES: [&str; 2] = ["traceparent", "tracestate"];

    /// Read-only adapter exposing a string header map through `Extractor`.
    struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);

    impl<'a> Extractor for TcpHeaderExtractor<'a> {
        fn get(&self, key: &str) -> Option<&str> {
            self.0.get(key).map(String::as_str)
        }

        fn keys(&self) -> Vec<&str> {
            TRACE_HEADER_NAMES
                .iter()
                .copied()
                .filter(|name| self.0.contains_key(*name))
                .collect()
        }
    }

    // Without a traceparent header there is nothing to parse or propagate.
    let Some(traceparent) = headers.get("traceparent") else {
        return (None, None, None);
    };

    let (trace_id, parent_span_id) = parse_traceparent(traceparent.as_str());

    // NOTE(review): `extract` may start from the ambient current context — confirm
    // that a malformed header cannot surface an unrelated, valid span here.
    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
    let extracted = propagator.extract(&TcpHeaderExtractor(headers));

    // Keep the context only when it carries a usable span context.
    let has_valid_span = extracted.span().span_context().is_valid();
    let context_with_trace = has_valid_span.then_some(extracted);

    (context_with_trace, trace_id, parent_span_id)
}

/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
pub fn extract_otel_context_from_nats_headers(
headers: &async_nats::HeaderMap,
Expand Down
Loading
Loading