fix: distributed tracing propagation for TCP transport and SGLang integration#5122
ishandhanani wants to merge 6 commits into main
Conversation
Move prometheus_client imports from module level to inside functions to ensure they occur AFTER SGLang's `set_prometheus_multiproc_dir()` is called.

Problem:
- `prometheus_client` was imported at module level in `publisher.py` and `prometheus.py`
- This happened before `sgl.Engine()` called `set_prometheus_multiproc_dir()`
- `prometheus_client` initialized in single-process mode, ignoring `PROMETHEUS_MULTIPROC_DIR`
- `TokenizerMetricsCollector` metrics were stored in memory only, not mmap'd files
- `MultiProcessCollector` couldn't find them when scraping `/metrics`

Solution:
- Move imports inside functions that are called after engine initialization
- `publisher.py`: import in `setup_prometheus_registry()`
- `prometheus.py`: import in `get_prometheus_expfmt()`

Affected metrics now correctly exposed:
- `sglang:prompt_tokens_total`
- `sglang:generation_tokens_total`
- `sglang:time_to_first_token_seconds`
- `sglang:e2e_request_latency_seconds`
- `sglang:inter_token_latency_seconds`
- `sglang:num_requests_total`
- `sglang:cached_tokens_total`
- `sglang:num_retractions`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
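The deferred-import pattern the commit describes can be sketched as follows. This is a minimal illustration, not the actual code from the PR; the function body and docstring are written here only to show why the import must happen inside the function:

```python
import sys

def get_prometheus_expfmt() -> str:
    """Render metrics in Prometheus exposition format.

    prometheus_client is imported here rather than at module level, so the
    import happens AFTER the engine has set PROMETHEUS_MULTIPROC_DIR. An
    eager module-level import would initialize the client in single-process
    mode and never see the mmap'd metric files of other processes.
    """
    from prometheus_client import generate_latest  # deferred on purpose
    return generate_latest().decode("utf-8")
```

Defining the function does not trigger the import; `prometheus_client` is only loaded on the first call, by which point the multiprocess directory is configured.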
…egration

This PR fixes distributed tracing by:

1. SGLang Backend: Update to use new `external_trace_header` API from SGLang PR #15814
   - Replace `_propagate_trace_context_to_sglang()` with `_get_trace_header()`
   - Pass trace headers directly to `async_generate()` instead of using global state
2. HTTP Frontend: Fix parent context linking for incoming requests
   - Add `extract_otel_context_from_http_headers()` to extract OTEL context
   - Update `make_request_span()` to call `span.set_parent(context)`
3. TCP Transport: Fix trace header propagation (root cause of broken tracing)
   - Add `headers` field to `TcpRequestMessage` wire protocol
   - Include trace headers (traceparent, tracestate, etc.) in TCP messages
   - Add `make_handle_payload_span_from_tcp_headers()` for worker-side span creation

The TCP transport was silently dropping all trace headers, using only `x-endpoint-path` for routing. This caused frontend and worker spans to be disconnected. NATS transport worked correctly due to native header support.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Walkthrough

This change defers prometheus_client imports until after engine initialization and refactors trace context handling in SGLang request handlers from a propagation-based approach to a header-based one. Simultaneously, it adds OpenTelemetry context extraction from HTTP/TCP/NATS headers and extends the TCP wire protocol to carry trace headers in request messages.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
662-680: Tests don't account for the new headers section in the wire protocol.

The mock server reads `path_len`, `path`, then immediately `payload_len` and `payload`, but the new wire format includes `headers_len` and `headers` between path and payload. This will cause the test to misinterpret headers data as the payload length.

The same issue exists in `test_concurrent_requests_single_connection` (lines 720-741) and `test_connection_pool_reuse` (lines 819-834).

🔎 Proposed fix for test_connection_health_check

```diff
 let mut path_buf = vec![0u8; path_len];
 read_half.read_exact(&mut path_buf).await.unwrap();

+// Read headers length (2 bytes)
+let mut headers_len_buf = [0u8; 2];
+read_half.read_exact(&mut headers_len_buf).await.unwrap();
+let headers_len = u16::from_be_bytes(headers_len_buf) as usize;
+
+// Read headers
+let mut headers_buf = vec![0u8; headers_len];
+read_half.read_exact(&mut headers_buf).await.unwrap();
+
 let mut len_buf = [0u8; 4];
 read_half.read_exact(&mut len_buf).await.unwrap();
 let payload_len = u32::from_be_bytes(len_buf) as usize;
```

components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)
119-141: Fix Black formatting to pass CI.

The pipeline indicates Black formatting failed on these lines. Run `black components/src/dynamo/sglang/request_handlers/llm/decode_handler.py` to fix the formatting issues before merging.
🧹 Nitpick comments (3)
lib/runtime/src/pipeline/network/codec.rs (1)
70-107: JSON encoding for headers is acceptable but adds overhead.

Using JSON for headers serialization is straightforward and flexible. For high-throughput scenarios, consider whether a more compact binary format might be beneficial in the future.
lib/runtime/src/logging.rs (1)
301-334: Consider consolidating the three Extractor implementations.
`HttpHeaderExtractor`, `TcpHeaderExtractor`, and `NatsHeaderExtractor` follow nearly identical patterns. A generic extractor or shared trait implementation could reduce duplication.

🔎 Potential consolidation approach

```rust
// Generic extractor that works with any header source
struct GenericHeaderExtractor<F>(F)
where
    F: Fn(&str) -> Option<&str>;

impl<F> Extractor for GenericHeaderExtractor<F>
where
    F: Fn(&str) -> Option<&str>,
{
    fn get(&self, key: &str) -> Option<&str> {
        (self.0)(key)
    }

    fn keys(&self) -> Vec<&str> {
        vec!["traceparent", "tracestate"]
            .into_iter()
            .filter(|&key| self.get(key).is_some())
            .collect()
    }
}
```

Also applies to: 424-465, 467-508
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)
122-122: Trace header integration looks correct.

The trace header retrieval and passing to `engine.async_generate` is correctly implemented in both disaggregated and aggregated paths, consistent with the new header-based tracing approach.

💡 Optional: Consider extracting repeated trace header logic

The conditional `trace_header = self._get_trace_header(context) if self.enable_trace else None` appears in both paths. You could optionally extract this to reduce duplication:

```python
def _get_trace_header_if_enabled(self, context: Context) -> Optional[Dict[str, str]]:
    """Get trace header if tracing is enabled."""
    return self._get_trace_header(context) if self.enable_trace else None
```

Then use:

```python
trace_header = self._get_trace_header_if_enabled(context)
```

However, given the simplicity and clarity of the current pattern, this may not be necessary.
Also applies to: 131-131, 142-142, 148-148
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- components/src/dynamo/common/utils/prometheus.py
- components/src/dynamo/sglang/publisher.py
- components/src/dynamo/sglang/request_handlers/handler_base.py
- components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
- components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py
- lib/runtime/src/logging.rs
- lib/runtime/src/pipeline/network/codec.rs
- lib/runtime/src/pipeline/network/egress/tcp_client.rs
- lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-19T07:32:44.210Z
Learnt from: ishandhanani
Repo: ai-dynamo/dynamo PR: 0
File: :0-0
Timestamp: 2025-09-19T07:32:44.210Z
Learning: The skip_tokenizer_init=True path in SGLang backend bypasses tokenization but has array slicing overhead in _process_token_stream that creates O(n) memory copying on every stream chunk, potentially causing quadratic behavior for long sequences.
Applied to files:
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.
Applied to files:
- lib/runtime/src/pipeline/network/codec.rs
- lib/runtime/src/pipeline/network/egress/tcp_client.rs
- lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
🧬 Code graph analysis (5)
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)
components/src/dynamo/sglang/request_handlers/handler_base.py (1)
_get_trace_header(143-156)
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
lib/runtime/src/pipeline/network/codec.rs (1)
with_headers(46-56)
lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs (1)
lib/runtime/src/logging.rs (1)
make_handle_payload_span_from_tcp_headers(380-422)
components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py (1)
components/src/dynamo/sglang/request_handlers/handler_base.py (1)
_get_trace_header(143-156)
components/src/dynamo/sglang/request_handlers/handler_base.py (1)
lib/bindings/python/src/dynamo/_core.pyi (3)
`Context` (275-360), `trace_id` (333-340), `span_id` (343-350)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/5122/merge) by ishandhanani.
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
[error] 119-141: Black formatting failed: 1 file reformatted by this hook. The pre-commit run modified the file(s); please review and commit the changes. Re-run the pre-commit hook (e.g., 'pre-commit run --all-files').
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Build and Test - dynamo
- GitHub Check: vllm (amd64)
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: clippy (lib/runtime/examples)
- GitHub Check: clippy (.)
- GitHub Check: clippy (lib/bindings/python)
🔇 Additional comments (11)
components/src/dynamo/common/utils/prometheus.py (1)
120-145: LGTM! Lazy import pattern correctly addresses multi-process Prometheus initialization.

The deferred import of `generate_latest` ensures `prometheus_client` initializes after `PROMETHEUS_MULTIPROC_DIR` is set, preventing metrics collection failures in SGLang's multi-process architecture.

components/src/dynamo/sglang/publisher.py (1)
14-16: LGTM! Consistent lazy import pattern for prometheus_client.

The `TYPE_CHECKING` guard with forward reference annotation correctly preserves type hints while deferring the runtime import until after SGLang engine initialization.

Also applies to: 227-250
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
327-329: LGTM! Headers now propagated through TCP transport.

The change from `TcpRequestMessage::new()` to `TcpRequestMessage::with_headers()` correctly includes trace headers in the encoded message, enabling distributed tracing context propagation.

lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs (2)
269-276: LGTM! Headers extraction from TCP wire protocol.

Correctly reads the headers section (length prefix + JSON data) from the updated wire format.
377-389: LGTM! Trace-aware span creation from TCP headers.

The span is now created using `make_handle_payload_span_from_tcp_headers` with headers extracted from the TCP message, enabling proper distributed trace context propagation and parent linking.

lib/runtime/src/pipeline/network/codec.rs (2)
30-56: LGTM! TcpRequestMessage extended with headers support.

The dual constructor pattern (`new()` for backward compatibility, `with_headers()` for trace propagation) is clean. An empty headers map for `new()` maintains compatibility with existing code.
241-264: LGTM! Decoder correctly handles the new wire format.

The peeking logic properly accounts for the headers section when determining if a complete message is available, calculating `header_size` as `2 + endpoint_len + 2 + headers_len + 4`.

lib/runtime/src/logging.rs (2)
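To make the framing concrete, here is a small Python sketch of the wire layout described in this review (length-prefixed path, JSON headers, then payload, all big-endian). The helper names `encode_tcp_request` and `is_complete` are illustrative, not the actual Rust APIs:

```python
import json
import struct

def encode_tcp_request(path: bytes, headers: dict, payload: bytes) -> bytes:
    """Encode: path_len(u16) | path | headers_len(u16) | headers_json |
    payload_len(u32) | payload, all big-endian."""
    headers_json = json.dumps(headers).encode("utf-8")
    return (
        struct.pack(">H", len(path)) + path
        + struct.pack(">H", len(headers_json)) + headers_json
        + struct.pack(">I", len(payload)) + payload
    )

def is_complete(buf: bytes) -> bool:
    """Peek without consuming: a full message needs
    header_size = 2 + endpoint_len + 2 + headers_len + 4 bytes,
    plus payload_len bytes of payload."""
    if len(buf) < 2:
        return False
    endpoint_len = struct.unpack_from(">H", buf, 0)[0]
    if len(buf) < 2 + endpoint_len + 2:
        return False
    headers_len = struct.unpack_from(">H", buf, 2 + endpoint_len)[0]
    header_size = 2 + endpoint_len + 2 + headers_len + 4
    if len(buf) < header_size:
        return False
    payload_len = struct.unpack_from(">I", buf, header_size - 4)[0]
    return len(buf) >= header_size + payload_len
```

The peeking check mirrors the decoder behavior praised above: it never reads the payload length until the variable-length path and headers sections are fully buffered.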
281-298: LGTM! HTTP request spans now properly linked to parent trace context.

Extracting OTEL context from incoming HTTP headers and calling `span.set_parent(context)` ensures frontend spans correctly connect to upstream traces in Tempo/Grafana.
379-422: LGTM! TCP header span creation mirrors NATS pattern.

The function correctly extracts trace context from TCP headers and creates properly-linked spans for distributed tracing through the TCP transport.
components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py (1)
116-116: LGTM! Clean integration with the new trace header approach.

The conditional trace header retrieval and passing to `engine.async_generate` correctly implements the new header-based distributed tracing mechanism, replacing the previous propagation approach.

Also applies to: 125-125
components/src/dynamo/sglang/request_handlers/handler_base.py (1)
143-156: Add validation or document that trace_id and span_id conform to W3C traceparent format.

SGLang 0.5.6.post2 does support the `external_trace_header` parameter. However, the `_get_trace_header()` method constructs a W3C traceparent header without validating that `context.trace_id` is 32 lowercase hex characters and `context.span_id` is 16 lowercase hex characters, as required by the W3C Trace Context specification. Either add format validation before constructing the traceparent header, or document that the Context object guarantees these values conform to the specification.
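A validation of the kind suggested above could look like the following sketch. The `build_traceparent` helper and its raise-on-invalid behavior are illustrative assumptions, not code from this PR:

```python
import re

# W3C Trace Context: version "00", 32 lowercase-hex trace-id,
# 16 lowercase-hex parent-id, 2-hex trace-flags.
# All-zero trace-id/parent-id values are invalid per the spec.
_TRACE_ID_RE = re.compile(r"^[0-9a-f]{32}$")
_SPAN_ID_RE = re.compile(r"^[0-9a-f]{16}$")

def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Build a W3C traceparent header value, rejecting malformed ids."""
    if not _TRACE_ID_RE.match(trace_id) or trace_id == "0" * 32:
        raise ValueError(f"invalid trace_id: {trace_id!r}")
    if not _SPAN_ID_RE.match(span_id) or span_id == "0" * 16:
        raise ValueError(f"invalid span_id: {span_id!r}")
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"
```

Validating before constructing the header fails fast at the frontend rather than producing a traceparent that downstream collectors silently discard.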
Code Duplication Analysis

Reviewed the PR for potential duplication between NATS and TCP request plane implementations.

Identified Duplication

There are 3 sets of nearly identical functions in

Analysis

The duplication exists because each header type has a different API:

Recommendation

The duplication is acceptable for now because:

If we wanted to refactor (future work)

We could create a simple trait:

```rust
trait HeaderAccessor {
    fn get_header(&self, key: &str) -> Option<&str>;
}
```

And implement it for each type, then have a single generic:

```rust
fn extract_otel_context<H: HeaderAccessor>(
    headers: &H,
) -> (Option<Context>, Option<String>, Option<String>)
```

But this adds complexity for ~40 lines of duplicated code that's unlikely to change frequently.

Verdict

The current code is fine. The duplication is localized, the functions are small, and the pattern is clear. Refactoring would add abstraction overhead without significant benefit. If we add more transport types in the future, we should consider the trait approach.
When spawn_prefill_task uses tokio::spawn, the spawned task loses the current span context. This causes get_distributed_tracing_context() to return None, preventing trace headers from being injected into prefill requests.

Changes:
- Move `tracing::Instrument` import to top of file
- Capture the current span and use `.instrument(span)` to propagate trace context to the spawned task
- Add `prefill_routing` span to track prefill routing timing
- Add `kv_find_best_match` span to track KV worker selection time

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Thanks @Swipe4057 - I'll probably make the bump to 0.5.7 on
Auto-linked to DIS-1228
Summary
This PR fixes distributed tracing by addressing several issues:
- Update to use the new `external_trace_header` API from SGLang PR #15814
- `http-request` spans are properly linked to incoming trace context

Root Cause
The TCP transport was silently dropping all trace headers. Only `x-endpoint-path` was being used for routing, but `traceparent`, `tracestate`, `x-request-id`, etc. were never included in the wire protocol. This caused frontend and worker spans to appear disconnected in Tempo/Grafana.

NATS transport worked correctly because it has native header support.
Changes
SGLang Backend (
components/src/dynamo/sglang/)_propagate_trace_context_to_sglang()with_get_trace_header()async_generate(external_trace_header=...)instead of using global stateHTTP Frontend (
lib/runtime/src/logging.rs)extract_otel_context_from_http_headers()to extract OTEL context from incoming requestsmake_request_span()to callspan.set_parent(context)for proper parent linkingTCP Transport Wire Protocol (
lib/runtime/src/pipeline/network/)headers: HashMap<String, String>field toTcpRequestMessageendpoint_path_len | endpoint_path | headers_len | headers_json | payload_len | payloadtcp_client.rsto include headers when sendingshared_tcp_endpoint.rsto extract headers and create properly linked spansmake_handle_payload_span_from_tcp_headers()helper functionTest plan
--enable-otelworksDependencies
Requires SGLang with PR #15814 merged or installed from that branch.
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Refactor