Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changesets/maint_metrics_trace_error_messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### Trace and metrics exporter wrappers to append details to errors ([PR #8363](https://github.com/apollographql/router/pull/8363))

Error messages raised during tracing and metric exports now indicate if the error occurred when exporting to Apollo Studio or to the user's configured OTLP or Zipkin endpoint. For example, errors that occur when exporting Apollo Studio traces will look like:
`OpenTelemetry trace error occurred: [apollo traces] <etc>`
While errors that occur when exporting traces to a user's configured OTLP endpoint will look like:
`OpenTelemetry trace error occurred: [otlp traces] <etc>`

By [@bonnici](https://github.com/bonnici) in https://github.com/apollographql/router/pull/8363
248 changes: 248 additions & 0 deletions apollo-router/src/plugins/telemetry/error_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
use std::fmt::Debug;
use std::time::Duration;
use std::time::Instant;

use async_trait::async_trait;
use dashmap::DashMap;
use futures::future::BoxFuture;
use once_cell::sync::OnceCell;
use opentelemetry::metrics::MetricsError;
use opentelemetry_sdk::export::trace::ExportResult;
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::metrics::Aggregation;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::data::Temporality;
use opentelemetry_sdk::metrics::exporter::PushMetricsExporter;
use opentelemetry_sdk::metrics::reader::AggregationSelector;
use opentelemetry_sdk::metrics::reader::TemporalitySelector;

#[derive(Eq, PartialEq, Hash)]
enum ErrorType {
Expand Down Expand Up @@ -101,6 +114,117 @@ fn handle_error_with_map<T: Into<opentelemetry::global::Error>>(
}
}

/// Wrapper that modifies trace export errors to include exporter name
pub(crate) struct NamedSpanExporter<E> {
name: &'static str,
inner: E,
}

impl<E> NamedSpanExporter<E> {
pub(crate) fn new(inner: E, name: &'static str) -> Self {
Self { name, inner }
}
}

impl<E: SpanExporter> Debug for NamedSpanExporter<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NamedSpanExporter")
.field("name", &self.name)
.finish()
}
}

impl<E: SpanExporter> SpanExporter for NamedSpanExporter<E> {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let name = self.name;
let fut = self.inner.export(batch);
Box::pin(async move {
fut.await.map_err(|err| {
let modified = format!("[{} traces] {}", name, err);
opentelemetry::trace::TraceError::from(modified)
})
})
}

fn shutdown(&mut self) {
self.inner.shutdown()
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.inner.set_resource(resource)
}
}

/// Wrapper that modifies metrics export errors to include exporter name
pub(crate) struct NamedMetricsExporter<E> {
name: &'static str,
inner: E,
}

impl<E> NamedMetricsExporter<E> {
pub(crate) fn new(inner: E, name: &'static str) -> Self {
Self { name, inner }
}
}

impl<E: PushMetricsExporter> Debug for NamedMetricsExporter<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NamedMetricsExporter")
.field("name", &self.name)
.finish()
}
}

impl<E: AggregationSelector> AggregationSelector for NamedMetricsExporter<E> {
fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
self.inner.aggregation(kind)
}
}

impl<E: TemporalitySelector> TemporalitySelector for NamedMetricsExporter<E> {
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.inner.temporality(kind)
}
}

fn prefix_metrics_error(name: &'static str, err: MetricsError) -> MetricsError {
match err {
MetricsError::Other(msg) => MetricsError::Other(format!("[{} metrics] {}", name, msg)),
MetricsError::Config(msg) => MetricsError::Config(format!("[{} metrics] {}", name, msg)),
MetricsError::ExportErr(inner) => {
MetricsError::Other(format!("[{} metrics] {}", name, inner))
}
// Don't modify instrument configuration errors - not related to export
MetricsError::InvalidInstrumentConfiguration(msg) => {
MetricsError::InvalidInstrumentConfiguration(msg)
}
_ => MetricsError::Other(format!("[{} metrics] {}", name, err)),
}
}

