From 0b297ae07ac8e2686f8feb4fc58de31678c78535 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 31 Dec 2025 22:15:43 +0000 Subject: [PATCH 1/5] fix(sglang): expose TokenizerMetricsCollector metrics via Prometheus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move prometheus_client imports from module level to inside functions to ensure they occur AFTER SGLang's set_prometheus_multiproc_dir() is called. Problem: - prometheus_client was imported at module level in publisher.py and prometheus.py - This happened before sgl.Engine() called set_prometheus_multiproc_dir() - prometheus_client initialized in single-process mode, ignoring PROMETHEUS_MULTIPROC_DIR - TokenizerMetricsCollector metrics were stored in memory only, not mmap'd files - MultiProcessCollector couldn't find them when scraping /metrics Solution: - Move imports inside functions that are called after engine initialization - publisher.py: import in setup_prometheus_registry() - prometheus.py: import in get_prometheus_expfmt() Affected metrics now correctly exposed: - sglang:prompt_tokens_total - sglang:generation_tokens_total - sglang:time_to_first_token_seconds - sglang:e2e_request_latency_seconds - sglang:inter_token_latency_seconds - sglang:num_requests_total - sglang:cached_tokens_total - sglang:num_retractions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- components/src/dynamo/common/utils/prometheus.py | 9 +++++++-- components/src/dynamo/sglang/publisher.py | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/components/src/dynamo/common/utils/prometheus.py b/components/src/dynamo/common/utils/prometheus.py index 1f02be4f47f..267dbefc9ef 100644 --- a/components/src/dynamo/common/utils/prometheus.py +++ b/components/src/dynamo/common/utils/prometheus.py @@ -16,8 +16,6 @@ from functools import lru_cache from typing import TYPE_CHECKING, Optional, Pattern -from prometheus_client import generate_latest - from dynamo._core import Endpoint # Import CollectorRegistry only for type hints to avoid importing prometheus_client at module load time. @@ -119,6 +117,11 @@ def get_prometheus_expfmt( Collects all metrics from the registry and returns them in Prometheus text exposition format. Optionally filters metrics by prefix, excludes certain prefixes, and adds a prefix. + IMPORTANT: prometheus_client is imported lazily here because it must be imported AFTER + set_prometheus_multiproc_dir() is called by SGLang's engine initialization. Importing + at module level causes prometheus_client to initialize in single-process mode before + PROMETHEUS_MULTIPROC_DIR is set, which breaks TokenizerMetricsCollector metrics. + Args: registry: Prometheus registry to collect from. Pass CollectorRegistry with MultiProcessCollector for SGLang. @@ -138,6 +141,8 @@ def get_prometheus_expfmt( # Filter out python_/process_ metrics and add trtllm_ prefix get_prometheus_expfmt(registry, exclude_prefixes=["python_", "process_"], add_prefix="trtllm_") """ + from prometheus_client import generate_latest + try: # Generate metrics in Prometheus text format metrics_text = generate_latest(registry).decode("utf-8") diff --git a/components/src/dynamo/sglang/publisher.py b/components/src/dynamo/sglang/publisher.py index b07a69ad1d9..383a397d7a0 100644 --- a/components/src/dynamo/sglang/publisher.py +++ b/components/src/dynamo/sglang/publisher.py @@ -4,14 +4,16 @@ import asyncio import json import logging -from typing import List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple import sglang as sgl import zmq import zmq.asyncio -from prometheus_client import CollectorRegistry, multiprocess from sglang.srt.utils import get_local_ip_auto, get_zmq_socket, maybe_wrap_ipv6_address +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + from dynamo.common.utils.prometheus import register_engine_metrics_callback from dynamo.llm import ( ForwardPassMetrics, @@ -224,7 +226,7 @@ def _record_values( def setup_prometheus_registry( engine: sgl.Engine, generate_endpoint: Endpoint -) -> CollectorRegistry: +) -> "CollectorRegistry": """Set up Prometheus registry for SGLang metrics collection. SGLang uses multiprocess architecture where metrics are stored in shared memory. @@ -232,6 +234,11 @@ def setup_prometheus_registry( registry collects sglang:* metrics which are exposed via the metrics server endpoint (set DYN_SYSTEM_PORT to a positive value to enable, e.g., DYN_SYSTEM_PORT=8081). + IMPORTANT: prometheus_client must be imported AFTER sgl.Engine() has called + set_prometheus_multiproc_dir(). Importing at module level causes prometheus_client + to initialize in single-process mode before PROMETHEUS_MULTIPROC_DIR is set, + which breaks TokenizerMetricsCollector metrics (TTFT, ITL, e2e latency, etc.). + Args: engine: The SGLang engine instance. generate_endpoint: The Dynamo endpoint for generation requests. @@ -239,6 +246,8 @@ def setup_prometheus_registry( Returns: Configured CollectorRegistry with multiprocess support. """ + from prometheus_client import CollectorRegistry, multiprocess + registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) register_engine_metrics_callback( From d484c4451a6dcdf7bf7f54d25389202defea1817 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 31 Dec 2025 23:16:02 +0000 Subject: [PATCH 2/5] fix: distributed tracing propagation for TCP transport and SGLang integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR fixes distributed tracing by: 1. SGLang Backend: Update to use new `external_trace_header` API from SGLang PR #15814 - Replace `_propagate_trace_context_to_sglang()` with `_get_trace_header()` - Pass trace headers directly to `async_generate()` instead of using global state 2. HTTP Frontend: Fix parent context linking for incoming requests - Add `extract_otel_context_from_http_headers()` to extract OTEL context - Update `make_request_span()` to call `span.set_parent(context)` 3. TCP Transport: Fix trace header propagation (root cause of broken tracing) - Add `headers` field to `TcpRequestMessage` wire protocol - Include trace headers (traceparent, tracestate, etc.) in TCP messages - Add `make_handle_payload_span_from_tcp_headers()` for worker-side span creation The TCP transport was silently dropping all trace headers - only using `x-endpoint-path` for routing. This caused frontend and worker spans to be disconnected. NATS transport worked correctly due to native header support. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../sglang/request_handlers/handler_base.py | 35 +---- .../request_handlers/llm/decode_handler.py | 10 +- .../request_handlers/llm/prefill_handler.py | 5 +- lib/runtime/src/logging.rs | 131 +++++++++++++++++- lib/runtime/src/pipeline/network/codec.rs | 128 ++++++++++++++++- .../src/pipeline/network/egress/tcp_client.rs | 3 +- .../network/ingress/shared_tcp_endpoint.rs | 32 +++-- 7 files changed, 290 insertions(+), 54 deletions(-) diff --git a/components/src/dynamo/sglang/request_handlers/handler_base.py b/components/src/dynamo/sglang/request_handlers/handler_base.py index a357b173894..e539d39928a 100644 --- a/components/src/dynamo/sglang/request_handlers/handler_base.py +++ b/components/src/dynamo/sglang/request_handlers/handler_base.py @@ -2,8 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -import base64 -import json import logging import random import socket @@ -12,7 +10,6 @@ from typing import Any, AsyncGenerator, Dict, Optional, Tuple import sglang as sgl -from sglang.srt.tracing import trace as sglang_trace from sglang.srt.utils import get_local_ip_auto from dynamo._core import Component, Context @@ -143,38 +140,20 @@ def _get_bootstrap_info(engine: sgl.Engine) -> Tuple[str, int]: return bootstrap_host, bootstrap_port - def _propagate_trace_context_to_sglang( - self, context: Context, bootstrap_room: int = 0 - ): - """Propagate Dynamo's trace context to SGLang for distributed tracing. SGLang expects a certain - format derived by loooking at https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/tracing/trace.py - in the to_dict() method. + def _get_trace_header(self, context: Context) -> Optional[Dict[str, str]]: + """Get trace header dict for passing to SGLang's external_trace_header parameter. Args: context: Dynamo Context object containing trace information. - bootstrap_room: Bootstrap room ID (0 for aggregated, actual room for disaggregated). + + Returns: + Dict with traceparent header if trace context available, None otherwise. """ trace_id = context.trace_id span_id = context.span_id if not trace_id or not span_id: - return - - # Build trace context for SGLang - trace_context = { - str(bootstrap_room): { - "root_span": {"traceparent": f"00-{trace_id}-{span_id}-01"}, - "prev_span": { - "span_id": int(span_id, 16), - "trace_id": int(trace_id, 16), - }, - } - } - - # Encode and propagate - base64_context = base64.b64encode( - json.dumps(trace_context, ensure_ascii=False).encode("utf-8") - ).decode("utf-8") - sglang_trace.trace_set_remote_propagate_context(base64_context) + return None + return {"traceparent": f"00-{trace_id}-{span_id}-01"} async def _handle_cancellation( self, request_id_future: asyncio.Future, context: Context diff --git a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py index f10703338b8..ad98c9bfa65 100644 --- a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py @@ -119,10 +119,7 @@ async def generate( f"room={bootstrap_info['bootstrap_room']}" ) - if self.enable_trace: - self._propagate_trace_context_to_sglang( - context, bootstrap_info["bootstrap_room"] - ) + trace_header = self._get_trace_header(context) if self.enable_trace else None decode = await self.engine.async_generate( **input_param, @@ -131,6 +128,7 @@ async def generate( bootstrap_host=bootstrap_info["bootstrap_host"], bootstrap_port=bootstrap_info["bootstrap_port"], bootstrap_room=bootstrap_info["bootstrap_room"], + external_trace_header=trace_header, rid=trace_id, ) @@ -141,13 +139,13 @@ async def generate( async for out in self._process_text_stream(decode, context): yield out else: - if self.enable_trace: - self._propagate_trace_context_to_sglang(context) + trace_header = self._get_trace_header(context) if self.enable_trace else None agg = await self.engine.async_generate( **input_param, sampling_params=sampling_params, stream=True, + external_trace_header=trace_header, rid=trace_id, ) if self.skip_tokenizer_init: diff --git a/components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py b/components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py index 3979f645317..5ded2ca5cd8 100644 --- a/components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py +++ b/components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py @@ -113,9 +113,7 @@ async def generate( input_param = self._get_input_param(inner_request) - # Propagate trace context to SGLang - if self.enable_trace: - self._propagate_trace_context_to_sglang(context, bootstrap_room) + trace_header = self._get_trace_header(context) if self.enable_trace else None results = await self.engine.async_generate( **input_param, @@ -124,6 +122,7 @@ async def generate( bootstrap_host=self.bootstrap_host, bootstrap_port=self.bootstrap_port, bootstrap_room=bootstrap_room, + external_trace_header=trace_header, rid=trace_id, ) diff --git a/lib/runtime/src/logging.rs b/lib/runtime/src/logging.rs index 8127b4d6354..7408b38e4a4 100644 --- a/lib/runtime/src/logging.rs +++ b/lib/runtime/src/logging.rs @@ -278,6 +278,8 @@ pub fn make_request_span(req: &Request) -> Span { let version = format!("{:?}", req.version()); let trace_parent = TraceParent::from_headers(req.headers()); + let otel_context = extract_otel_context_from_http_headers(req.headers()); + let span = tracing::info_span!( "http-request", method = %method, @@ -286,12 +288,51 @@ pub fn make_request_span(req: &Request) -> Span { trace_id = trace_parent.trace_id, parent_id = trace_parent.parent_id, x_request_id = trace_parent.x_request_id, - x_dynamo_request_id = trace_parent.x_dynamo_request_id, + x_dynamo_request_id = trace_parent.x_dynamo_request_id, ); + if let Some(context) = otel_context { + let _ = span.set_parent(context); + } + span } +/// Extract OpenTelemetry context from HTTP headers for distributed tracing +fn extract_otel_context_from_http_headers(headers: &http::HeaderMap) -> Option { + let traceparent_value = headers.get("traceparent")?.to_str().ok()?; + + struct HttpHeaderExtractor<'a>(&'a http::HeaderMap); + + impl<'a> Extractor for HttpHeaderExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|v| v.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + vec!["traceparent", "tracestate"] + .into_iter() + .filter(|&key| self.0.get(key).is_some()) + .collect() + } + } + + // Early return if traceparent is empty + if traceparent_value.is_empty() { + return None; + } + + let extractor = HttpHeaderExtractor(headers); + let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new(); + let otel_context = propagator.extract(&extractor); + + if otel_context.span().span_context().is_valid() { + Some(otel_context) + } else { + None + } +} + /// Create a handle_payload span from NATS headers with component context pub fn make_handle_payload_span( headers: &async_nats::HeaderMap, @@ -335,6 +376,94 @@ pub fn make_handle_payload_span( } } +/// Create a handle_payload span from TCP/HashMap headers with component context +pub fn make_handle_payload_span_from_tcp_headers( + headers: &std::collections::HashMap, + component: &str, + endpoint: &str, + namespace: &str, + instance_id: u64, +) -> Span { + let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers); + let x_request_id = headers.get("x-request-id").cloned(); + let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned(); + let tracestate = headers.get("tracestate").cloned(); + + if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) { + let span = tracing::info_span!( + "handle_payload", + trace_id = trace_id.as_str(), + parent_id = parent_id.as_str(), + x_request_id = x_request_id, + x_dynamo_request_id = x_dynamo_request_id, + tracestate = tracestate, + component = component, + endpoint = endpoint, + namespace = namespace, + instance_id = instance_id, + ); + + if let Some(context) = otel_context { + let _ = span.set_parent(context); + } + span + } else { + tracing::info_span!( + "handle_payload", + x_request_id = x_request_id, + x_dynamo_request_id = x_dynamo_request_id, + tracestate = tracestate, + component = component, + endpoint = endpoint, + namespace = namespace, + instance_id = instance_id, + ) + } +} + +/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing +fn extract_otel_context_from_tcp_headers( + headers: &std::collections::HashMap, +) -> ( + Option, + Option, + Option, +) { + let traceparent_value = match headers.get("traceparent") { + Some(value) => value.as_str(), + None => return (None, None, None), + }; + + let (trace_id, parent_span_id) = parse_traceparent(traceparent_value); + + struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap); + + impl<'a> Extractor for TcpHeaderExtractor<'a> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + vec!["traceparent", "tracestate"] + .into_iter() + .filter(|&key| self.0.get(key).is_some()) + .collect() + } + } + + let extractor = TcpHeaderExtractor(headers); + let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new(); + let otel_context = propagator.extract(&extractor); + + let context_with_trace = if otel_context.span().span_context().is_valid() { + Some(otel_context) + } else { + None + }; + + (context_with_trace, trace_id, parent_span_id) +} + /// Extract OpenTelemetry trace context from NATS headers for distributed tracing pub fn extract_otel_context_from_nats_headers( headers: &async_nats::HeaderMap, diff --git a/lib/runtime/src/pipeline/network/codec.rs b/lib/runtime/src/pipeline/network/codec.rs index bded78f41a7..fb179e25b65 100644 --- a/lib/runtime/src/pipeline/network/codec.rs +++ b/lib/runtime/src/pipeline/network/codec.rs @@ -18,16 +18,19 @@ mod two_part; pub use two_part::{TwoPartCodec, TwoPartMessage, TwoPartMessageType}; -/// TCP request plane protocol message with endpoint routing +/// TCP request plane protocol message with endpoint routing and trace headers /// /// Wire format: /// - endpoint_path_len: u16 (big-endian) /// - endpoint_path: UTF-8 string +/// - headers_len: u16 (big-endian) +/// - headers: JSON-encoded HashMap /// - payload_len: u32 (big-endian) /// - payload: bytes #[derive(Debug, Clone, PartialEq, Eq)] pub struct TcpRequestMessage { pub endpoint_path: String, + pub headers: std::collections::HashMap, pub payload: Bytes, } @@ -35,6 +38,19 @@ impl TcpRequestMessage { pub fn new(endpoint_path: String, payload: Bytes) -> Self { Self { endpoint_path, + headers: std::collections::HashMap::new(), + payload, + } + } + + pub fn with_headers( + endpoint_path: String, + headers: std::collections::HashMap, + payload: Bytes, + ) -> Self { + Self { + endpoint_path, + headers, payload, } } @@ -51,6 +67,22 @@ impl TcpRequestMessage { )); } + // Encode headers as JSON + let headers_json = serde_json::to_vec(&self.headers).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Failed to encode headers: {}", e), + ) + })?; + let headers_len = headers_json.len(); + + if headers_len > u16::MAX as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Headers too large: {} bytes", headers_len), + )); + } + if self.payload.len() > u32::MAX as usize { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -59,7 +91,8 @@ impl TcpRequestMessage { } // Use BytesMut for efficient buffer building - let mut buf = BytesMut::with_capacity(2 + endpoint_len + 4 + self.payload.len()); + let mut buf = + BytesMut::with_capacity(2 + endpoint_len + 2 + headers_len + 4 + self.payload.len()); // Write endpoint path length (2 bytes) buf.put_u16(endpoint_len as u16); @@ -67,6 +100,12 @@ impl TcpRequestMessage { // Write endpoint path buf.put_slice(endpoint_bytes); + // Write headers length (2 bytes) + buf.put_u16(headers_len as u16); + + // Write headers + buf.put_slice(&headers_json); + // Write payload length (4 bytes) buf.put_u32(self.payload.len() as u32); @@ -102,11 +141,39 @@ impl TcpRequestMessage { .map_err(|e| { std::io::Error::new( std::io::ErrorKind::InvalidData, - format!("Invalid UTF-8: {}", e), + format!("Invalid UTF-8 in endpoint path: {}", e), ) })?; offset += endpoint_len; + if bytes.len() < offset + 2 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Not enough bytes for headers length", + )); + } + + // Read headers length (2 bytes) + let headers_len = u16::from_be_bytes([bytes[offset], bytes[offset + 1]]) as usize; + offset += 2; + + if bytes.len() < offset + headers_len { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "Not enough bytes for headers", + )); + } + + // Read and parse headers + let headers: std::collections::HashMap = + serde_json::from_slice(&bytes[offset..offset + headers_len]).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid JSON in headers: {}", e), + ) + })?; + offset += headers_len; + if bytes.len() < offset + 4 { return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, @@ -139,6 +206,7 @@ impl TcpRequestMessage { Ok(Self { endpoint_path, + headers, payload, }) } @@ -169,14 +237,24 @@ impl Decoder for TcpRequestCodec { // Peek at endpoint path length without consuming let endpoint_len = u16::from_be_bytes([src[0], src[1]]) as usize; - let header_size = 2 + endpoint_len + 4; // path_len + path + payload_len + // Need path + headers_len + if src.len() < 2 + endpoint_len + 2 { + return Ok(None); + } + + // Peek at headers length + let headers_len_offset = 2 + endpoint_len; + let headers_len = u16::from_be_bytes([src[headers_len_offset], src[headers_len_offset + 1]]) as usize; + + // Need path + headers + payload_len + let header_size = 2 + endpoint_len + 2 + headers_len + 4; if src.len() < header_size { return Ok(None); } // Peek at payload length - let payload_len_offset = 2 + endpoint_len; + let payload_len_offset = 2 + endpoint_len + 2 + headers_len; let payload_len = u32::from_be_bytes([ src[payload_len_offset], src[payload_len_offset + 1], @@ -204,7 +282,7 @@ impl Decoder for TcpRequestCodec { return Ok(None); } - // We have a complete message, advance past length prefix + // We have a complete message, advance past endpoint path length prefix src.advance(2); // Read endpoint path @@ -216,6 +294,19 @@ impl Decoder for TcpRequestCodec { ) })?; + // Advance past headers length + src.advance(2); + + // Read and parse headers + let headers_bytes = src.split_to(headers_len); + let headers: std::collections::HashMap = + serde_json::from_slice(&headers_bytes).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid JSON in headers: {}", e), + ) + })?; + // Advance past payload length src.advance(4); @@ -224,6 +315,7 @@ impl Decoder for TcpRequestCodec { Ok(Some(TcpRequestMessage { endpoint_path, + headers, payload, })) } @@ -243,6 +335,22 @@ impl Encoder for TcpRequestCodec { )); } + // Encode headers as JSON + let headers_json = serde_json::to_vec(&item.headers).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Failed to encode headers: {}", e), + ) + })?; + let headers_len = headers_json.len(); + + if headers_len > u16::MAX as usize { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Headers too large: {} bytes", headers_len), + )); + } + if item.payload.len() > u32::MAX as usize { return Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, @@ -250,7 +358,7 @@ impl Encoder for TcpRequestCodec { )); } - let total_len = 2 + endpoint_len + 4 + item.payload.len(); + let total_len = 2 + endpoint_len + 2 + headers_len + 4 + item.payload.len(); // Check max message size if let Some(max_size) = self.max_message_size @@ -274,6 +382,12 @@ impl Encoder for TcpRequestCodec { // Write endpoint path dst.put_slice(endpoint_bytes); + // Write headers length + dst.put_u16(headers_len as u16); + + // Write headers + dst.put_slice(&headers_json); + // Write payload length dst.put_u32(item.payload.len() as u32); diff --git a/lib/runtime/src/pipeline/network/egress/tcp_client.rs b/lib/runtime/src/pipeline/network/egress/tcp_client.rs index d84f8cbb634..ea4e230cf27 100644 --- a/lib/runtime/src/pipeline/network/egress/tcp_client.rs +++ b/lib/runtime/src/pipeline/network/egress/tcp_client.rs @@ -324,7 +324,8 @@ impl TcpConnection { // Encode request on caller's thread (hot path optimization) // This allows multiple concurrent callers to encode in parallel // rather than serializing through the writer task - let request_msg = TcpRequestMessage::new(endpoint_path, payload); + // Include all headers (especially trace headers) in the message + let request_msg = TcpRequestMessage::with_headers(endpoint_path, headers.clone(), payload); let encoded_data = request_msg.encode()?; // Create response channel diff --git a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs index c7f16999bb4..564e5624e92 100644 --- a/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs +++ b/lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs @@ -266,6 +266,15 @@ impl SharedTcpServer { let mut path_buf = vec![0u8; path_len]; read_half.read_exact(&mut path_buf).await?; + // Read headers length (2 bytes) + let mut headers_len_buf = [0u8; 2]; + read_half.read_exact(&mut headers_len_buf).await?; + let headers_len = u16::from_be_bytes(headers_len_buf) as usize; + + // Read headers + let mut headers_buf = vec![0u8; headers_len]; + read_half.read_exact(&mut headers_buf).await?; + // Read payload length (4 bytes) let mut len_buf = [0u8; 4]; read_half.read_exact(&mut len_buf).await?; @@ -293,9 +302,12 @@ impl SharedTcpServer { read_half.read_exact(&mut payload_buf).await?; // Reconstruct the full message buffer for decoding using BytesMut - let mut full_msg = BytesMut::with_capacity(2 + path_len + 4 + payload_len); + let mut full_msg = + BytesMut::with_capacity(2 + path_len + 2 + headers_len + 4 + payload_len); full_msg.extend_from_slice(&path_len_buf); full_msg.extend_from_slice(&path_buf); + full_msg.extend_from_slice(&headers_len_buf); + full_msg.extend_from_slice(&headers_buf); full_msg.extend_from_slice(&len_buf); full_msg.extend_from_slice(&payload_buf); @@ -316,6 +328,7 @@ impl SharedTcpServer { }; let endpoint_path = request_msg.endpoint_path; + let headers = request_msg.headers; let payload = request_msg.payload; // Look up handler (lock-free read with DashMap) @@ -361,15 +374,18 @@ impl SharedTcpServer { tokio::spawn(async move { tracing::trace!(instance_id, "handling TCP request"); + // Create span with trace context from headers + let span = crate::logging::make_handle_payload_span_from_tcp_headers( + &headers, + &component_name, + &endpoint_name, + &namespace, + instance_id, + ); + let result = service_handler .handle_payload(payload) - .instrument(tracing::info_span!( - "handle_payload", - component = component_name.as_str(), - endpoint = endpoint_name.as_str(), - namespace = namespace.as_str(), - instance_id = instance_id, - )) + .instrument(span) .await; match result { From ade6a7e3e81a54f6418fc332200a31417a2f04c5 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Thu, 1 Jan 2026 00:07:28 +0000 Subject: [PATCH 3/5] fix(tracing): propagate span context in prefill background task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When spawn_prefill_task uses tokio::spawn, the spawned task loses the current span context. This causes get_distributed_tracing_context() to return None, preventing trace headers from being injected into prefill requests. Changes: - Move tracing::Instrument import to top of file - Capture current span and use .instrument(span) to propagate trace context to the spawned task - Add prefill_routing span to track prefill routing timing - Add kv_find_best_match span to track KV worker selection time 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lib/llm/src/kv_router/prefill_router.rs | 100 ++++++++++++++---------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/lib/llm/src/kv_router/prefill_router.rs b/lib/llm/src/kv_router/prefill_router.rs index 6088340fc78..cd8e54cd8e7 100644 --- a/lib/llm/src/kv_router/prefill_router.rs +++ b/lib/llm/src/kv_router/prefill_router.rs @@ -8,6 +8,7 @@ use futures::StreamExt; use rand::Rng; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use dynamo_runtime::{ component::Endpoint, @@ -264,10 +265,14 @@ impl PrefillRouter { InnerPrefillRouter::KvRouter(r) => r, _ => return None, }; - match kv_router - .chooser - .find_best_match(None, &req.token_ids, None, false) - .await + match async { + kv_router + .chooser + .find_best_match(None, &req.token_ids, None, false) + .await + } + .instrument(tracing::info_span!("kv_find_best_match")) + .await { Ok((worker, _overlap)) => (worker.worker_id, worker.dp_rank), Err(_) => return None, @@ -387,17 +392,22 @@ impl PrefillRouter { target_worker: Option, ) { let router = self.prefill_router.get().cloned(); - - tokio::spawn(async move { - match Self::execute_prefill(router, prefill_request, target_worker).await { - Ok(_) => { - tracing::debug!("Prefill background task completed"); - } - Err(e) => { - tracing::warn!("Prefill background task error: {e:?}"); + // Capture current span to propagate trace context to the spawned task + let span = tracing::Span::current(); + + tokio::spawn( + async move { + match Self::execute_prefill(router, prefill_request, target_worker).await { + Ok(_) => { + tracing::debug!("Prefill background task completed"); + } + Err(e) => { + tracing::warn!("Prefill background task error: {e:?}"); + } } } - }); + .instrument(span), + ); } /// Call the prefill router and extract structured prefill result and worker ID @@ -532,38 +542,42 @@ impl .as_ref() .and_then(|r| r.prefill_worker_id); - let prefill_result = if !is_gaie_stage1 - && let Some((worker_id, dp_rank, bootstrap_info)) = self - .build_bootstrap_info(&prefill_req, preselected_worker) - .await - { - // Bootstrap optimization path: spawn prefill in background - let routing = prefill_req.routing_mut(); - routing.prefill_worker_id = Some(worker_id); - routing.backend_instance_id = Some(worker_id); // Route prefill to the SAME worker we got bootstrap_info from - routing.dp_rank = Some(dp_rank); - prefill_req.bootstrap_info = Some(bootstrap_info.clone()); - - let prefill_context = Context::with_id(prefill_req, request_id.clone()); - engine_ctx.link_child(prefill_context.context()); - - self.spawn_prefill_task(prefill_context, Some(worker_id)); - - Ok((None, Some(worker_id), Some(bootstrap_info))) - } else { - // Original prefill path: wait for prefill to complete - tracing::debug!( - is_gaie_stage1 = is_gaie_stage1, - "Using original prefill path" - ); + let prefill_result = async { + if !is_gaie_stage1 + && let Some((worker_id, dp_rank, bootstrap_info)) = self + .build_bootstrap_info(&prefill_req, preselected_worker) + .await + { + // Bootstrap optimization path: spawn prefill in background + let routing = prefill_req.routing_mut(); + routing.prefill_worker_id = Some(worker_id); + routing.backend_instance_id = Some(worker_id); // Route prefill to the SAME worker we got bootstrap_info from + routing.dp_rank = Some(dp_rank); + prefill_req.bootstrap_info = Some(bootstrap_info.clone()); + + let prefill_context = Context::with_id(prefill_req, request_id.clone()); + engine_ctx.link_child(prefill_context.context()); + + self.spawn_prefill_task(prefill_context, Some(worker_id)); + + Ok((None, Some(worker_id), Some(bootstrap_info))) + } else { + // Original prefill path: wait for prefill to complete + tracing::debug!( + is_gaie_stage1 = is_gaie_stage1, + "Using original prefill path" + ); - let prefill_context = Context::with_id(prefill_req, request_id.clone()); - engine_ctx.link_child(prefill_context.context()); + let prefill_context = Context::with_id(prefill_req, request_id.clone()); + engine_ctx.link_child(prefill_context.context()); - self.call_prefill(prefill_context) - .await - .map(|(result, worker_id)| (Some(result), worker_id, None)) - }; + self.call_prefill(prefill_context) + .await + .map(|(result, worker_id)| (Some(result), worker_id, None)) + } + } + .instrument(tracing::info_span!("prefill_routing")) + .await; // Abort if cancelled during prefill if engine_ctx.is_stopped() || engine_ctx.is_killed() { From 8b9597bf48467e410c9c71c101eac1b967fb94e4 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Thu, 1 Jan 2026 02:14:29 +0000 Subject: [PATCH 4/5] go --- .../dynamo/sglang/request_handlers/llm/decode_handler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py index ad98c9bfa65..3706ac68030 100644 --- a/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/llm/decode_handler.py @@ -119,7 +119,9 @@ async def generate( f"room={bootstrap_info['bootstrap_room']}" ) - trace_header = self._get_trace_header(context) if self.enable_trace else None + trace_header = ( + self._get_trace_header(context) if self.enable_trace else None + ) decode = await self.engine.async_generate( **input_param, @@ -139,7 +141,9 @@ async def generate( async for out in self._process_text_stream(decode, context): yield out else: - trace_header = self._get_trace_header(context) if self.enable_trace else None + trace_header = ( + self._get_trace_header(context) if self.enable_trace else None + ) agg = await self.engine.async_generate( **input_param, From 9dfe7a8692481aaadc74f8431dc10813c6ad5c61 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Thu, 1 Jan 2026 02:25:13 +0000 Subject: [PATCH 5/5] go --- lib/runtime/src/logging.rs | 4 +++- lib/runtime/src/pipeline/network/codec.rs | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/runtime/src/logging.rs b/lib/runtime/src/logging.rs index 7408b38e4a4..79fc656aaa1 100644 --- a/lib/runtime/src/logging.rs +++ b/lib/runtime/src/logging.rs @@ -299,7 +299,9 @@ pub fn make_request_span(req: &Request) -> Span { } /// Extract OpenTelemetry context from HTTP headers for distributed tracing -fn extract_otel_context_from_http_headers(headers: &http::HeaderMap) -> Option { +fn extract_otel_context_from_http_headers( + headers: &http::HeaderMap, +) -> Option { let traceparent_value = headers.get("traceparent")?.to_str().ok()?; struct HttpHeaderExtractor<'a>(&'a http::HeaderMap); diff --git a/lib/runtime/src/pipeline/network/codec.rs b/lib/runtime/src/pipeline/network/codec.rs index fb179e25b65..a286d9fff5d 100644 --- a/lib/runtime/src/pipeline/network/codec.rs +++ b/lib/runtime/src/pipeline/network/codec.rs @@ -245,7 +245,8 @@ impl Decoder for TcpRequestCodec { // Peek at headers length let headers_len_offset = 2 + endpoint_len; - let headers_len = u16::from_be_bytes([src[headers_len_offset], src[headers_len_offset + 1]]) as usize; + let headers_len = + u16::from_be_bytes([src[headers_len_offset], src[headers_len_offset + 1]]) as usize; // Need path + headers + payload_len let header_size = 2 + endpoint_len + 2 + headers_len + 4;