Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `opentelemetry` source now correctly emits the `component_received_events_total` metric when `use_otlp_decoding` is enabled for HTTP requests. Previously, this metric would show 0 despite events being received and processed.

authors: thomasqueirozb
28 changes: 23 additions & 5 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,19 @@ fn parse_with_deserializer(
deserializer: &OtlpDeserializer,
body: Bytes,
log_namespace: LogNamespace,
events_received: &Registered<EventsReceived>,
) -> Result<Vec<Event>, ErrorMessage> {
deserializer
let events = deserializer
.parse(body, log_namespace)
.map(|r| r.into_vec())
.map_err(emit_decode_error)
.map_err(emit_decode_error)?;

events_received.emit(CountByteSize(
events.len(),
events.estimated_json_encoded_size_of(),
));

Ok(events)
}

fn build_ingest_filter<Resp, F>(
Expand Down Expand Up @@ -227,7 +235,7 @@ fn build_warp_log_filter(
.and_then(|decoded_body| {
bytes_received.emit(ByteSize(decoded_body.len()));
if let Some(d) = deserializer.as_ref() {
parse_with_deserializer(d, decoded_body, log_namespace)
parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
} else {
decode_log_body(decoded_body, log_namespace, &events_received)
}
Expand Down Expand Up @@ -266,7 +274,12 @@ fn build_warp_metrics_filter(
.and_then(|decoded_body| {
bytes_received.emit(ByteSize(decoded_body.len()));
if let Some(d) = deserializer.as_ref() {
parse_with_deserializer(d, decoded_body, LogNamespace::default())
parse_with_deserializer(
d,
decoded_body,
LogNamespace::default(),
&events_received,
)
} else {
decode_metrics_body(decoded_body, &events_received)
}
Expand Down Expand Up @@ -302,7 +315,12 @@ fn build_warp_trace_filter(
.and_then(|decoded_body| {
bytes_received.emit(ByteSize(decoded_body.len()));
if let Some(d) = deserializer.as_ref() {
parse_with_deserializer(d, decoded_body, LogNamespace::default())
parse_with_deserializer(
d,
decoded_body,
LogNamespace::default(),
&events_received,
)
} else {
decode_trace_body(decoded_body, &events_received)
}
Expand Down
90 changes: 90 additions & 0 deletions src/sources/opentelemetry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,3 +1319,93 @@ fn current_time_and_nanos() -> (SystemTime, u64) {
.unwrap();
(time, nanos)
}

#[tokio::test]
async fn http_logs_use_otlp_decoding_emits_metric() {
use crate::metrics::Controller;

test_util::trace_init();

let (_guard_0, grpc_addr) = next_addr();
let (_guard_1, http_addr) = next_addr();

let source = OpentelemetryConfig {
grpc: GrpcConfig {
address: grpc_addr,
tls: Default::default(),
},
http: HttpConfig {
address: http_addr,
tls: Default::default(),
keepalive: Default::default(),
headers: Default::default(),
},
acknowledgements: Default::default(),
log_namespace: None,
use_otlp_decoding: true,
};

let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string());
let server = source
.build(SourceContext::new_test(sender, None))
.await
.unwrap();
tokio::spawn(server);
test_util::wait_for_tcp(http_addr).await;

let client = reqwest::Client::new();
let req = ExportLogsServiceRequest {
resource_logs: vec![ResourceLogs {
resource: None,
scope_logs: vec![ScopeLogs {
scope: None,
log_records: vec![LogRecord {
time_unix_nano: 1,
observed_time_unix_nano: 2,
severity_number: 9,
severity_text: "info".into(),
body: Some(AnyValue {
value: Some(StringValue("log body".into())),
}),
attributes: vec![],
dropped_attributes_count: 0,
flags: 4,
trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"),
span_id: str_into_hex_bytes("0b9e4bda2a55530d"),
}],
schema_url: "v1".into(),
}],
schema_url: "v1".into(),
}],
};
let _res = client
.post(format!("http://{http_addr}/v1/logs"))
.header("Content-Type", "application/x-protobuf")
.body(req.encode_to_vec())
.send()
.await
.expect("Failed to send log to Opentelemetry Collector.");

let mut output = test_util::collect_ready(logs_output).await;
assert_eq!(output.len(), 1);
output.pop().unwrap();

// Check that the metric was emitted
let metrics = Controller::get().unwrap().capture_metrics();
let received_events_metric = metrics
.iter()
.find(|m| m.name() == "component_received_events_total")
.expect("component_received_events_total metric should be present");

// Verify it has a non-zero count
match received_events_metric.value() {
MetricValue::Counter { value } => {
assert!(
*value > 0.0,
"component_received_events_total should be > 0, got {}",
value
);
}
_ => panic!("component_received_events_total should be a counter"),
}
}
Loading