#[async_trait]
impl<E: PushMetricsExporter> PushMetricsExporter for NamedMetricsExporter<E> {
async fn export(&self, metrics: &mut ResourceMetrics) -> opentelemetry::metrics::Result<()> {
self.inner
.export(metrics)
.await
.map_err(|err| prefix_metrics_error(self.name, err))
}

async fn force_flush(&self) -> opentelemetry::metrics::Result<()> {
self.inner
.force_flush()
.await
.map_err(|err| prefix_metrics_error(self.name, err))
}

fn shutdown(&self) -> opentelemetry::metrics::Result<()> {
self.inner
.shutdown()
.map_err(|err| prefix_metrics_error(self.name, err))
}
}

#[cfg(test)]
mod tests {
use std::fmt::Debug;
Expand All @@ -109,6 +233,12 @@ mod tests {
use std::time::Duration;

use dashmap::DashMap;
use futures::future::BoxFuture;
use opentelemetry::metrics::MetricsError;
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricsExporter;
use parking_lot::Mutex;
use tracing_core::Event;
use tracing_core::Field;
Expand Down Expand Up @@ -213,4 +343,122 @@ mod tests {
.with_metrics()
.await;
}

// Mock span exporter to test failures
#[derive(Debug)]
struct FailingSpanExporter;

impl SpanExporter for FailingSpanExporter {
fn export(
&mut self,
_batch: Vec<SpanData>,
) -> BoxFuture<'static, opentelemetry_sdk::export::trace::ExportResult> {
Box::pin(async { Err(opentelemetry::trace::TraceError::from("connection failed")) })
}

fn shutdown(&mut self) {}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
}

#[tokio::test]
async fn test_named_span_exporter_adds_prefix() {
let inner = FailingSpanExporter;
let mut named = super::NamedSpanExporter::new(inner, "test-exporter");

let result = named.export(vec![]).await;

assert!(result.is_err());
let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(err_msg.contains("[test-exporter traces]"));
assert!(err_msg.contains("connection failed"));
}

// Mock metrics exporter to test failures
#[derive(Debug)]
struct FailingMetricsExporter {
error_type: &'static str,
}

#[async_trait::async_trait]
impl PushMetricsExporter for FailingMetricsExporter {
async fn export(
&self,
_metrics: &mut ResourceMetrics,
) -> opentelemetry::metrics::Result<()> {
match self.error_type {
"other" => Err(MetricsError::Other("export failed".to_string())),
"config" => Err(MetricsError::Config("invalid config".to_string())),
_ => Ok(()),
}
}

async fn force_flush(&self) -> opentelemetry::metrics::Result<()> {
Ok(())
}

fn shutdown(&self) -> opentelemetry::metrics::Result<()> {
Ok(())
}
}

impl opentelemetry_sdk::metrics::reader::AggregationSelector for FailingMetricsExporter {
fn aggregation(
&self,
_kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> opentelemetry_sdk::metrics::Aggregation {
opentelemetry_sdk::metrics::Aggregation::Default
}
}

impl opentelemetry_sdk::metrics::reader::TemporalitySelector for FailingMetricsExporter {
fn temporality(
&self,
_kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> opentelemetry_sdk::metrics::data::Temporality {
opentelemetry_sdk::metrics::data::Temporality::Cumulative
}
}

fn empty_resource_metrics() -> ResourceMetrics {
use opentelemetry_sdk::Resource;
ResourceMetrics {
resource: Resource::empty(),
scope_metrics: vec![],
}
}

#[tokio::test]
async fn test_named_metrics_exporter_adds_prefix() {
let inner = FailingMetricsExporter {
error_type: "other",
};
let named = super::NamedMetricsExporter::new(inner, "test-exporter");

let result = named.export(&mut empty_resource_metrics()).await;

assert!(result.is_err());
let err = result.unwrap_err();
match err {
MetricsError::Other(msg) => {
assert!(msg.contains("[test-exporter metrics]"));
assert!(msg.contains("export failed"));
}
_ => panic!("Expected MetricsError::Other, got: {:?}", err),
}
}

#[test]
fn test_prefix_metrics_error() {
let err = MetricsError::Config("bad config".to_string());
let prefixed = super::prefix_metrics_error("test-exporter", err);

match prefixed {
MetricsError::Config(msg) => {
assert_eq!(msg, "[test-exporter metrics] bad config");
}
_ => panic!("Expected Config variant"),
}
}
}
8 changes: 6 additions & 2 deletions apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::plugins::telemetry::apollo_exporter::ApolloExporter;
use crate::plugins::telemetry::apollo_exporter::get_uname;
use crate::plugins::telemetry::config::ApolloMetricsReferenceMode;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::error_handler::NamedMetricsExporter;
use crate::plugins::telemetry::metrics::CustomAggregationSelector;
use crate::plugins::telemetry::otlp::CustomTemporalitySelector;
use crate::plugins::telemetry::otlp::Protocol;
Expand Down Expand Up @@ -194,12 +195,15 @@ impl Config {
.build(),
),
)?;
let default_reader = PeriodicReader::builder(exporter, runtime::Tokio)
let named_exporter = NamedMetricsExporter::new(exporter, "apollo");
let named_realtime_exporter = NamedMetricsExporter::new(realtime_exporter, "apollo");

let default_reader = PeriodicReader::builder(named_exporter, runtime::Tokio)
.with_interval(Duration::from_secs(60))
.with_timeout(batch_config.max_export_timeout)
.build();

let realtime_reader = PeriodicReader::builder(realtime_exporter, runtime::Tokio)
let realtime_reader = PeriodicReader::builder(named_realtime_exporter, runtime::Tokio)
.with_interval(batch_config.scheduled_delay)
.with_timeout(batch_config.max_export_timeout)
.build();
Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/plugins/telemetry/metrics/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tower::BoxError;

use crate::metrics::aggregation::MeterProviderType;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::error_handler::NamedMetricsExporter;
use crate::plugins::telemetry::metrics::CustomAggregationSelector;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::plugins::telemetry::reload::metrics::MetricsBuilder;
Expand All @@ -30,9 +31,10 @@ impl MetricsConfigurator for super::super::otlp::Config {
),
)?;

