Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 137 additions & 5 deletions lib/runtime/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
let version = format!("{:?}", req.version());
let trace_parent = TraceParent::from_headers(req.headers());

let otel_context = extract_otel_context_from_http_headers(req.headers());

let span = tracing::info_span!(
"http-request",
method = %method,
Expand All @@ -286,12 +288,52 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
);

if let Some(context) = otel_context {
let _ = span.set_parent(context);
}

span
}

/// Extract OpenTelemetry context from HTTP headers for distributed tracing
fn extract_otel_context_from_http_headers(
headers: &http::HeaderMap,
) -> Option<opentelemetry::Context> {
let traceparent_value = headers.get("traceparent")?.to_str().ok()?;

struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);

impl<'a> Extractor for HttpHeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|v| v.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
vec!["traceparent", "tracestate"]
.into_iter()
.filter(|&key| self.0.get(key).is_some())
.collect()
}
}

// Early return if traceparent is empty
if traceparent_value.is_empty() {
return None;
}

let extractor = HttpHeaderExtractor(headers);
let otel_context = TRACE_PROPAGATOR.extract(&extractor);

if otel_context.span().span_context().is_valid() {
Some(otel_context)
} else {
None
}
}

/// Create a handle_payload span from NATS headers with component context
pub fn make_handle_payload_span(
headers: &async_nats::HeaderMap,
Expand Down Expand Up @@ -335,6 +377,93 @@ pub fn make_handle_payload_span(
}
}

/// Create a handle_payload span from TCP/HashMap headers with component context
pub fn make_handle_payload_span_from_tcp_headers(
headers: &std::collections::HashMap<String, String>,
component: &str,
endpoint: &str,
namespace: &str,
instance_id: u64,
) -> Span {
let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
let x_request_id = headers.get("x-request-id").cloned();
let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
let tracestate = headers.get("tracestate").cloned();

if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
let span = tracing::info_span!(
"handle_payload",
trace_id = trace_id.as_str(),
parent_id = parent_id.as_str(),
x_request_id = x_request_id,
x_dynamo_request_id = x_dynamo_request_id,
tracestate = tracestate,
component = component,
endpoint = endpoint,
namespace = namespace,
instance_id = instance_id,
);

if let Some(context) = otel_context {
let _ = span.set_parent(context);
}
span
} else {
tracing::info_span!(
"handle_payload",
x_request_id = x_request_id,
x_dynamo_request_id = x_dynamo_request_id,
tracestate = tracestate,
component = component,
endpoint = endpoint,
namespace = namespace,
instance_id = instance_id,
)
}
}

/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing
fn extract_otel_context_from_tcp_headers(
headers: &std::collections::HashMap<String, String>,
) -> (
Option<opentelemetry::Context>,
Option<String>,
Option<String>,
) {
let traceparent_value = match headers.get("traceparent") {
Some(value) => value.as_str(),
None => return (None, None, None),
};

let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);

struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);

impl<'a> Extractor for TcpHeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).map(|s| s.as_str())
}

fn keys(&self) -> Vec<&str> {
vec!["traceparent", "tracestate"]
.into_iter()
.filter(|&key| self.0.get(key).is_some())
.collect()
}
}

let extractor = TcpHeaderExtractor(headers);
let otel_context = TRACE_PROPAGATOR.extract(&extractor);

let context_with_trace = if otel_context.span().span_context().is_valid() {
Some(otel_context)
} else {
None
};

(context_with_trace, trace_id, parent_span_id)
}

/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
pub fn extract_otel_context_from_nats_headers(
headers: &async_nats::HeaderMap,
Expand Down Expand Up @@ -366,8 +495,7 @@ pub fn extract_otel_context_from_nats_headers(
}

let extractor = NatsHeaderExtractor(headers);
let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
let otel_context = propagator.extract(&extractor);
let otel_context = TRACE_PROPAGATOR.extract(&extractor);

let context_with_trace = if otel_context.span().span_context().is_valid() {
Some(otel_context)
Expand All @@ -394,8 +522,7 @@ pub fn inject_otel_context_into_nats_headers(
}

let mut injector = NatsHeaderInjector(headers);
let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
propagator.inject_context(&otel_context, &mut injector);
TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
}

/// Inject trace context from current span into NATS headers
Expand Down Expand Up @@ -948,6 +1075,11 @@ impl CustomJsonFormatter {

use once_cell::sync::Lazy;
use regex::Regex;

/// Static W3C Trace Context propagator instance to avoid repeated allocations
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);

fn parse_tracing_duration(s: &str) -> Option<u64> {
static RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
Expand Down
Loading
Loading