diff --git a/.changesets/maint_metrics_trace_error_messages.md b/.changesets/maint_metrics_trace_error_messages.md new file mode 100644 index 0000000000..6c079602e8 --- /dev/null +++ b/.changesets/maint_metrics_trace_error_messages.md @@ -0,0 +1,8 @@ +### Trace and metrics exporter wrappers to append details to errors ([PR #8363](https://github.com/apollographql/router/pull/8363)) + +Error messages raised during trace and metric exports now indicate whether the error occurred when exporting to Apollo Studio or to the user's configured OTLP, Zipkin, or Datadog endpoint. For example, errors that occur when exporting Apollo Studio traces will look like: +`OpenTelemetry trace error occurred: [apollo traces] <original error message>` +Errors that occur when exporting traces to a user's configured OTLP endpoint will look like: +`OpenTelemetry trace error occurred: [otlp traces] <original error message>` + +By [@bonnici](https://github.com/bonnici) in https://github.com/apollographql/router/pull/8363 diff --git a/apollo-router/src/plugins/telemetry/error_handler.rs b/apollo-router/src/plugins/telemetry/error_handler.rs index 91ac5124a4..279d82b543 100644 --- a/apollo-router/src/plugins/telemetry/error_handler.rs +++ b/apollo-router/src/plugins/telemetry/error_handler.rs @@ -1,9 +1,22 @@ +use std::fmt::Debug; use std::time::Duration; use std::time::Instant; +use async_trait::async_trait; use dashmap::DashMap; +use futures::future::BoxFuture; use once_cell::sync::OnceCell; use opentelemetry::metrics::MetricsError; +use opentelemetry_sdk::export::trace::ExportResult; +use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::export::trace::SpanExporter; +use opentelemetry_sdk::metrics::Aggregation; +use opentelemetry_sdk::metrics::InstrumentKind; +use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::data::Temporality; +use opentelemetry_sdk::metrics::exporter::PushMetricsExporter; +use opentelemetry_sdk::metrics::reader::AggregationSelector; +use
opentelemetry_sdk::metrics::reader::TemporalitySelector; #[derive(Eq, PartialEq, Hash)] enum ErrorType { @@ -101,6 +114,117 @@ fn handle_error_with_map>( } } +/// Wrapper that modifies trace export errors to include exporter name +pub(crate) struct NamedSpanExporter { + name: &'static str, + inner: E, +} + +impl NamedSpanExporter { + pub(crate) fn new(inner: E, name: &'static str) -> Self { + Self { name, inner } + } +} + +impl Debug for NamedSpanExporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NamedSpanExporter") + .field("name", &self.name) + .finish() + } +} + +impl SpanExporter for NamedSpanExporter { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let name = self.name; + let fut = self.inner.export(batch); + Box::pin(async move { + fut.await.map_err(|err| { + let modified = format!("[{} traces] {}", name, err); + opentelemetry::trace::TraceError::from(modified) + }) + }) + } + + fn shutdown(&mut self) { + self.inner.shutdown() + } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.inner.set_resource(resource) + } +} + +/// Wrapper that modifies metrics export errors to include exporter name +pub(crate) struct NamedMetricsExporter { + name: &'static str, + inner: E, +} + +impl NamedMetricsExporter { + pub(crate) fn new(inner: E, name: &'static str) -> Self { + Self { name, inner } + } +} + +impl Debug for NamedMetricsExporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NamedMetricsExporter") + .field("name", &self.name) + .finish() + } +} + +impl AggregationSelector for NamedMetricsExporter { + fn aggregation(&self, kind: InstrumentKind) -> Aggregation { + self.inner.aggregation(kind) + } +} + +impl TemporalitySelector for NamedMetricsExporter { + fn temporality(&self, kind: InstrumentKind) -> Temporality { + self.inner.temporality(kind) + } +} + +fn prefix_metrics_error(name: &'static str, err: MetricsError) 
-> MetricsError { + match err { + MetricsError::Other(msg) => MetricsError::Other(format!("[{} metrics] {}", name, msg)), + MetricsError::Config(msg) => MetricsError::Config(format!("[{} metrics] {}", name, msg)), + MetricsError::ExportErr(inner) => { + MetricsError::Other(format!("[{} metrics] {}", name, inner)) + } + // Don't modify instrument configuration errors - not related to export + MetricsError::InvalidInstrumentConfiguration(msg) => { + MetricsError::InvalidInstrumentConfiguration(msg) + } + _ => MetricsError::Other(format!("[{} metrics] {}", name, err)), + } +} + +#[async_trait] +impl PushMetricsExporter for NamedMetricsExporter { + async fn export(&self, metrics: &mut ResourceMetrics) -> opentelemetry::metrics::Result<()> { + self.inner + .export(metrics) + .await + .map_err(|err| prefix_metrics_error(self.name, err)) + } + + async fn force_flush(&self) -> opentelemetry::metrics::Result<()> { + self.inner + .force_flush() + .await + .map_err(|err| prefix_metrics_error(self.name, err)) + } + + fn shutdown(&self) -> opentelemetry::metrics::Result<()> { + self.inner + .shutdown() + .map_err(|err| prefix_metrics_error(self.name, err)) + } +} + #[cfg(test)] mod tests { use std::fmt::Debug; @@ -109,6 +233,12 @@ mod tests { use std::time::Duration; use dashmap::DashMap; + use futures::future::BoxFuture; + use opentelemetry::metrics::MetricsError; + use opentelemetry_sdk::export::trace::SpanData; + use opentelemetry_sdk::export::trace::SpanExporter; + use opentelemetry_sdk::metrics::data::ResourceMetrics; + use opentelemetry_sdk::metrics::exporter::PushMetricsExporter; use parking_lot::Mutex; use tracing_core::Event; use tracing_core::Field; @@ -213,4 +343,122 @@ mod tests { .with_metrics() .await; } + + // Mock span exporter to test failures + #[derive(Debug)] + struct FailingSpanExporter; + + impl SpanExporter for FailingSpanExporter { + fn export( + &mut self, + _batch: Vec, + ) -> BoxFuture<'static, opentelemetry_sdk::export::trace::ExportResult> { + 
Box::pin(async { Err(opentelemetry::trace::TraceError::from("connection failed")) }) + } + + fn shutdown(&mut self) {} + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} + } + + #[tokio::test] + async fn test_named_span_exporter_adds_prefix() { + let inner = FailingSpanExporter; + let mut named = super::NamedSpanExporter::new(inner, "test-exporter"); + + let result = named.export(vec![]).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + let err_msg = err.to_string(); + assert!(err_msg.contains("[test-exporter traces]")); + assert!(err_msg.contains("connection failed")); + } + + // Mock metrics exporter to test failures + #[derive(Debug)] + struct FailingMetricsExporter { + error_type: &'static str, + } + + #[async_trait::async_trait] + impl PushMetricsExporter for FailingMetricsExporter { + async fn export( + &self, + _metrics: &mut ResourceMetrics, + ) -> opentelemetry::metrics::Result<()> { + match self.error_type { + "other" => Err(MetricsError::Other("export failed".to_string())), + "config" => Err(MetricsError::Config("invalid config".to_string())), + _ => Ok(()), + } + } + + async fn force_flush(&self) -> opentelemetry::metrics::Result<()> { + Ok(()) + } + + fn shutdown(&self) -> opentelemetry::metrics::Result<()> { + Ok(()) + } + } + + impl opentelemetry_sdk::metrics::reader::AggregationSelector for FailingMetricsExporter { + fn aggregation( + &self, + _kind: opentelemetry_sdk::metrics::InstrumentKind, + ) -> opentelemetry_sdk::metrics::Aggregation { + opentelemetry_sdk::metrics::Aggregation::Default + } + } + + impl opentelemetry_sdk::metrics::reader::TemporalitySelector for FailingMetricsExporter { + fn temporality( + &self, + _kind: opentelemetry_sdk::metrics::InstrumentKind, + ) -> opentelemetry_sdk::metrics::data::Temporality { + opentelemetry_sdk::metrics::data::Temporality::Cumulative + } + } + + fn empty_resource_metrics() -> ResourceMetrics { + use opentelemetry_sdk::Resource; + ResourceMetrics { + 
resource: Resource::empty(), + scope_metrics: vec![], + } + } + + #[tokio::test] + async fn test_named_metrics_exporter_adds_prefix() { + let inner = FailingMetricsExporter { + error_type: "other", + }; + let named = super::NamedMetricsExporter::new(inner, "test-exporter"); + + let result = named.export(&mut empty_resource_metrics()).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + MetricsError::Other(msg) => { + assert!(msg.contains("[test-exporter metrics]")); + assert!(msg.contains("export failed")); + } + _ => panic!("Expected MetricsError::Other, got: {:?}", err), + } + } + + #[test] + fn test_prefix_metrics_error() { + let err = MetricsError::Config("bad config".to_string()); + let prefixed = super::prefix_metrics_error("test-exporter", err); + + match prefixed { + MetricsError::Config(msg) => { + assert_eq!(msg, "[test-exporter metrics] bad config"); + } + _ => panic!("Expected Config variant"), + } + } } diff --git a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs index ec19e479be..260114f1b3 100644 --- a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs +++ b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs @@ -25,6 +25,7 @@ use crate::plugins::telemetry::apollo_exporter::ApolloExporter; use crate::plugins::telemetry::apollo_exporter::get_uname; use crate::plugins::telemetry::config::ApolloMetricsReferenceMode; use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::error_handler::NamedMetricsExporter; use crate::plugins::telemetry::metrics::CustomAggregationSelector; use crate::plugins::telemetry::otlp::CustomTemporalitySelector; use crate::plugins::telemetry::otlp::Protocol; @@ -194,12 +195,15 @@ impl Config { .build(), ), )?; - let default_reader = PeriodicReader::builder(exporter, runtime::Tokio) + let named_exporter = NamedMetricsExporter::new(exporter, "apollo"); + let named_realtime_exporter = 
NamedMetricsExporter::new(realtime_exporter, "apollo"); + + let default_reader = PeriodicReader::builder(named_exporter, runtime::Tokio) .with_interval(Duration::from_secs(60)) .with_timeout(batch_config.max_export_timeout) .build(); - let realtime_reader = PeriodicReader::builder(realtime_exporter, runtime::Tokio) + let realtime_reader = PeriodicReader::builder(named_realtime_exporter, runtime::Tokio) .with_interval(batch_config.scheduled_delay) .with_timeout(batch_config.max_export_timeout) .build(); diff --git a/apollo-router/src/plugins/telemetry/metrics/otlp.rs b/apollo-router/src/plugins/telemetry/metrics/otlp.rs index 5fc34882da..443c135b3c 100644 --- a/apollo-router/src/plugins/telemetry/metrics/otlp.rs +++ b/apollo-router/src/plugins/telemetry/metrics/otlp.rs @@ -5,6 +5,7 @@ use tower::BoxError; use crate::metrics::aggregation::MeterProviderType; use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::error_handler::NamedMetricsExporter; use crate::plugins::telemetry::metrics::CustomAggregationSelector; use crate::plugins::telemetry::otlp::TelemetryDataKind; use crate::plugins::telemetry::reload::metrics::MetricsBuilder; @@ -30,9 +31,10 @@ impl MetricsConfigurator for super::super::otlp::Config { ), )?; + let named_exporter = NamedMetricsExporter::new(exporter, "otlp"); builder.with_reader( MeterProviderType::Public, - PeriodicReader::builder(exporter, runtime::Tokio) + PeriodicReader::builder(named_exporter, runtime::Tokio) .with_interval(self.batch_processor.scheduled_delay) .with_timeout(self.batch_processor.max_export_timeout) .build(), diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo.rs b/apollo-router/src/plugins/telemetry/tracing/apollo.rs index 796f95b490..1e7b67ea62 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo.rs @@ -7,6 +7,7 @@ use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::apollo::router_id; use 
crate::plugins::telemetry::apollo_exporter::proto::reports::Trace; use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::error_handler::NamedSpanExporter; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::reload::tracing::TracingBuilder; use crate::plugins::telemetry::reload::tracing::TracingConfigurator; @@ -48,8 +49,9 @@ impl TracingConfigurator for Config { .use_legacy_request_span(matches!(builder.spans().mode, SpanMode::Deprecated)) .metrics_reference_mode(self.metrics_reference_mode) .build()?; + let named_exporter = NamedSpanExporter::new(exporter, "apollo"); builder.with_span_processor( - BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("apollo-tracing")) + BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("apollo-tracing")) .with_batch_config(self.tracing.batch_processor.clone().into()) .build(), ); diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs index 6eb09721d4..8633e5bf8f 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs @@ -40,6 +40,7 @@ use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME; use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME; use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::error_handler::NamedSpanExporter; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::reload::tracing::TracingBuilder; use crate::plugins::telemetry::reload::tracing::TracingConfigurator; @@ -218,11 +219,14 @@ impl TracingConfigurator for Config { let mut span_metrics = default_span_metrics(); span_metrics.extend(self.span_metrics.clone()); + let wrapper = ExporterWrapper { + delegate: exporter, + span_metrics, + }; 
+ let named_exporter = NamedSpanExporter::new(wrapper, "datadog"); + let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder( - ExporterWrapper { - delegate: exporter, - span_metrics, - }, + named_exporter, NamedTokioRuntime::new("datadog-tracing"), ) .with_batch_config(self.batch_processor.clone().into()) diff --git a/apollo-router/src/plugins/telemetry/tracing/otlp.rs b/apollo-router/src/plugins/telemetry/tracing/otlp.rs index 64c6db1169..ea90c0f69d 100644 --- a/apollo-router/src/plugins/telemetry/tracing/otlp.rs +++ b/apollo-router/src/plugins/telemetry/tracing/otlp.rs @@ -6,6 +6,7 @@ use opentelemetry_sdk::trace::BatchSpanProcessor; use tower::BoxError; use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::error_handler::NamedSpanExporter; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::otlp::TelemetryDataKind; use crate::plugins::telemetry::reload::tracing::TracingBuilder; @@ -23,13 +24,12 @@ impl TracingConfigurator for super::super::otlp::Config { fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> { let exporter: SpanExporterBuilder = self.exporter(TelemetryDataKind::Traces)?; - let batch_span_processor = BatchSpanProcessor::builder( - exporter.build_span_exporter()?, - NamedTokioRuntime::new("otlp-tracing"), - ) - .with_batch_config(self.batch_processor.clone().into()) - .build() - .filtered(); + let named_exporter = NamedSpanExporter::new(exporter.build_span_exporter()?, "otlp"); + let batch_span_processor = + BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("otlp-tracing")) + .with_batch_config(self.batch_processor.clone().into()) + .build() + .filtered(); if builder .tracing_common() diff --git a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs index f7c97dd2c4..a25da681fc 100644 --- a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs +++ 
b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs @@ -11,6 +11,7 @@ use tower::BoxError; use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::config::GenericWith; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::error_handler::NamedSpanExporter; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::reload::tracing::TracingBuilder; use crate::plugins::telemetry::reload::tracing::TracingConfigurator; @@ -62,8 +63,9 @@ impl TracingConfigurator for Config { .with_trace_config(common) .init_exporter()?; + let named_exporter = NamedSpanExporter::new(exporter, "zipkin"); builder.with_span_processor( - BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("zipkin-tracing")) + BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("zipkin-tracing")) .with_batch_config(self.batch_processor.clone().into()) .build() .filtered(),