Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/feat_rohan_b99_otlp_push_exporter_retries.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Add retry layer for push metrics exporters ([PR #9036](https://github.com/apollographql/router/pull/9036))

Add `RetryMetricExporter`, which wraps the `apollo metrics` and `otlp` named exporters and retries failed metric exports with jittered exponential backoff, making up to 3 attempts before surfacing the error.

By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/9036
11 changes: 7 additions & 4 deletions apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::plugins::telemetry::config::ApolloMetricsReferenceMode;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::metrics::NamedMetricExporter;
use crate::plugins::telemetry::metrics::OverflowMetricExporter;
use crate::plugins::telemetry::metrics::RetryMetricExporter;
use crate::plugins::telemetry::otlp::Protocol;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::plugins::telemetry::otlp::process_endpoint;
Expand Down Expand Up @@ -185,11 +186,13 @@ impl Config {
builder.build()?
}
};
// Wrap with overflow detection, then error prefixing
let named_exporter =
NamedMetricExporter::new(OverflowMetricExporter::new_push(exporter), "apollo");
// Wrap with retry, then overflow detection, then error prefixing
let named_exporter = NamedMetricExporter::new(
OverflowMetricExporter::new_push(RetryMetricExporter::new(exporter)),
"apollo",
);
let named_realtime_exporter = NamedMetricExporter::new(
OverflowMetricExporter::new_push(realtime_exporter),
OverflowMetricExporter::new_push(RetryMetricExporter::new(realtime_exporter)),
"apollo",
);

Expand Down
2 changes: 2 additions & 0 deletions apollo-router/src/plugins/telemetry/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod named;
pub(crate) mod otlp;
mod overflow;
pub(crate) mod prometheus;
mod retry;

pub(crate) use named::NamedMetricExporter;
pub(crate) use overflow::OverflowMetricExporter;
pub(crate) use retry::RetryMetricExporter;
9 changes: 6 additions & 3 deletions apollo-router/src/plugins/telemetry/metrics/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::metrics::aggregation::MeterProviderType;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::metrics::NamedMetricExporter;
use crate::plugins::telemetry::metrics::OverflowMetricExporter;
use crate::plugins::telemetry::metrics::RetryMetricExporter;
use crate::plugins::telemetry::otlp::Protocol;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::plugins::telemetry::otlp::process_endpoint;
Expand All @@ -29,9 +30,11 @@ impl MetricsConfigurator for super::super::otlp::Config {

let exporter = config.build_metric_exporter()?;

// Wrap with overflow detection, then error prefixing
let named_exporter =
NamedMetricExporter::new(OverflowMetricExporter::new_push(exporter), "otlp");
// Wrap with retry, then overflow detection, then error prefixing
let named_exporter = NamedMetricExporter::new(
OverflowMetricExporter::new_push(RetryMetricExporter::new(exporter)),
"otlp",
);
builder.with_reader(
MeterProviderType::Public,
PeriodicReader::builder(named_exporter, runtime::Tokio)
Expand Down
203 changes: 203 additions & 0 deletions apollo-router/src/plugins/telemetry/metrics/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
//! Retry wrapper for push metric exporters.
//!
//! Wraps a `PushMetricExporter` and retries failed exports a number
//! of times with jittered exponential backoff. Only surfaces the error
//! after all attempts are exhausted, keeping transient failures out of the logs.
//! We use this approach as recommended by the OpenTelemetry Spec:
//! <https://opentelemetry.io/docs/specs/otel/protocol/exporter/#retry>

use std::fmt::Debug;
use std::time::Duration;

use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use rand::Rng;

// Total number of export attempts (the initial try plus retries) before the
// error is surfaced to the caller — the `export` loop runs `max_retries` times.
const DEFAULT_MAX_RETRIES: usize = 3;
// Backoff ceiling before the first retry; doubled per attempt and then
// jittered down uniformly (full jitter) in `jittered_backoff`.
const BASE_BACKOFF: Duration = Duration::from_millis(100);

/// Wraps a push metric exporter and retries failed exports with jittered
/// exponential backoff, surfacing only the final error.
pub(crate) struct RetryMetricExporter<T> {
    // The wrapped exporter that performs the actual export.
    inner: T,
    // Total number of export attempts made before giving up.
    max_retries: usize,
}

impl<T> RetryMetricExporter<T> {
    /// Wrap `inner` with the default retry policy: up to
    /// [`DEFAULT_MAX_RETRIES`] attempts with jittered exponential backoff
    /// between them.
    pub(crate) fn new(inner: T) -> Self {
        Self {
            inner,
            max_retries: DEFAULT_MAX_RETRIES,
        }
    }
}

impl<T: Debug> Debug for RetryMetricExporter<T> {
    /// Renders as `RetryMetricExporter { max_retries: .., inner: .. }`,
    /// delegating the inner exporter's representation to its own `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RetryMetricExporter");
        builder.field("max_retries", &self.max_retries);
        builder.field("inner", &self.inner);
        builder.finish()
    }
}

impl<T: PushMetricExporter> PushMetricExporter for RetryMetricExporter<T> {
    /// Export `metrics`, retrying transient failures with jittered
    /// exponential backoff for up to `max_retries` total attempts.
    ///
    /// Intermediate failures are logged at `debug` level only; the error
    /// from the final attempt is the one returned to the caller.
    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
        let mut last_err = None;
        for attempt in 0..self.max_retries {
            match self.inner.export(metrics).await {
                Ok(()) => return Ok(()),
                Err(err) => {
                    tracing::debug!(
                        attempt = attempt + 1,
                        max_retries = self.max_retries,
                        error = %err,
                        "metric export attempt failed, will retry"
                    );
                    last_err = Some(err);
                    // Don't sleep after the final attempt — we are about to
                    // return the error anyway.
                    if attempt + 1 < self.max_retries {
                        tokio::time::sleep(jittered_backoff(BASE_BACKOFF, attempt as u32)).await;
                    }
                }
            }
        }
        // The loop ran `max_retries` times without returning Ok, so every
        // iteration stored an error; `new` always sets `max_retries` to
        // DEFAULT_MAX_RETRIES (>= 1), making this expect an invariant check.
        Err(last_err.expect("max_retries must be >= 1"))
    }

    // The remaining trait methods are pure pass-throughs to the wrapped
    // exporter — retry only applies to `export`.
    fn force_flush(&self) -> OTelSdkResult {
        self.inner.force_flush()
    }

    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.inner.shutdown_with_timeout(timeout)
    }

    fn temporality(&self) -> Temporality {
        self.inner.temporality()
    }
}