let named_exporter = NamedMetricsExporter::new(exporter, "otlp");
builder.with_reader(
MeterProviderType::Public,
PeriodicReader::builder(exporter, runtime::Tokio)
PeriodicReader::builder(named_exporter, runtime::Tokio)
.with_interval(self.batch_processor.scheduled_delay)
.with_timeout(self.batch_processor.max_export_timeout)
.build(),
Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/plugins/telemetry/tracing/apollo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::plugins::telemetry::apollo::Config;
use crate::plugins::telemetry::apollo::router_id;
use crate::plugins::telemetry::apollo_exporter::proto::reports::Trace;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::error_handler::NamedSpanExporter;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::reload::tracing::TracingBuilder;
use crate::plugins::telemetry::reload::tracing::TracingConfigurator;
Expand Down Expand Up @@ -48,8 +49,9 @@ impl TracingConfigurator for Config {
.use_legacy_request_span(matches!(builder.spans().mode, SpanMode::Deprecated))
.metrics_reference_mode(self.metrics_reference_mode)
.build()?;
let named_exporter = NamedSpanExporter::new(exporter, "apollo");
builder.with_span_processor(
BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("apollo-tracing"))
BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("apollo-tracing"))
.with_batch_config(self.tracing.batch_processor.clone().into())
.build(),
);
Expand Down
12 changes: 8 additions & 4 deletions apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME;
use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::error_handler::NamedSpanExporter;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::reload::tracing::TracingBuilder;
use crate::plugins::telemetry::reload::tracing::TracingConfigurator;
Expand Down Expand Up @@ -218,11 +219,14 @@ impl TracingConfigurator for Config {
let mut span_metrics = default_span_metrics();
span_metrics.extend(self.span_metrics.clone());

let wrapper = ExporterWrapper {
delegate: exporter,
span_metrics,
};
let named_exporter = NamedSpanExporter::new(wrapper, "datadog");

let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(
ExporterWrapper {
delegate: exporter,
span_metrics,
},
named_exporter,
NamedTokioRuntime::new("datadog-tracing"),
)
.with_batch_config(self.batch_processor.clone().into())
Expand Down
Loading