diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs
index 58c07b1fc8..dd6a9d063a 100644
--- a/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs
+++ b/rust/otap-dataflow/crates/core-nodes/src/processors/batch_processor/mod.rs
@@ -411,8 +411,8 @@ struct BatchContext {
 struct BatchPortion {
     /// The number of these matches Signalbuffer.inbound[inkey]
     inkey: Option,
-    /// Number of items
-    items: usize,
+    /// Weight of this portion in the active sizer's unit.
+    weight: usize,
 }
 
 struct Inputs {
@@ -422,8 +422,8 @@ struct Inputs {
     /// Waiter context
     context: Vec,
 
-    /// A count defined by num_items(), number of spans, log records, or metric data points.
-    items: usize,
+    /// Total weight across all pending portions, in the active sizer's unit.
+    weight: usize,
 }
 
 struct MultiContext {
@@ -496,16 +496,6 @@ enum FlushReason {
 #[metric_set(name = "otap.processor.batch")]
 #[derive(Debug, Default, Clone)]
 pub struct BatchProcessorMetrics {
-    /// Total items consumed for logs signal
-    #[metric(unit = "{item}")]
-    consumed_items_logs: Counter,
-    /// Total items consumed for metrics signal
-    #[metric(unit = "{item}")]
-    consumed_items_metrics: Counter,
-    /// Total items consumed for traces signal
-    #[metric(unit = "{item}")]
-    consumed_items_traces: Counter,
-
     /// Total batches consumed for logs signal
     #[metric(unit = "{item}")]
     consumed_batches_logs: Counter,
@@ -516,16 +506,6 @@ pub struct BatchProcessorMetrics {
     /// Total batches consumed for traces signal
     #[metric(unit = "{item}")]
     consumed_batches_traces: Counter,
-    /// Total items produced for logs signal
-    #[metric(unit = "{item}")]
-    produced_items_logs: Counter,
-    /// Total items produced for metrics signal
-    #[metric(unit = "{item}")]
-    produced_items_metrics: Counter,
-    /// Total items produced for traces signal
-    #[metric(unit = "{item}")]
-    produced_items_traces: Counter,
-
     /// Total batches produced for logs signal
     #[metric(unit = "{item}")]
     produced_batches_logs: Counter,
@@ -546,9 +526,6 @@ pub struct BatchProcessorMetrics {
     /// Number of input requests pending at flush time
     #[metric(unit = "{request}")]
     flush_pending_requests: Mmsc,
-    /// Number of primary signal items pending at flush time
-    #[metric(unit = "{item}")]
-    flush_pending_items: Mmsc,
     /// Number of bytes pending at flush time when byte size is known
     #[metric(unit = "By")]
     flush_pending_bytes: Mmsc,
@@ -561,9 +538,6 @@ pub struct BatchProcessorMetrics {
     /// Number of output batches emitted by each flush
     #[metric(unit = "{batch}")]
     flush_output_batches: Mmsc,
-    /// Number of primary signal items emitted by each flush
-    #[metric(unit = "{item}")]
-    flush_output_items: Mmsc,
     /// Number of bytes emitted by each flush when byte size is known
     #[metric(unit = "By")]
     flush_output_bytes: Mmsc,
@@ -574,9 +548,6 @@ pub struct BatchProcessorMetrics {
     /// Number of batches for which errors encountered
     #[metric(unit = "{error}")]
     batching_errors: Counter,
-    /// Number of empty records dropped
-    #[metric(unit = "{msg}")]
-    dropped_empty_records: Counter,
     /// Number of requests nacked due to inbound slot exhaustion
     #[metric(unit = "{msg}")]
     nacked_inbound_slots: Counter,
@@ -738,30 +709,11 @@ impl BatchProcessor {
         effect: &mut local::EffectHandler,
         request: OtapPdata,
     ) -> Result<(), EngineError> {
-        let items = request.num_items();
-
-        if items == 0 {
-            self.metrics.dropped_empty_records.inc();
-            // Note: Failure to Ack/Nack is an engine-level error.
-            effect.notify_ack(AckMsg::new(request)).await?;
-            return Ok(());
-        }
-
-        // Increment consumed_items for the appropriate signal
         let signal = request.signal_type();
         match signal {
-            SignalType::Logs => {
-                self.metrics.consumed_items_logs.add(items as u64);
-                self.metrics.consumed_batches_logs.add(1);
-            }
-            SignalType::Metrics => {
-                self.metrics.consumed_items_metrics.add(items as u64);
-                self.metrics.consumed_batches_metrics.add(1);
-            }
-            SignalType::Traces => {
-                self.metrics.consumed_items_traces.add(items as u64);
-                self.metrics.consumed_batches_traces.add(1);
-            }
+            SignalType::Logs => self.metrics.consumed_batches_logs.add(1),
+            SignalType::Metrics => self.metrics.consumed_batches_metrics.add(1),
+            SignalType::Traces => self.metrics.consumed_batches_traces.add(1),
         };
 
         let (ctx, payload) = request.into_parts();
@@ -771,12 +723,13 @@ impl BatchProcessor {
                 if let Some(mut otap_format) = self.otap_format() {
                     otap_format
                         .for_signal(signal)
-                        .accept_payload(effect, ctx, otap, items)
+                        .accept_payload(effect, ctx, otap)
                         .await?
                 } else if let Some(mut otlp_format) = self.otlp_format() {
+                    let otlp_payload = otap.try_into_with_default()?;
                     otlp_format
                         .for_signal(signal)
-                        .accept_payload(effect, ctx, otap.try_into_with_default()?, items)
+                        .accept_payload(effect, ctx, otlp_payload)
                         .await?
                 } else {
                     return Err(Self::no_active_format_error());
@@ -786,12 +739,13 @@ impl BatchProcessor {
                 if let Some(mut otlp_format) = self.otlp_format() {
                     otlp_format
                         .for_signal(signal)
-                        .accept_payload(effect, ctx, otlp, items)
+                        .accept_payload(effect, ctx, otlp)
                         .await?
                 } else if let Some(mut otap_format) = self.otap_format() {
+                    let otap_payload = otlp.try_into_with_default()?;
                     otap_format
                         .for_signal(signal)
-                        .accept_payload(effect, ctx, otlp.try_into_with_default()?, items)
+                        .accept_payload(effect, ctx, otap_payload)
                         .await?
                 } else {
                     return Err(Self::no_active_format_error());
@@ -880,8 +834,17 @@ where
         effect: &mut local::EffectHandler,
         ctx: Context,
         payload: T,
-        items: usize,
     ) -> Result<(), EngineError> {
+        let weight = self.fmtcfg.sizer.batch_size(&payload)?;
+        if weight == 0 {
+            // Note: we do not check for empty envelopes, e.g., logs
+            // requests with only a resource and no log records. We do
+            // not count these.
+            let pdata = OtapPdata::new(ctx, payload.into());
+            effect.notify_ack(AckMsg::new(pdata)).await?;
+            return Ok(());
+        }
+
         // If there are subscribers, calculate an inbound slot key.
         let inkey = if ctx.has_subscribers() {
             let slot = self
@@ -920,7 +883,7 @@ where
 
         self.buffer
             .inputs
-            .accept(payload, BatchPortion::new(inkey, items));
+            .accept(payload, BatchPortion::new(inkey, weight));
 
         let pending_size = self.buffer.inputs.size_by(self.fmtcfg.sizer)?;
 
@@ -988,9 +951,6 @@ where
         self.metrics
             .flush_pending_requests
             .record(self.buffer.inputs.requests() as f64);
-        self.metrics
-            .flush_pending_items
-            .record(self.buffer.inputs.items as f64);
         if let Some(bytes) = self.buffer.inputs.known_bytes() {
             self.metrics.flush_pending_bytes.record(bytes as f64);
         }
@@ -1067,7 +1027,8 @@ where
             .batch_size(&output_batches[num_output - 1])?;
 
         if last_batch_size < self.fmtcfg.lower_limit() {
-            self.buffer.take_remaining(&mut inputs, &mut output_batches);
+            self.buffer
+                .take_remaining(self.fmtcfg.sizer, &mut inputs, &mut output_batches);
 
             // We use the latest arrival time as the new arrival for timeout purposes.
             self.buffer
@@ -1080,12 +1041,6 @@ where
         self.metrics
             .flush_output_batches
             .record(output_batches.len() as f64);
-        self.metrics.flush_output_items.record(
-            output_batches
-                .iter()
-                .map(OtapPayloadHelpers::num_items)
-                .sum::() as f64,
-        );
         if let Some(bytes) = known_total_bytes(&output_batches) {
             self.metrics.flush_output_bytes.record(bytes as f64);
         }
@@ -1093,27 +1048,19 @@ where
         let mut input_context = inputs.take_context();
 
         for records in output_batches {
-            let items = records.num_items();
+            // Apportion ack/nack subscribers in the active sizer's unit.
+            let weight = self.fmtcfg.sizer.batch_size(&records)?;
             let mut pdata = OtapPdata::new(Context::default(), records.into());
 
-            // Increment produced_items for the appropriate signal
             match self.signal {
-                SignalType::Logs => {
-                    self.metrics.produced_items_logs.add(items as u64);
-                    self.metrics.produced_batches_logs.add(1);
-                }
-                SignalType::Metrics => {
-                    self.metrics.produced_items_metrics.add(items as u64);
-                    self.metrics.produced_batches_metrics.add(1);
-                }
-                SignalType::Traces => {
-                    self.metrics.produced_items_traces.add(items as u64);
-                    self.metrics.produced_batches_traces.add(1);
-                }
+                SignalType::Logs => self.metrics.produced_batches_logs.add(1),
+                SignalType::Metrics => self.metrics.produced_batches_metrics.add(1),
+                SignalType::Traces => self.metrics.produced_batches_traces.add(1),
             }
 
-            // If any items require notification, get an outbound slot and subscribe.
-            if let Some(ctxs) = self.buffer.drain_context(items, &mut input_context) {
+            // If any inputs in this batch require notification, get an
+            // outbound slot and subscribe.
+            if let Some(ctxs) = self.buffer.drain_context(weight, &mut input_context) {
                 match self.buffer.outbound.allocate_with_data(ctxs) {
                     Err(ctxs) => {
                         for bp in ctxs {
@@ -1349,7 +1296,7 @@ impl Default for Inputs {
         Self {
             pending: Vec::new(),
             context: Vec::new(),
-            items: 0,
+            weight: 0,
         }
     }
 }
@@ -1378,8 +1325,8 @@ where
 }
 
 impl BatchPortion {
-    const fn new(inkey: Option, items: usize) -> Self {
-        Self { inkey, items }
+    const fn new(inkey: Option, weight: usize) -> Self {
+        Self { inkey, weight }
     }
 }
 
@@ -1394,12 +1341,12 @@ impl Inputs {
         Self {
            pending: self.pending.drain(..).collect(),
            context: self.context.drain(..).collect(),
-            items: std::mem::take(&mut self.items),
+            weight: std::mem::take(&mut self.weight),
         }
     }
 
     const fn is_empty(&self) -> bool {
-        self.items == 0
+        self.weight == 0
     }
 
     const fn requests(&self) -> usize {
@@ -1413,15 +1360,14 @@ impl Inputs {
     fn size_by(&self, sizer: Sizer) -> Result {
         match sizer {
             Sizer::Requests => Ok(self.requests()),
-            Sizer::Items => Ok(self.items),
-            Sizer::Bytes => self.known_bytes().ok_or_else(|| PDataError::Format {
-                error: "bytes encoding not known".into(),
-            }),
+            // For Sizer::Items / Sizer::Bytes, `weight` was accumulated in
+            // the active sizer's unit at accept() time, so this is exact.
+            Sizer::Items | Sizer::Bytes => Ok(self.weight),
         }
     }
 
     fn accept(&mut self, batch: T, part: BatchPortion) {
-        self.items += part.items;
+        self.weight += part.weight;
         self.pending.push(batch);
         self.context.push(part);
     }
@@ -1459,20 +1405,25 @@ where
     /// Takes the residual batch, used in case the final output is less than
     /// the lower bound. This removes the last output batch, the corresponding
     /// context, and places it back in the pending buffer as the first in line.
-    fn take_remaining(&mut self, from_inputs: &mut Inputs, output_batches: &mut Vec) {
+    fn take_remaining(
+        &mut self,
+        sizer: Sizer,
+        from_inputs: &mut Inputs,
+        output_batches: &mut Vec,
+    ) {
         // SAFETY: protected by output_batches.len() > 1.
         let remaining = output_batches.pop().expect("has last");
         let last_input = from_inputs.context.last().expect("has last");
 
-        let last_items = remaining.num_items();
-        let new_part = BatchPortion::new(last_input.inkey, last_items);
+        let last_weight = sizer.batch_size(&remaining).expect("known size");
+        let new_part = BatchPortion::new(last_input.inkey, last_weight);
 
-        from_inputs.items -= last_items;
+        from_inputs.weight -= last_weight;
         self.inputs.accept(remaining, new_part);
     }
 
     /// Using a multi-context corresponding with the input pending
-    /// data, and considering an output item count for a single output
+    /// data, and considering an output weight for a single output
     /// batch, this determines the set of (maybe partial) pending
     /// batches that correspond. When merging only (not splitting),
     /// this will return the entire set of pending contexts; when
@@ -1480,19 +1431,19 @@ where
     /// retained as first-in-line.
     fn drain_context(
         &mut self,
-        mut items: usize,
+        mut weight: usize,
         contexts: &mut MultiContext,
     ) -> Option> {
         let mut out = Vec::new();
 
-        while items > 0 && contexts.pos < contexts.inputs.len() {
+        while weight > 0 && contexts.pos < contexts.inputs.len() {
             let bp = contexts.inputs.get_mut(contexts.pos).expect("valid");
 
-            let take = bp.items.min(items);
-            bp.items -= take;
-            items -= take;
+            let take = bp.weight.min(weight);
+            bp.weight -= take;
+            weight -= take;
 
-            if bp.items == 0 {
+            if bp.weight == 0 {
                 contexts.pos += 1;
             }
 
@@ -1670,14 +1621,15 @@ mod tests {
         (telemetry_registry, metrics_reporter, phase)
     }
 
-    /// Helper to verify consumed and produced item metrics
-    fn verify_item_metrics(
+    /// Helper to verify that batch counters were incremented to the expected
+    /// values. `expected_counts` is `(consumed_batches, produced_batches)` —
+    /// i.e. the number of input requests consumed by the processor and the
+    /// number of output batches it produced.
+    fn verify_batch_metrics(
         telemetry_registry: &TelemetryRegistryHandle,
         signal: SignalType,
-        expected_items: usize,
+        expected_counts: (usize, usize),
     ) {
-        let mut consumed_items = 0u64;
-        let mut produced_items = 0u64;
         let mut consumed_batches = 0u64;
         let mut produced_batches = 0u64;
 
@@ -1685,18 +1637,6 @@ mod tests {
             if desc.name == "otap.processor.batch" {
                 for (field, value) in iter {
                     match (signal, field.name) {
-                        (SignalType::Logs, "consumed.items.logs") => {
-                            consumed_items = value.to_u64_lossy()
-                        }
-                        (SignalType::Logs, "produced.items.logs") => {
-                            produced_items = value.to_u64_lossy()
-                        }
-                        (SignalType::Traces, "consumed.items.traces") => {
-                            consumed_items = value.to_u64_lossy()
-                        }
-                        (SignalType::Traces, "produced.items.traces") => {
-                            produced_items = value.to_u64_lossy()
-                        }
                         (SignalType::Logs, "consumed.batches.logs") => {
                             consumed_batches = value.to_u64_lossy()
                         }
@@ -1715,16 +1655,15 @@ mod tests {
                 }
             });
 
+        let (expected_consumed, expected_produced) = expected_counts;
         assert_eq!(
-            consumed_items as usize, expected_items,
-            "consumed_items metric must match"
+            consumed_batches, expected_consumed as u64,
+            "consumed_batches"
         );
         assert_eq!(
-            produced_items as usize, expected_items,
-            "produced_items metric must match"
+            produced_batches, expected_produced as u64,
+            "produced_batches"
        );
-        assert!(produced_batches != 0, "produced_batches != 0");
-        assert!(consumed_batches != 0, "consumed_batches != 0");
     }
 
     fn mmsc_metric_count(
@@ -1945,6 +1884,8 @@ mod tests {
         let input_item_count: usize = inputs_otlp.iter().map(|m| m.num_items()).sum();
         let num_inputs = inputs_otlp.len();
         let total_events = events.len();
+        let produced_total = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let produced_total_inner = produced_total.clone();
 
         phase
             .run_test(move |mut ctx| async move {
@@ -2116,12 +2057,16 @@ mod tests {
 
                 // Test-specific validation.
                 (verify_outputs)(&event_outputs);
+
+                // Publish the produced batch count for the validate closure.
+                produced_total_inner.store(total_outputs, std::sync::atomic::Ordering::SeqCst);
             })
             .validate(move |_| async move {
                 // TODO: Not clear why, but this sleep is necessary (probably flaky)
                 // for the NodeControlMsg::CollectTelemetry sent above to take effect.
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, signal, input_item_count);
+                let produced = produced_total.load(std::sync::atomic::Ordering::SeqCst);
+                verify_batch_metrics(&telemetry_registry, signal, (num_inputs, produced));
             });
     }
 
@@ -2362,7 +2307,9 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 9);
+                // 3 inputs consumed; 2 batches produced (one size flush after
+                // the second input, one timer flush after the third).
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (3, 2));
             });
     }
 
@@ -2422,7 +2369,10 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 18);
+                // 4 inputs consumed; 3 size-flushed batches produced (the
+                // first input only arms the timer; each subsequent input
+                // size-flushes one batch).
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (4, 3));
                 assert_eq!(
                     mmsc_metric_count(
                         &telemetry_registry,
@@ -2496,7 +2446,8 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 3);
+                // 1 input consumed; 1 batch produced by the real wakeup.
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (1, 1));
             });
     }
 
@@ -2651,7 +2602,8 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 3);
+                // 1 input consumed; 1 batch produced by the shutdown flush.
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (1, 1));
             });
     }
 
@@ -3162,7 +3114,83 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 3);
+                // 1 input consumed; 1 batch produced by the byte-size flush.
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (1, 1));
+            });
+    }
+
+    /// A zero-byte OTLP request is acked immediately and never reaches the
+    /// batch buffer.
+    #[test]
+    fn test_otlp_zero_byte_request_acked_immediately() {
+        let (telemetry_registry, metrics_reporter, phase) = setup_test_runtime(json!({
+            "format": "otlp",
+            "otlp": {
+                "min_size": 100,
+                "sizer": "bytes",
+            },
+            "max_batch_duration": "1s"
+        }));
+
+        phase
+            .run_test(move |mut ctx| async move {
+                let empty = OtlpProtoBytes::ExportLogsRequest(Bytes::new());
+                ctx.process(Message::PData(OtapPdata::new_default(empty.into())))
+                    .await
+                    .expect("process empty otlp");
+
+                assert!(ctx.drain_pdata().await.is_empty(), "no batch should flush");
+
+                ctx.process(Message::Control(NodeControlMsg::CollectTelemetry {
+                    metrics_reporter,
+                }))
+                .await
+                .expect("collect telemetry");
+            })
+            .validate(move |_| async move {
+                tokio::time::sleep(Duration::from_millis(50)).await;
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (1, 0));
+            });
+    }
+
+    /// An OTLP request that has non-zero bytes but zero log records is not
+    /// treated as empty under the bytes sizer; it flows through the batcher
+    /// like any other input.
+    #[test]
+    fn test_otlp_empty_container_passes_through_batcher() {
+        let (telemetry_registry, metrics_reporter, phase) = setup_test_runtime(json!({
+            "format": "otlp",
+            "otlp": {
+                "min_size": 1,
+                "sizer": "bytes",
+            },
+            "max_batch_duration": "1s"
+        }));
+
+        phase
+            .run_test(move |mut ctx| async move {
+                let logs = LogsData {
+                    resource_logs: vec![ResourceLogs::default()],
+                };
+                let input_bytes = otlp_message_to_bytes(&OtlpProtoMessage::Logs(logs));
+                assert!(input_bytes.num_bytes() > 0);
+
+                ctx.process(Message::PData(OtapPdata::new_default(input_bytes.into())))
+                    .await
+                    .expect("process empty-container otlp");
+
+                let outputs = ctx.drain_pdata().await;
+                assert_eq!(outputs.len(), 1, "empty container should size-flush");
+
+                ctx.process(Message::Control(NodeControlMsg::CollectTelemetry {
+                    metrics_reporter,
+                }))
+                .await
+                .expect("collect telemetry");
+            })
+            .validate(move |_| async move {
+                tokio::time::sleep(Duration::from_millis(50)).await;
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (1, 1));
             });
     }
 
@@ -3249,7 +3277,7 @@ mod tests {
 
                 assert_equivalent(&[logs1, logs2], &outputs);
 
-                // Collect telemetry for verify_item_metrics.
+                // Collect telemetry for verify_batch_metrics.
                 ctx.process(Message::Control(NodeControlMsg::CollectTelemetry {
                     metrics_reporter,
                 }))
                 .await
                 .expect("collect telemetry");
@@ -3258,7 +3286,9 @@ mod tests {
             })
             .validate(move |_| async move {
                 tokio::time::sleep(Duration::from_millis(50)).await;
-                verify_item_metrics(&telemetry_registry, SignalType::Logs, 6);
+                // 2 inputs consumed (one OTLP, one OTAP); 2 batches produced
+                // (one per format, both flushed by their respective wakeups).
+                verify_batch_metrics(&telemetry_registry, SignalType::Logs, (2, 2));
             });
     }
 
diff --git a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
index bf157a0f30..db85975247 100644
--- a/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
+++ b/rust/otap-dataflow/crates/otap/tests/core_node_liveness_tests.rs
@@ -569,9 +569,7 @@ fn test_batch_pipeline_uses_timer_wakeup_metrics_with_otlp_bytes_config() {
         5,
         "the local wakeup pipeline should export every generated item"
     );
-    metrics.assert_eq("consumed.items.logs", 5);
     metrics.assert_eq("consumed.batches.logs", 5);
-    metrics.assert_eq("produced.items.logs", 5);
    metrics.assert_eq("produced.batches.logs", 5);
    metrics.assert_eq("flushes.size", 0);
    metrics.assert_eq("flushes.timer", 5);
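Note on the weight model introduced in the batch_processor hunks above: each accepted request now contributes a single weight expressed in the configured sizer's unit, zero-weight requests are acked immediately and never buffered, and size_by can return the accumulated total directly for the items and bytes sizers. Below is a minimal, self-contained sketch of that bookkeeping; Sizer, Request, and Inputs here are simplified stand-ins written for illustration, not the crate's actual types or API.

// Sketch only: simplified stand-ins for the batch processor's weight model.
#[derive(Clone, Copy)]
enum Sizer {
    Requests,
    Items,
    Bytes,
}

struct Request {
    items: usize,
    encoded_bytes: usize,
}

impl Sizer {
    // Weight of one request in this sizer's unit (hypothetical helper).
    fn weight_of(self, req: &Request) -> usize {
        match self {
            Sizer::Requests => 1,
            Sizer::Items => req.items,
            Sizer::Bytes => req.encoded_bytes,
        }
    }
}

#[derive(Default)]
struct Inputs {
    pending: Vec<Request>,
    // Total weight across all pending requests, in the active sizer's unit.
    weight: usize,
}

impl Inputs {
    fn accept(&mut self, req: Request, weight: usize) {
        self.weight += weight;
        self.pending.push(req);
    }

    // Pending size in the active sizer's unit; Items/Bytes are exact because
    // `weight` was accumulated in that unit when each request was accepted.
    fn size_by(&self, sizer: Sizer) -> usize {
        match sizer {
            Sizer::Requests => self.pending.len(),
            Sizer::Items | Sizer::Bytes => self.weight,
        }
    }
}

fn main() {
    let sizer = Sizer::Bytes;
    let mut inputs = Inputs::default();
    let requests = [
        Request { items: 3, encoded_bytes: 120 }, // ordinary request
        Request { items: 0, encoded_bytes: 40 },  // resource-only envelope: non-zero bytes, still batched
        Request { items: 0, encoded_bytes: 0 },   // truly empty: zero weight, acked immediately
    ];
    for req in requests {
        let weight = sizer.weight_of(&req);
        if weight == 0 {
            continue; // mirrors the immediate-ack path in accept_payload
        }
        inputs.accept(req, weight);
    }
    assert_eq!(inputs.size_by(Sizer::Bytes), 160);
    assert_eq!(inputs.size_by(Sizer::Requests), 2);
}

The third request in this example mirrors the new test_otlp_zero_byte_request_acked_immediately case (it never reaches the buffer), while the resource-only request with non-zero bytes batches normally under the bytes sizer, as in test_otlp_empty_container_passes_through_batcher.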
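The drain_context change apportions ack/nack subscribers by weight rather than by item count: each output batch consumes pending portions in arrival order, and a portion that is only partially covered stays first in line for the next batch, so a split input can subscribe to several outputs. A rough sketch of that loop with plain numbers; the Portion type and drain function below are hypothetical names used only to show the splitting behaviour, not the crate's API.

// Sketch only: weight apportioning in the style of drain_context.
struct Portion {
    id: usize,
    weight: usize,
}

// Returns ids of portions fully or partially covered by one output batch of
// `batch_weight`, decrementing weights in place and advancing `pos`.
fn drain(mut batch_weight: usize, portions: &mut [Portion], pos: &mut usize) -> Vec<usize> {
    let mut covered = Vec::new();
    while batch_weight > 0 && *pos < portions.len() {
        let p = &mut portions[*pos];
        let take = p.weight.min(batch_weight);
        p.weight -= take;
        batch_weight -= take;
        covered.push(p.id);
        if p.weight == 0 {
            *pos += 1; // fully consumed, move to the next pending portion
        }
    }
    covered
}

fn main() {
    // Three inputs weighing 5, 3, and 4 units; output batches weigh 6 units.
    let mut portions = vec![
        Portion { id: 0, weight: 5 },
        Portion { id: 1, weight: 3 },
        Portion { id: 2, weight: 4 },
    ];
    let mut pos = 0;
    assert_eq!(drain(6, &mut portions, &mut pos), vec![0, 1]); // portion 1 only partially covered
    assert_eq!(drain(6, &mut portions, &mut pos), vec![1, 2]); // remainder of 1, then 2
    assert!(drain(6, &mut portions, &mut pos).is_empty());
}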
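take_remaining is likewise re-expressed in sizer units: when the last output batch of a flush falls below the configured lower limit, it is popped from the outputs, re-weighed with the active sizer, and returned to the pending buffer so it seeds the next flush instead of being emitted undersized. A compact sketch of that idea under an assumed bytes sizer; the signatures are simplified stand-ins, not the crate's.

// Sketch only: return an undersized tail batch to the pending buffer.
fn take_remaining(
    lower_limit: usize,
    outputs: &mut Vec<Vec<u8>>,   // output batches (bytes sizer assumed)
    pending: &mut Vec<Vec<u8>>,   // pending buffer
    pending_weight: &mut usize,   // total pending weight, in bytes
) {
    if outputs.len() > 1 && outputs.last().map_or(false, |b| b.len() < lower_limit) {
        let remaining = outputs.pop().expect("has last");
        *pending_weight += remaining.len(); // re-weigh in the active sizer's unit
        pending.push(remaining);
    }
}

fn main() {
    let mut outputs = vec![vec![0u8; 100], vec![0u8; 100], vec![0u8; 10]];
    let mut pending: Vec<Vec<u8>> = Vec::new();
    let mut pending_weight = 0usize;
    take_remaining(50, &mut outputs, &mut pending, &mut pending_weight);
    assert_eq!(outputs.len(), 2);   // undersized tail is not emitted
    assert_eq!(pending_weight, 10); // its weight re-enters the pending total
    assert_eq!(pending.len(), 1);
}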