/// Full jitter: uniform random duration in `[0, base * 2^attempt]`.
///
/// Uses saturating arithmetic so a large `attempt` (or a huge `base`)
/// clamps to the maximum representable duration instead of panicking on
/// overflow — `Duration * u32` and `u32::pow` both panic in debug builds
/// when they overflow.
fn jittered_backoff(base: Duration, attempt: u32) -> Duration {
    let max = base.saturating_mul(2u32.saturating_pow(attempt));
    // Clamp the u128 millisecond count into u64 rather than truncating.
    let max_millis = max.as_millis().min(u64::MAX as u128) as u64;
    if max_millis == 0 {
        // `random_range(0..=0)` would be fine, but short-circuit for clarity.
        return Duration::ZERO;
    }
    let jittered = rand::rng().random_range(0..=max_millis);
    Duration::from_millis(jittered)
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use opentelemetry_sdk::error::OTelSdkError;
    use opentelemetry_sdk::error::OTelSdkResult;
    use opentelemetry_sdk::metrics::Temporality;
    use opentelemetry_sdk::metrics::data::ResourceMetrics;
    use opentelemetry_sdk::metrics::exporter::PushMetricExporter;

    use super::*;

    /// Test double: fails the first `fail_until` export calls, succeeds
    /// afterwards, and counts every invocation.
    #[derive(Debug)]
    struct CountingExporter {
        // Number of `export` invocations so far.
        call_count: AtomicUsize,
        // Calls numbered 1..=fail_until return an error.
        fail_until: usize,
    }

    impl CountingExporter {
        fn new(fail_until: usize) -> Self {
            Self {
                call_count: AtomicUsize::new(0),
                fail_until,
            }
        }

        /// How many times `export` has been called.
        fn calls(&self) -> usize {
            self.call_count.load(Ordering::SeqCst)
        }
    }

    impl PushMetricExporter for CountingExporter {
        async fn export(&self, _metrics: &ResourceMetrics) -> OTelSdkResult {
            // `fetch_add` returns the previous count, so `n` is this
            // call's 1-based sequence number.
            let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
            if n <= self.fail_until {
                Err(OTelSdkError::InternalFailure("transient".into()))
            } else {
                Ok(())
            }
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            Ok(())
        }

        fn temporality(&self) -> Temporality {
            Temporality::Delta
        }
    }

    #[tokio::test]
    async fn succeeds_on_first_try() {
        // No failures configured: exactly one export call, no retries.
        let inner = CountingExporter::new(0);
        let exporter = RetryMetricExporter::new(inner);
        let result = exporter.export(&ResourceMetrics::default()).await;
        assert!(result.is_ok());
        assert_eq!(exporter.inner.calls(), 1);
    }

    #[tokio::test]
    async fn succeeds_after_transient_failures() {
        // Two failures, then success on the third (final) allowed attempt.
        let inner = CountingExporter::new(2);
        let exporter = RetryMetricExporter::new(inner);
        let result = exporter.export(&ResourceMetrics::default()).await;
        assert!(result.is_ok());
        assert_eq!(exporter.inner.calls(), 3);
    }

    #[tokio::test]
    async fn fails_after_all_retries_exhausted() {
        // Exporter always fails: the error surfaces after exactly
        // DEFAULT_MAX_RETRIES attempts — no extra calls.
        let inner = CountingExporter::new(usize::MAX);
        let exporter = RetryMetricExporter::new(inner);
        let result = exporter.export(&ResourceMetrics::default()).await;
        assert!(result.is_err());
        assert_eq!(exporter.inner.calls(), DEFAULT_MAX_RETRIES);
    }

    #[test]
    fn jittered_backoff_within_bounds() {
        // Full jitter must never exceed base * 2^attempt.
        let base = Duration::from_millis(100);
        for attempt in 0..4 {
            let max = base * 2u32.pow(attempt);
            for _ in 0..200 {
                let d = jittered_backoff(base, attempt);
                assert!(d <= max, "attempt {attempt}: {d:?} exceeded max {max:?}");
            }
        }
    }

    #[test]
    fn jittered_backoff_zero_base() {
        // A zero base short-circuits to zero regardless of attempt number.
        assert_eq!(jittered_backoff(Duration::ZERO, 0), Duration::ZERO);
        assert_eq!(jittered_backoff(Duration::ZERO, 5), Duration::ZERO);
    }

    #[test]
    fn jittered_backoff_has_spread() {
        // Statistical check that jitter actually varies: 100 samples drawn
        // from [0, 400ms] should span more than 50ms. NOTE(review): in
        // principle this can flake, but the probability is negligible.
        let base = Duration::from_millis(100);
        let samples: Vec<Duration> = (0..100).map(|_| jittered_backoff(base, 2)).collect();
        let min = *samples.iter().min().unwrap();
        let max = *samples.iter().max().unwrap();
        assert!(
            max - min > Duration::from_millis(50),
            "expected spread across samples, got min={min:?} max={max:?}"
        );
    }
}
Loading