Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ struct BatchContext {
struct BatchPortion {
/// The number of these matches Signalbuffer.inbound[inkey]
inkey: Option<SlotKey>,
/// Number of items
items: usize,
/// Weight of this portion in the active sizer's unit.
weight: usize,
}

struct Inputs<T: OtapPayloadHelpers> {
Expand All @@ -422,8 +422,8 @@ struct Inputs<T: OtapPayloadHelpers> {
/// Waiter context
context: Vec<BatchPortion>,

/// A count defined by num_items(), number of spans, log records, or metric data points.
items: usize,
/// Total weight across all pending portions, in the active sizer's unit.
weight: usize,
}

struct MultiContext {
Expand Down Expand Up @@ -496,16 +496,6 @@ enum FlushReason {
#[metric_set(name = "otap.processor.batch")]
#[derive(Debug, Default, Clone)]
pub struct BatchProcessorMetrics {
/// Total items consumed for logs signal
#[metric(unit = "{item}")]
consumed_items_logs: Counter<u64>,
/// Total items consumed for metrics signal
#[metric(unit = "{item}")]
consumed_items_metrics: Counter<u64>,
/// Total items consumed for traces signal
#[metric(unit = "{item}")]
consumed_items_traces: Counter<u64>,

/// Total batches consumed for logs signal
#[metric(unit = "{item}")]
consumed_batches_logs: Counter<u64>,
Expand All @@ -516,16 +506,6 @@ pub struct BatchProcessorMetrics {
#[metric(unit = "{item}")]
consumed_batches_traces: Counter<u64>,

/// Total items produced for logs signal
#[metric(unit = "{item}")]
produced_items_logs: Counter<u64>,
/// Total items produced for metrics signal
#[metric(unit = "{item}")]
produced_items_metrics: Counter<u64>,
/// Total items produced for traces signal
#[metric(unit = "{item}")]
produced_items_traces: Counter<u64>,

/// Total batches produced for logs signal
#[metric(unit = "{item}")]
produced_batches_logs: Counter<u64>,
Expand All @@ -546,9 +526,6 @@ pub struct BatchProcessorMetrics {
/// Number of input requests pending at flush time
#[metric(unit = "{request}")]
flush_pending_requests: Mmsc,
/// Number of primary signal items pending at flush time
#[metric(unit = "{item}")]
Comment thread
jmacd marked this conversation as resolved.
flush_pending_items: Mmsc,
/// Number of bytes pending at flush time when byte size is known
#[metric(unit = "By")]
flush_pending_bytes: Mmsc,
Expand All @@ -561,9 +538,6 @@ pub struct BatchProcessorMetrics {
/// Number of output batches emitted by each flush
#[metric(unit = "{batch}")]
flush_output_batches: Mmsc,
/// Number of primary signal items emitted by each flush
#[metric(unit = "{item}")]
flush_output_items: Mmsc,
/// Number of bytes emitted by each flush when byte size is known
#[metric(unit = "By")]
flush_output_bytes: Mmsc,
Expand Down Expand Up @@ -738,30 +712,18 @@ impl BatchProcessor {
effect: &mut local::EffectHandler<OtapPdata>,
request: OtapPdata,
) -> Result<(), EngineError> {
let items = request.num_items();

if items == 0 {
if request.is_empty() {
Comment thread
jmacd marked this conversation as resolved.
Outdated
Comment thread
jmacd marked this conversation as resolved.
Outdated
self.metrics.dropped_empty_records.inc();
// Note: Failure to Ack/Nack is an engine-level error.
effect.notify_ack(AckMsg::new(request)).await?;
return Ok(());
}

// Increment consumed_items for the appropriate signal
let signal = request.signal_type();
match signal {
SignalType::Logs => {
self.metrics.consumed_items_logs.add(items as u64);
self.metrics.consumed_batches_logs.add(1);
}
SignalType::Metrics => {
self.metrics.consumed_items_metrics.add(items as u64);
self.metrics.consumed_batches_metrics.add(1);
}
SignalType::Traces => {
self.metrics.consumed_items_traces.add(items as u64);
self.metrics.consumed_batches_traces.add(1);
}
SignalType::Logs => self.metrics.consumed_batches_logs.add(1),
SignalType::Metrics => self.metrics.consumed_batches_metrics.add(1),
SignalType::Traces => self.metrics.consumed_batches_traces.add(1),
};

let (ctx, payload) = request.into_parts();
Expand All @@ -771,12 +733,12 @@ impl BatchProcessor {
if let Some(mut otap_format) = self.otap_format() {
otap_format
.for_signal(signal)
.accept_payload(effect, ctx, otap, items)
.accept_payload(effect, ctx, otap)
.await?
} else if let Some(mut otlp_format) = self.otlp_format() {
otlp_format
.for_signal(signal)
.accept_payload(effect, ctx, otap.try_into_with_default()?, items)
.accept_payload(effect, ctx, otap.try_into_with_default()?)
.await?
} else {
return Err(Self::no_active_format_error());
Expand All @@ -786,12 +748,12 @@ impl BatchProcessor {
if let Some(mut otlp_format) = self.otlp_format() {
otlp_format
.for_signal(signal)
.accept_payload(effect, ctx, otlp, items)
.accept_payload(effect, ctx, otlp)
.await?
} else if let Some(mut otap_format) = self.otap_format() {
otap_format
.for_signal(signal)
.accept_payload(effect, ctx, otlp.try_into_with_default()?, items)
.accept_payload(effect, ctx, otlp.try_into_with_default()?)
.await?
} else {
return Err(Self::no_active_format_error());
Expand Down Expand Up @@ -880,8 +842,10 @@ where
effect: &mut local::EffectHandler<OtapPdata>,
ctx: Context,
payload: T,
items: usize,
) -> Result<(), EngineError> {
// Compute weight in the active sizer's unit.
let weight = self.fmtcfg.sizer.batch_size(&payload)?;

// If there are subscribers, calculate an inbound slot key.
let inkey = if ctx.has_subscribers() {
let slot = self
Expand Down Expand Up @@ -920,7 +884,7 @@ where

self.buffer
.inputs
.accept(payload, BatchPortion::new(inkey, items));
.accept(payload, BatchPortion::new(inkey, weight));

let pending_size = self.buffer.inputs.size_by(self.fmtcfg.sizer)?;

Expand Down Expand Up @@ -988,9 +952,6 @@ where
self.metrics
.flush_pending_requests
.record(self.buffer.inputs.requests() as f64);
self.metrics
.flush_pending_items
.record(self.buffer.inputs.items as f64);
if let Some(bytes) = self.buffer.inputs.known_bytes() {
self.metrics.flush_pending_bytes.record(bytes as f64);
}
Expand Down Expand Up @@ -1067,7 +1028,8 @@ where
.batch_size(&output_batches[num_output - 1])?;

if last_batch_size < self.fmtcfg.lower_limit() {
self.buffer.take_remaining(&mut inputs, &mut output_batches);
self.buffer
.take_remaining(self.fmtcfg.sizer, &mut inputs, &mut output_batches);

// We use the latest arrival time as the new arrival for timeout purposes.
self.buffer
Expand All @@ -1080,40 +1042,26 @@ where
self.metrics
.flush_output_batches
.record(output_batches.len() as f64);
self.metrics.flush_output_items.record(
output_batches
.iter()
.map(OtapPayloadHelpers::num_items)
.sum::<usize>() as f64,
);
if let Some(bytes) = known_total_bytes(&output_batches) {
self.metrics.flush_output_bytes.record(bytes as f64);
}

let mut input_context = inputs.take_context();

for records in output_batches {
let items = records.num_items();
// Apportion ack/nack subscribers in the active sizer's unit.
let weight = self.fmtcfg.sizer.batch_size(&records)?;
let mut pdata = OtapPdata::new(Context::default(), records.into());

// Increment produced_items for the appropriate signal
match self.signal {
SignalType::Logs => {
self.metrics.produced_items_logs.add(items as u64);
self.metrics.produced_batches_logs.add(1);
}
SignalType::Metrics => {
self.metrics.produced_items_metrics.add(items as u64);
self.metrics.produced_batches_metrics.add(1);
}
SignalType::Traces => {
self.metrics.produced_items_traces.add(items as u64);
self.metrics.produced_batches_traces.add(1);
}
SignalType::Logs => self.metrics.produced_batches_logs.add(1),
SignalType::Metrics => self.metrics.produced_batches_metrics.add(1),
SignalType::Traces => self.metrics.produced_batches_traces.add(1),
}

// If any items require notification, get an outbound slot and subscribe.
if let Some(ctxs) = self.buffer.drain_context(items, &mut input_context) {
// If any inputs in this batch require notification, get an
// outbound slot and subscribe.
if let Some(ctxs) = self.buffer.drain_context(weight, &mut input_context) {
match self.buffer.outbound.allocate_with_data(ctxs) {
Err(ctxs) => {
for bp in ctxs {
Expand Down Expand Up @@ -1349,7 +1297,7 @@ impl<T: OtapPayloadHelpers> Default for Inputs<T> {
Self {
pending: Vec::new(),
context: Vec::new(),
items: 0,
weight: 0,
}
}
}
Expand Down Expand Up @@ -1378,8 +1326,8 @@ where
}

impl BatchPortion {
const fn new(inkey: Option<SlotKey>, items: usize) -> Self {
Self { inkey, items }
const fn new(inkey: Option<SlotKey>, weight: usize) -> Self {
Self { inkey, weight }
}
}

Expand All @@ -1394,12 +1342,12 @@ impl<T: OtapPayloadHelpers> Inputs<T> {
Self {
pending: self.pending.drain(..).collect(),
context: self.context.drain(..).collect(),
items: std::mem::take(&mut self.items),
weight: std::mem::take(&mut self.weight),
}
}

const fn is_empty(&self) -> bool {
self.items == 0
self.weight == 0
}

const fn requests(&self) -> usize {
Expand All @@ -1413,15 +1361,14 @@ impl<T: OtapPayloadHelpers> Inputs<T> {
fn size_by(&self, sizer: Sizer) -> Result<usize, PDataError> {
match sizer {
Sizer::Requests => Ok(self.requests()),
Sizer::Items => Ok(self.items),
Sizer::Bytes => self.known_bytes().ok_or_else(|| PDataError::Format {
error: "bytes encoding not known".into(),
}),
// For Sizer::Items / Sizer::Bytes, `weight` was accumulated in
// the active sizer's unit at accept() time, so this is exact.
Sizer::Items | Sizer::Bytes => Ok(self.weight),
}
}

fn accept(&mut self, batch: T, part: BatchPortion) {
self.items += part.items;
self.weight += part.weight;
self.pending.push(batch);
self.context.push(part);
}
Expand Down Expand Up @@ -1459,40 +1406,45 @@ where
/// Takes the residual batch, used in case the final output is less than
/// the lower bound. This removes the last output btach, the corresponding
/// context, and places it back in the pending buffer as the first in line.
fn take_remaining(&mut self, from_inputs: &mut Inputs<T>, output_batches: &mut Vec<T>) {
fn take_remaining(
&mut self,
sizer: Sizer,
from_inputs: &mut Inputs<T>,
output_batches: &mut Vec<T>,
) {
// SAFETY: protected by output_batches.len() > 1.
let remaining = output_batches.pop().expect("has last");
let last_input = from_inputs.context.last().expect("has last");
let last_items = remaining.num_items();
let new_part = BatchPortion::new(last_input.inkey, last_items);
let last_weight = sizer.batch_size(&remaining).expect("known size");
let new_part = BatchPortion::new(last_input.inkey, last_weight);

from_inputs.items -= last_items;
from_inputs.weight -= last_weight;

self.inputs.accept(remaining, new_part);
}

/// Using a multi-context corresponding with the input pending
/// data, and considering an output item count for a single output
/// data, and considering an output weight for a single output
/// batch, this determines the set of (maybe partial) pending
/// batches that correspond. When merging only (not splitting),
/// this will return the entire set of pending contexts; when
/// splitting, this will return all except the portion that was
/// retained as first-in-line.
fn drain_context(
&mut self,
mut items: usize,
mut weight: usize,
contexts: &mut MultiContext,
) -> Option<Vec<BatchPortion>> {
let mut out = Vec::new();

while items > 0 && contexts.pos < contexts.inputs.len() {
while weight > 0 && contexts.pos < contexts.inputs.len() {
let bp = contexts.inputs.get_mut(contexts.pos).expect("valid");

let take = bp.items.min(items);
bp.items -= take;
items -= take;
let take = bp.weight.min(weight);
bp.weight -= take;
weight -= take;

if bp.items == 0 {
if bp.weight == 0 {
contexts.pos += 1;
}

Expand Down Expand Up @@ -1670,33 +1622,19 @@ mod tests {
(telemetry_registry, metrics_reporter, phase)
}

/// Helper to verify consumed and produced item metrics
/// Helper to verify that batch counters were incremented.
fn verify_item_metrics(
telemetry_registry: &TelemetryRegistryHandle,
signal: SignalType,
expected_items: usize,
_expected_items: usize,
Comment thread
jmacd marked this conversation as resolved.
Outdated
) {
let mut consumed_items = 0u64;
let mut produced_items = 0u64;
let mut consumed_batches = 0u64;
let mut produced_batches = 0u64;

telemetry_registry.visit_current_metrics(|desc, _attrs, iter| {
if desc.name == "otap.processor.batch" {
for (field, value) in iter {
match (signal, field.name) {
(SignalType::Logs, "consumed.items.logs") => {
consumed_items = value.to_u64_lossy()
}
(SignalType::Logs, "produced.items.logs") => {
produced_items = value.to_u64_lossy()
}
(SignalType::Traces, "consumed.items.traces") => {
consumed_items = value.to_u64_lossy()
}
(SignalType::Traces, "produced.items.traces") => {
produced_items = value.to_u64_lossy()
}
(SignalType::Logs, "consumed.batches.logs") => {
consumed_batches = value.to_u64_lossy()
}
Expand All @@ -1715,14 +1653,6 @@ mod tests {
}
});

assert_eq!(
consumed_items as usize, expected_items,
"consumed_items metric must match"
);
assert_eq!(
produced_items as usize, expected_items,
"produced_items metric must match"
);
assert!(produced_batches != 0, "produced_batches != 0");
assert!(consumed_batches != 0, "consumed_batches != 0");
}
Expand Down
Loading