From 1bfd6a4ff41b86ce4708f4fb05c2c8d3bcef5f24 Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 9 Jan 2026 15:55:31 -0500 Subject: [PATCH 1/4] fix(opentelemetry source): emit events_received metric when use_otlp_decoding is enabled --- src/sources/opentelemetry/http.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 72ca6cef1de43..1d3af5c7189eb 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -157,11 +157,19 @@ fn parse_with_deserializer( deserializer: &OtlpDeserializer, body: Bytes, log_namespace: LogNamespace, + events_received: &Registered, ) -> Result, ErrorMessage> { - deserializer + let events = deserializer .parse(body, log_namespace) .map(|r| r.into_vec()) - .map_err(emit_decode_error) + .map_err(emit_decode_error)?; + + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )); + + Ok(events) } fn build_ingest_filter( @@ -227,7 +235,7 @@ fn build_warp_log_filter( .and_then(|decoded_body| { bytes_received.emit(ByteSize(decoded_body.len())); if let Some(d) = deserializer.as_ref() { - parse_with_deserializer(d, decoded_body, log_namespace) + parse_with_deserializer(d, decoded_body, log_namespace, &events_received) } else { decode_log_body(decoded_body, log_namespace, &events_received) } @@ -266,7 +274,7 @@ fn build_warp_metrics_filter( .and_then(|decoded_body| { bytes_received.emit(ByteSize(decoded_body.len())); if let Some(d) = deserializer.as_ref() { - parse_with_deserializer(d, decoded_body, LogNamespace::default()) + parse_with_deserializer(d, decoded_body, LogNamespace::default(), &events_received) } else { decode_metrics_body(decoded_body, &events_received) } @@ -302,7 +310,7 @@ fn build_warp_trace_filter( .and_then(|decoded_body| { bytes_received.emit(ByteSize(decoded_body.len())); if let Some(d) = deserializer.as_ref() { - parse_with_deserializer(d, decoded_body, LogNamespace::default()) + parse_with_deserializer(d, decoded_body, LogNamespace::default(), &events_received) } else { decode_trace_body(decoded_body, &events_received) } From 9b08c24d69c41b89a143a972ac7dd60d9ea6bd9d Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 9 Jan 2026 16:39:54 -0500 Subject: [PATCH 2/4] chore: add changelog fragment for opentelemetry metric fix --- .../24316_opentelemetry_use_otlp_decoding_metrics.fix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/24316_opentelemetry_use_otlp_decoding_metrics.fix.md diff --git a/changelog.d/24316_opentelemetry_use_otlp_decoding_metrics.fix.md b/changelog.d/24316_opentelemetry_use_otlp_decoding_metrics.fix.md new file mode 100644 index 0000000000000..6a67eeb2c6330 --- /dev/null +++ b/changelog.d/24316_opentelemetry_use_otlp_decoding_metrics.fix.md @@ -0,0 +1,3 @@ +The `opentelemetry` source now correctly emits the `component_received_events_total` metric when `use_otlp_decoding` is enabled for HTTP requests. Previously, this metric would show 0 despite events being received and processed. + +authors: thomasqueirozb From 2cace5def1c79eacbaa6569d5286385472267f65 Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 9 Jan 2026 16:50:27 -0500 Subject: [PATCH 3/4] test(opentelemetry): add test to verify metric emission with use_otlp_decoding --- src/sources/opentelemetry/tests.rs | 86 ++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index f6acd07533f5b..ddbfa8e9eb578 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1319,3 +1319,89 @@ fn current_time_and_nanos() -> (SystemTime, u64) { .unwrap(); (time, nanos) } + +#[tokio::test] +async fn http_logs_use_otlp_decoding_emits_metric() { + use crate::metrics::Controller; + + test_util::trace_init(); + + let (_guard_0, grpc_addr) = next_addr(); + let (_guard_1, http_addr) = next_addr(); + + let source = OpentelemetryConfig { + grpc: GrpcConfig { + address: grpc_addr, + tls: Default::default(), + }, + http: HttpConfig { + address: http_addr, + tls: Default::default(), + keepalive: Default::default(), + headers: Default::default(), + }, + acknowledgements: Default::default(), + log_namespace: None, + use_otlp_decoding: true, + }; + + let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); + let server = source + .build(SourceContext::new_test(sender, None)) + .await + .unwrap(); + tokio::spawn(server); + test_util::wait_for_tcp(http_addr).await; + + let client = reqwest::Client::new(); + let req = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: None, + scope_logs: vec![ScopeLogs { + scope: None, + log_records: vec![LogRecord { + time_unix_nano: 1, + observed_time_unix_nano: 2, + severity_number: 9, + severity_text: "info".into(), + body: Some(AnyValue { + value: Some(StringValue("log body".into())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 4, + trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"), + span_id: str_into_hex_bytes("0b9e4bda2a55530d"), + }], + schema_url: "v1".into(), + }], + schema_url: "v1".into(), + }], + }; + let _res = client + .post(format!("http://{http_addr}/v1/logs")) + .header("Content-Type", "application/x-protobuf") + .body(req.encode_to_vec()) + .send() + .await + .expect("Failed to send log to Opentelemetry Collector."); + + let mut output = test_util::collect_ready(logs_output).await; + assert_eq!(output.len(), 1); + output.pop().unwrap(); + + // Check that the metric was emitted + let metrics = Controller::get().unwrap().capture_metrics(); + let received_events_metric = metrics + .iter() + .find(|m| m.name() == "component_received_events_total") + .expect("component_received_events_total metric should be present"); + + // Verify it has a non-zero count + match received_events_metric.value() { + MetricValue::Counter { value } => { + assert!(*value > 0.0, "component_received_events_total should be > 0, got {}", value); + } + _ => panic!("component_received_events_total should be a counter"), + } +} From b508400c8a13a411e0441c1aff60c3c567afd41e Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 12 Jan 2026 10:31:30 -0500 Subject: [PATCH 4/4] Format long lines --- src/sources/opentelemetry/http.rs | 14 ++++++++++++-- src/sources/opentelemetry/tests.rs | 6 +++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 1d3af5c7189eb..66f3e210c0bd4 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -274,7 +274,12 @@ fn build_warp_metrics_filter( .and_then(|decoded_body| { bytes_received.emit(ByteSize(decoded_body.len())); if let Some(d) = deserializer.as_ref() { - parse_with_deserializer(d, decoded_body, LogNamespace::default(), &events_received) + parse_with_deserializer( + d, + decoded_body, + LogNamespace::default(), + &events_received, + ) } else { decode_metrics_body(decoded_body, &events_received) } @@ -310,7 +315,12 @@ fn build_warp_trace_filter( .and_then(|decoded_body| { bytes_received.emit(ByteSize(decoded_body.len())); if let Some(d) = deserializer.as_ref() { - parse_with_deserializer(d, decoded_body, LogNamespace::default(), &events_received) + parse_with_deserializer( + d, + decoded_body, + LogNamespace::default(), + &events_received, + ) } else { decode_trace_body(decoded_body, &events_received) } diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index ddbfa8e9eb578..466196ded223d 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1400,7 +1400,11 @@ async fn http_logs_use_otlp_decoding_emits_metric() { // Verify it has a non-zero count match received_events_metric.value() { MetricValue::Counter { value } => { - assert!(*value > 0.0, "component_received_events_total should be > 0, got {}", value); + assert!( + *value > 0.0, + "component_received_events_total should be > 0, got {}", + value + ); } _ => panic!("component_received_events_total should be a counter"), }