fix: distributed tracing propagation for TCP transport #5283
Conversation
This PR fixes distributed tracing by:

1. HTTP frontend: fix parent context linking for incoming requests
   - Add `extract_otel_context_from_http_headers()` to extract the OTEL context
   - Update `make_request_span()` to call `span.set_parent(context)`
2. TCP transport: fix trace header propagation (the root cause of the broken tracing)
   - Add a `headers` field to the `TcpRequestMessage` wire protocol
   - Include trace headers (`traceparent`, `tracestate`, etc.) in TCP messages
   - Add `make_handle_payload_span_from_tcp_headers()` for worker-side span creation

The TCP transport was silently dropping all trace headers, using only `x-endpoint-path` for routing. This caused frontend and worker spans to be disconnected. The NATS transport worked correctly due to its native header support.
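The extended wire format can be sketched end to end in std-only Rust. Names like `encode_frame`/`decode_frame` are illustrative, not the PR's actual API: the real codec lives in `lib/runtime/src/pipeline/network/codec.rs` and JSON-encodes a `HashMap<String, String>`; here the headers bytes are treated as opaque.

```rust
// Illustrative sketch of the extended TCP frame layout (assumed from the PR):
//   path_len (u16 BE) | path | headers_len (u16 BE) | headers | payload_len (u32 BE) | payload
fn encode_frame(path: &str, headers_json: &[u8], payload: &[u8]) -> Result<Vec<u8>, String> {
    if path.len() > u16::MAX as usize {
        return Err("endpoint path too long".into());
    }
    if headers_json.len() > u16::MAX as usize {
        return Err("headers exceed u16::MAX bytes".into());
    }
    let mut buf = Vec::with_capacity(2 + path.len() + 2 + headers_json.len() + 4 + payload.len());
    buf.extend_from_slice(&(path.len() as u16).to_be_bytes());
    buf.extend_from_slice(path.as_bytes());
    buf.extend_from_slice(&(headers_json.len() as u16).to_be_bytes());
    buf.extend_from_slice(headers_json);
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(payload);
    Ok(buf)
}

fn decode_frame(buf: &[u8]) -> Result<(String, Vec<u8>, Vec<u8>), String> {
    // Bounds-checked cursor over the frame; errors instead of panicking on truncation.
    fn take(buf: &[u8], off: &mut usize, n: usize) -> Result<Vec<u8>, String> {
        let end = *off + n;
        if end > buf.len() {
            return Err("truncated frame".into());
        }
        let out = buf[*off..end].to_vec();
        *off = end;
        Ok(out)
    }
    let mut off = 0usize;
    let path_len = u16::from_be_bytes(take(buf, &mut off, 2)?.try_into().unwrap()) as usize;
    let path = String::from_utf8(take(buf, &mut off, path_len)?).map_err(|e| e.to_string())?;
    let headers_len = u16::from_be_bytes(take(buf, &mut off, 2)?.try_into().unwrap()) as usize;
    let headers = take(buf, &mut off, headers_len)?;
    let payload_len = u32::from_be_bytes(take(buf, &mut off, 4)?.try_into().unwrap()) as usize;
    let payload = take(buf, &mut off, payload_len)?;
    Ok((path, headers, payload))
}

fn main() {
    let frame = encode_frame("test.endpoint", br#"{"traceparent":"00-abc-def-01"}"#, b"hi").unwrap();
    let (path, headers_json, payload) = decode_frame(&frame).unwrap();
    println!("path={path} headers={} payload_len={}", String::from_utf8_lossy(&headers_json), payload.len());
}
```

The old format was the same minus the `headers_len`/`headers` segment, which is why the existing mock-server tests misread the new frames.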
Walkthrough: Adds OpenTelemetry context extraction and propagation across HTTP, TCP, and NATS; extends the TCP wire protocol to carry JSON-encoded headers; and uses the extracted context to create and attach parent spans for request handling.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
📜 Recent review details: Configuration used: Path: .coderabbit.yaml · Review profile: CHILL · Plan: Pro
📒 Files selected for processing (2)
🧰 Additional context used: 🧠 Learnings (1) · 📚 Learning: 2025-09-11T03:24:47.820Z
🧬 Code graph analysis (1): lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
🔇 Additional comments (7)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
662-675: Tests use the outdated wire format and will fail with the new header-aware protocol.

The mock server reads `path_len → path → payload_len → payload`, but the new wire format is `path_len → path → headers_len → headers → payload_len → payload`. The test will read `headers_len` bytes as `payload_len`, causing incorrect behavior or hangs.

This affects `test_connection_health_check`, `test_concurrent_requests_single_connection`, and `test_connection_pool_reuse`.

🔧 Proposed fix for the `test_connection_health_check` mock server
```diff
     // Read request
     let mut len_buf = [0u8; 2];
     read_half.read_exact(&mut len_buf).await.unwrap();
     let path_len = u16::from_be_bytes(len_buf) as usize;

     let mut path_buf = vec![0u8; path_len];
     read_half.read_exact(&mut path_buf).await.unwrap();

+    // Read headers length and headers (new wire format)
+    let mut headers_len_buf = [0u8; 2];
+    read_half.read_exact(&mut headers_len_buf).await.unwrap();
+    let headers_len = u16::from_be_bytes(headers_len_buf) as usize;
+
+    let mut headers_buf = vec![0u8; headers_len];
+    read_half.read_exact(&mut headers_buf).await.unwrap();
+
     let mut len_buf = [0u8; 4];
     read_half.read_exact(&mut len_buf).await.unwrap();
     let payload_len = u32::from_be_bytes(len_buf) as usize;
```
🧹 Nitpick comments (2)
lib/runtime/src/pipeline/network/codec.rs (1)
571-582: Consider adding tests for header propagation.

The existing tests use `TcpRequestMessage::new()`, which creates empty headers. Consider adding tests that verify:

- The `with_headers()` constructor works correctly
- Non-empty headers encode/decode round-trip
- Header size limit validation (> `u16::MAX`)
📝 Example test for headers
```rust
#[test]
fn test_tcp_request_with_headers() {
    let mut headers = std::collections::HashMap::new();
    headers.insert("traceparent".to_string(), "00-abc123-def456-01".to_string());
    headers.insert("x-request-id".to_string(), "req-123".to_string());

    let msg = TcpRequestMessage::with_headers(
        "test.endpoint".to_string(),
        headers,
        Bytes::from(vec![1, 2, 3]),
    );

    let encoded = msg.encode().unwrap();
    let decoded = TcpRequestMessage::decode(&encoded).unwrap();

    assert_eq!(decoded.endpoint_path, "test.endpoint");
    assert_eq!(decoded.headers.get("traceparent"), Some(&"00-abc123-def456-01".to_string()));
    assert_eq!(decoded.headers.get("x-request-id"), Some(&"req-123".to_string()));
    assert_eq!(decoded.payload, Bytes::from(vec![1, 2, 3]));
}
```

lib/runtime/src/logging.rs (1)
426-510: Consider consolidating header extractors to reduce duplication.

The three extraction functions (`extract_otel_context_from_http_headers`, `extract_otel_context_from_tcp_headers`, `extract_otel_context_from_nats_headers`) follow identical patterns. Consider a generic approach using the existing `GenericHeaders` trait.

♻️ Possible consolidation approach
```rust
fn extract_otel_context_from_headers<H: GenericHeaders>(
    headers: &H,
) -> (Option<opentelemetry::Context>, Option<String>, Option<String>) {
    let traceparent_value = match headers.get("traceparent") {
        Some(value) => value,
        None => return (None, None, None),
    };

    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);

    struct GenericExtractor<'a, H>(&'a H);
    impl<'a, H: GenericHeaders> Extractor for GenericExtractor<'a, H> {
        fn get(&self, key: &str) -> Option<&str> {
            self.0.get(key)
        }
        fn keys(&self) -> Vec<&str> {
            vec!["traceparent", "tracestate"]
                .into_iter()
                .filter(|&key| self.0.get(key).is_some())
                .collect()
        }
    }

    let extractor = GenericExtractor(headers);
    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
    let otel_context = propagator.extract(&extractor);

    let context_with_trace = if otel_context.span().span_context().is_valid() {
        Some(otel_context)
    } else {
        None
    };

    (context_with_trace, trace_id, parent_span_id)
}
```

Note: This would require extending `GenericHeaders` to support `HashMap<String, String>` or creating a wrapper.
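That extension is small. As a hedged sketch (the actual `GenericHeaders` trait lives in `lib/runtime/src/logging.rs`, and its exact shape is assumed here), a direct impl for `HashMap<String, String>` would let the TCP transport's decoded headers feed the same generic extractor as the HTTP and NATS header types:

```rust
use std::collections::HashMap;

// Assumed shape of the GenericHeaders trait; the real definition in
// logging.rs may differ in method names or signatures.
trait GenericHeaders {
    fn get(&self, key: &str) -> Option<&str>;
}

// Lets HashMap<String, String> (the TCP wire headers) satisfy the trait.
impl GenericHeaders for HashMap<String, String> {
    fn get(&self, key: &str) -> Option<&str> {
        // Fully qualified call to avoid recursing into the trait method.
        HashMap::get(self, key).map(String::as_str)
    }
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert("traceparent".to_string(), "00-abc-def-01".to_string());

    // Dispatch through the trait, as a generic extractor would.
    let h: &dyn GenericHeaders = &headers;
    println!("{:?}", h.get("traceparent"));
}
```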
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- lib/runtime/src/logging.rs
- lib/runtime/src/pipeline/network/codec.rs
- lib/runtime/src/pipeline/network/egress/tcp_client.rs
- lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.
Applied to files:
- lib/runtime/src/pipeline/network/egress/tcp_client.rs
- lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
🧬 Code graph analysis (2)
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
lib/runtime/src/pipeline/network/codec.rs (1)
with_headers(46-56)
lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs (1)
lib/runtime/src/logging.rs (1)
make_handle_payload_span_from_tcp_headers(382-424)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: vllm (amd64)
- GitHub Check: trtllm (arm64)
- GitHub Check: sglang (amd64)
- GitHub Check: sglang (arm64)
- GitHub Check: trtllm (amd64)
- GitHub Check: vllm (arm64)
- GitHub Check: Build and Test - dynamo
- GitHub Check: tests (.)
- GitHub Check: clippy (lib/bindings/python)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: clippy (.)
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: tests (launch/dynamo-run)
🔇 Additional comments (14)
lib/runtime/src/pipeline/network/egress/tcp_client.rs (1)
327-329: LGTM! Proper header propagation for distributed tracing. The change correctly switches to `with_headers` to include trace headers (traceparent, tracestate, x-request-id, etc.) in the TCP wire protocol, enabling end-to-end trace continuity across frontend and worker services.

lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs (3)
269-276: LGTM! Correct wire protocol extension for headers. The header reading logic correctly follows the new wire format, reading `headers_len` (2 bytes) followed by the headers payload after the endpoint path.
305-312: LGTM! Correct capacity calculation and message reconstruction. The capacity calculation properly accounts for all segments, including the new headers field, and the message reconstruction correctly assembles the complete buffer for decoding.
377-389: LGTM! Dynamic span creation enables distributed tracing across the TCP transport. The span creation correctly extracts OTEL context from TCP headers and links it as the parent span, enabling trace continuity between frontend and worker services.
lib/runtime/src/pipeline/network/codec.rs (6)
21-35: LGTM! Clear wire format documentation and struct definition. The wire format documentation is well-structured, and the `headers` field addition with `HashMap<String, String>` is appropriate for propagating trace headers.
37-56: LGTM! Backward-compatible API with new header support. The `new()` constructor maintains backward compatibility by initializing an empty headers map, while `with_headers()` provides explicit header propagation when needed.
70-117: LGTM! Proper header encoding with size validation. The JSON serialization approach is flexible, and the `u16::MAX` size check ensures the headers fit within the wire format constraints. The capacity calculation and write order are correct.
149-212: LGTM! Robust header decoding with proper bounds checking. The decode logic correctly reads `headers_len`, validates buffer bounds, and parses the JSON headers. Error messages are descriptive and offset tracking is accurate throughout.
241-322: LGTM! Codec Decoder correctly handles the new wire format. The peek logic, header size calculation, and decoding flow are all correctly updated to handle the headers segment in the wire protocol.
339-400: LGTM! Codec Encoder properly handles headers. The encoder correctly serializes headers to JSON, validates size constraints, and writes them in the proper position within the wire format.
lib/runtime/src/logging.rs (4)
275-299: LGTM! HTTP request span now properly linked to the parent trace context. The `make_request_span` function correctly extracts OTEL context from HTTP headers and sets it as the parent context for the span, enabling trace continuity for incoming HTTP requests.
301-336: LGTM! Proper OTEL context extraction from HTTP headers. The implementation correctly uses the W3C Trace Context propagator, validates the traceparent header, and ensures only valid span contexts are returned. The pattern aligns with the existing NATS header extraction.
381-424: LGTM! Proper span creation from TCP headers for distributed tracing. The function correctly mirrors the NATS `make_handle_payload_span` pattern, extracting OTEL context and creating appropriately linked spans for TCP payload handling.
426-467: LGTM! TCP header OTEL context extraction follows established patterns. The implementation correctly extracts W3C Trace Context from `HashMap<String, String>` headers, returning both the OTEL context and individual trace identifiers for logging purposes.
@coderabbitai incremental review |
@jh-nv, I'll review the changes in this PR.

✅ Actions performed: Review triggered.
Co-authored-by: Ishan Dhanani <ishandhanani@gmail.com>
Overview:
Propagate OTEL tracing context for TCP transport.
Details:
The TCP transport was silently dropping all trace headers. Only x-endpoint-path was being used for routing, but traceparent, tracestate, x-request-id, etc. were never included in the wire protocol. This caused frontend and worker spans to appear disconnected in Tempo/Grafana.
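The `traceparent` header that was being dropped follows the W3C Trace Context format (`version-trace_id-parent_id-flags`). A hedged, std-only sketch of parsing it into the `(trace_id, parent_span_id)` pair that the review mentions `parse_traceparent` returning (the actual function in `lib/runtime/src/logging.rs` may differ):

```rust
// Sketch of W3C traceparent parsing, e.g.:
//   00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
// Returns (trace_id, parent_span_id), or (None, None) on malformed input.
fn parse_traceparent(value: &str) -> (Option<String>, Option<String>) {
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4 {
        return (None, None);
    }
    let is_hex = |s: &str, len: usize| s.len() == len && s.chars().all(|c| c.is_ascii_hexdigit());
    // version(2) - trace_id(32) - parent_id(16) - flags(2), all lowercase hex per spec
    if !is_hex(parts[0], 2) || !is_hex(parts[1], 32) || !is_hex(parts[2], 16) || !is_hex(parts[3], 2) {
        return (None, None);
    }
    // All-zero trace-id or parent-id is invalid per the W3C spec.
    if parts[1].chars().all(|c| c == '0') || parts[2].chars().all(|c| c == '0') {
        return (None, None);
    }
    (Some(parts[1].to_string()), Some(parts[2].to_string()))
}

fn main() {
    let tp = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
    println!("{:?}", parse_traceparent(tp));
}
```

Once the TCP wire protocol carries this header, the worker side can rebuild the parent span context instead of starting a fresh, disconnected trace.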
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit