diff --git a/.changesets/config_bryn_otel_reload_simplification.md b/.changesets/config_bryn_otel_reload_simplification.md new file mode 100644 index 0000000000..6f8785e5f5 --- /dev/null +++ b/.changesets/config_bryn_otel_reload_simplification.md @@ -0,0 +1,8 @@ +### Only reload telemetry when needed ([PR #8328](https://github.com/apollographql/router/pull/8328)) + +Previously, when a schema or config reload took place, telemetry would always be reloaded. This would drop existing exporters +and create new ones. + +Telemetry exporters will now only be recreated if the relevant configuration has changed. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/8328 diff --git a/apollo-router/src/executable.rs b/apollo-router/src/executable.rs index bae8df7d19..d64de0e0d6 100644 --- a/apollo-router/src/executable.rs +++ b/apollo-router/src/executable.rs @@ -31,7 +31,7 @@ use crate::configuration::schema::Mode; use crate::configuration::validate_yaml_configuration; use crate::metrics::meter_provider_internal; use crate::plugin::plugins; -use crate::plugins::telemetry::reload::init_telemetry; +use crate::plugins::telemetry::reload::otel::init_telemetry; use crate::registry::OciConfig; use crate::router::ConfigurationSource; use crate::router::RouterHttpServer; diff --git a/apollo-router/src/metrics/aggregation.rs b/apollo-router/src/metrics/aggregation.rs index 72706a7763..dd108e9fd5 100644 --- a/apollo-router/src/metrics/aggregation.rs +++ b/apollo-router/src/metrics/aggregation.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::borrow::Cow; use std::collections::HashMap; -use std::ops::DerefMut; +use std::mem; use std::sync::Arc; use derive_more::From; @@ -25,7 +25,13 @@ use opentelemetry::metrics::SyncGauge; use opentelemetry::metrics::SyncHistogram; use opentelemetry::metrics::SyncUpDownCounter; use opentelemetry::metrics::UpDownCounter; +use opentelemetry::metrics::noop::NoopMeterProvider; +use 
opentelemetry_sdk::metrics::SdkMeterProvider; use parking_lot::Mutex; +use strum::EnumCount; +use strum_macros::Display; +use strum_macros::EnumCount; +use strum_macros::EnumIter; use crate::metrics::filter::FilterMeterProvider; @@ -36,9 +42,11 @@ use crate::metrics::filter::FilterMeterProvider; // This is within the spec: https://opentelemetry.io/docs/specs/otel/metrics/api/#get-a-meter // `Meters are identified by name, version, and schema_url fields. When more than one Meter of the same name, version, and schema_url is created, it is unspecified whether or under which conditions the same or different Meter instances are returned. It is a user error to create Meters with different attributes but the same identity.` -#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)] +#[derive( + Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, EnumCount, EnumIter, Display, +)] +#[repr(u8)] pub(crate) enum MeterProviderType { - PublicPrometheus, Apollo, ApolloRealtime, Public, @@ -47,13 +55,13 @@ pub(crate) enum MeterProviderType { #[derive(Clone)] pub(crate) struct AggregateMeterProvider { - inner: Arc>, + inner: Arc>>, } impl Default for AggregateMeterProvider { fn default() -> Self { let meter_provider = AggregateMeterProvider { - inner: Arc::new(Mutex::new(Inner::default())), + inner: Arc::new(Mutex::new(Some(Inner::default()))), }; // If the regular global meter provider has been set then the aggregate meter provider will use it. Otherwise it'll default to a no-op. @@ -61,21 +69,34 @@ impl Default for AggregateMeterProvider { // This functionality is not guaranteed to stay like this, so use at your own risk. 
meter_provider.set( MeterProviderType::OtelDefault, - Some(FilterMeterProvider::public( - opentelemetry::global::meter_provider(), - )), + FilterMeterProvider::public(opentelemetry::global::meter_provider()), ); meter_provider } } -#[derive(Default)] pub(crate) struct Inner { - providers: HashMap)>, + providers: Vec<(FilterMeterProvider, HashMap)>, registered_instruments: Vec, } +impl Default for Inner { + fn default() -> Self { + Inner { + providers: (0..MeterProviderType::COUNT) + .map(|_| { + ( + FilterMeterProvider::public(SdkMeterProvider::default()), + HashMap::new(), + ) + }) + .collect(), + registered_instruments: Vec::new(), + } + } +} + /// Fields are never used directly but strong references here /// keep weak references elsewhere upgradable. #[derive(From)] @@ -103,7 +124,7 @@ pub(crate) enum InstrumentWrapper { }, } -#[derive(Eq, PartialEq, Hash)] +#[derive(Eq, PartialEq, Hash, Clone)] struct MeterId { name: Cow<'static, str>, version: Option>, @@ -119,35 +140,41 @@ impl AggregateMeterProvider { pub(crate) fn set( &self, meter_provider_type: MeterProviderType, - meter_provider: Option, - ) -> Option { - let mut inner = self.inner.lock(); + meter_provider: FilterMeterProvider, + ) -> FilterMeterProvider { + let mut guard = self.inner.lock(); + let inner = guard + .as_mut() + .expect("cannot use meter provider after shutdown"); // As we are changing a meter provider we need to invalidate any registered instruments. // Clearing these allows any weak references at callsites to be invalidated. // This must be done BEFORE the old provider is dropped to ensure that metrics are not lost. // Once invalidated all metrics callsites will try to obtain new instruments, but will be blocked on the mutex. 
- inner.registered_instruments.clear(); + inner.invalidate(); //Now update the meter provider - let old = if let Some(meter_provider) = meter_provider { - inner - .providers - .insert( - meter_provider_type, - (meter_provider.clone(), HashMap::new()), - ) - .map(|(old_provider, _)| old_provider) - } else { - None - }; + let mut swap = (meter_provider, HashMap::new()); + mem::swap( + &mut inner.providers[meter_provider_type as usize], + &mut swap, + ); + // Important! The mutex MUST be dropped before the old meter provider is dropped to avoid deadlocks in the case that the export function has metrics. // This implicitly happens by returning the old meter provider. // However, to avoid a potential footgun where someone removes the return value of this function I will explicitly drop the mutex guard. - drop(inner); + drop(guard); // Important! Now it is safe to drop the old meter provider, we return it, so we should be OK. If someone removes the return value of this function then // this must instead be converted to a drop call. - old + swap.0 + } + + /// Invalidate all the cached instruments + #[cfg(test)] + pub(crate) fn invalidate(&self) { + if let Some(inner) = self.inner.lock().as_mut() { + inner.invalidate(); + } } /// Shutdown MUST be called from a blocking thread. @@ -155,17 +182,13 @@ impl AggregateMeterProvider { // Make sure that we don't deadlock by dropping the mutex guard before actual shutdown happens // This means that if we have any misbehaving code that tries to access the meter provider during shutdown, e.g. for export metrics // then we don't get stuck on the mutex. 
- let mut inner = self.inner.lock(); - let mut swap = Inner::default(); - std::mem::swap(&mut *inner, &mut swap); - drop(inner); - - // Now that we have dropped the mutex guard we can safely shutdown the meter providers - for (meter_provider_type, (meter_provider, _)) in &swap.providers { - if let Err(e) = meter_provider.shutdown() { - ::tracing::error!(error = %e, meter_provider_type = ?meter_provider_type, "failed to shutdown meter provider") - } - } + // For instance the apollo exporters have in the past had metrics for exporting, as + // they shut down they try to increment a metric which causes a new meter to be created. + // However, if we have not released the guard then we deadlock. + let mut guard = self.inner.lock(); + let old = guard.take(); + drop(guard); + drop(old); } /// Create a registered instrument. This enables caching at callsites and invalidation at the meter provider via weak reference. @@ -177,18 +200,27 @@ impl AggregateMeterProvider { Arc: Into, { let mut guard = self.inner.lock(); - let instrument = Arc::new((create_fn)(guard.deref_mut())); - guard.registered_instruments.push(instrument.clone().into()); - instrument + let inner = guard + .as_mut() + .expect("cannot use meter provider after shutdown"); + inner.create_registered_instrument(create_fn) } #[cfg(test)] pub(crate) fn registered_instruments(&self) -> usize { - self.inner.lock().registered_instruments.len() + self.inner + .lock() + .as_ref() + .expect("cannot use meter provider after shutdown") + .registered_instruments + .len() } } impl Inner { + pub(crate) fn invalidate(&mut self) { + self.registered_instruments.clear() + } pub(crate) fn meter(&mut self, name: impl Into>) -> Meter { self.versioned_meter( name, @@ -209,7 +241,7 @@ impl Inner { let schema_url = schema_url.map(|v| v.into()); let mut meters = Vec::with_capacity(self.providers.len()); - for (provider, existing_meters) in self.providers.values_mut() { + for (provider, existing_meters) in &mut self.providers { 
meters.push( existing_meters .entry(MeterId { @@ -231,6 +263,18 @@ impl Inner { Meter::new(Arc::new(AggregateInstrumentProvider { meters })) } + + pub(crate) fn create_registered_instrument( + &mut self, + create_fn: impl Fn(&mut Inner) -> T, + ) -> Arc + where + Arc: Into, + { + let instrument = Arc::new((create_fn)(self)); + self.registered_instruments.push(instrument.clone().into()); + instrument + } } impl MeterProvider for AggregateMeterProvider { @@ -242,7 +286,12 @@ impl MeterProvider for AggregateMeterProvider { attributes: Option>, ) -> Meter { let mut inner = self.inner.lock(); - inner.versioned_meter(name, version, schema_url, attributes) + if let Some(inner) = inner.as_mut() { + inner.versioned_meter(name, version, schema_url, attributes) + } else { + // The meter was used after shutdown. Default to Noop since the instrument cannot actually be used + NoopMeterProvider::default().versioned_meter(name, version, schema_url, attributes) + } } } @@ -585,7 +634,7 @@ mod test { let meter_provider = AggregateMeterProvider::default(); meter_provider.set( MeterProviderType::Public, - Some(FilterMeterProvider::public(delegate)), + FilterMeterProvider::public(delegate), ); let meter = meter_provider.meter("test"); @@ -636,7 +685,7 @@ mod test { let meter_provider = AggregateMeterProvider::default(); meter_provider.set( MeterProviderType::Public, - Some(FilterMeterProvider::public(delegate)), + FilterMeterProvider::public(delegate), ); let meter = meter_provider.meter("test"); @@ -734,9 +783,7 @@ mod test { let meter_provider = AggregateMeterProvider::default(); meter_provider.set( MeterProviderType::OtelDefault, - Some(FilterMeterProvider::public(GlobalMeterProvider::new( - delegate, - ))), + FilterMeterProvider::public(GlobalMeterProvider::new(delegate)), ); let counter = meter_provider @@ -814,9 +861,7 @@ mod test { meter_provider.set( MeterProviderType::OtelDefault, - Some(FilterMeterProvider::public(GlobalMeterProvider::new( - delegate, - ))), + 
FilterMeterProvider::public(GlobalMeterProvider::new(delegate)), ); tokio::time::sleep(Duration::from_millis(20)).await; @@ -840,9 +885,7 @@ mod test { meter_provider.set( MeterProviderType::OtelDefault, - Some(FilterMeterProvider::public(GlobalMeterProvider::new( - delegate, - ))), + FilterMeterProvider::public(GlobalMeterProvider::new(delegate)), ); tokio::time::sleep(Duration::from_millis(20)).await; @@ -856,9 +899,7 @@ mod test { // Setting the meter provider should not deadlock. meter_provider.set( MeterProviderType::OtelDefault, - Some(FilterMeterProvider::public(GlobalMeterProvider::new( - delegate, - ))), + FilterMeterProvider::public(GlobalMeterProvider::new(delegate)), ); tokio::time::sleep(Duration::from_millis(20)).await; diff --git a/apollo-router/src/metrics/filter.rs b/apollo-router/src/metrics/filter.rs index dd9115648e..c2a1031081 100644 --- a/apollo-router/src/metrics/filter.rs +++ b/apollo-router/src/metrics/filter.rs @@ -43,13 +43,8 @@ impl MeterProvider { } } } - fn shutdown(&self) -> opentelemetry::metrics::Result<()> { - match self { - MeterProvider::Regular(provider) => provider.shutdown(), - MeterProvider::Global(_provider) => Ok(()), - } - } + #[cfg(test)] fn force_flush(&self) -> opentelemetry::metrics::Result<()> { match self { MeterProvider::Regular(provider) => provider.force_flush(), @@ -93,14 +88,14 @@ impl FilterMeterProvider { .expect("regex should have been valid") } - pub(crate) fn private_realtime>(delegate: T) -> Self { + pub(crate) fn apollo_realtime>(delegate: T) -> Self { FilterMeterProvider::builder() .delegate(delegate) .allow(Self::get_private_realtime_regex().clone()) .build() } - pub(crate) fn private>(delegate: T) -> Self { + pub(crate) fn apollo>(delegate: T) -> Self { FilterMeterProvider::builder() .delegate(delegate) .allow( @@ -128,11 +123,7 @@ impl FilterMeterProvider { FilterMeterProvider::builder().delegate(delegate).build() } - pub(crate) fn shutdown(&self) -> opentelemetry::metrics::Result<()> { - 
self.delegate.shutdown() - } - - #[allow(dead_code)] + #[cfg(test)] pub(crate) fn force_flush(&self) -> opentelemetry::metrics::Result<()> { self.delegate.force_flush() } @@ -267,7 +258,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_private_metrics() { let exporter = InMemoryMetricsExporter::default(); - let meter_provider = FilterMeterProvider::private( + let meter_provider = FilterMeterProvider::apollo( MeterProviderBuilder::default() .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build()) .build(), @@ -375,7 +366,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_description_and_unit() { let exporter = InMemoryMetricsExporter::default(); - let meter_provider = FilterMeterProvider::private( + let meter_provider = FilterMeterProvider::apollo( MeterProviderBuilder::default() .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build()) .build(), @@ -494,7 +485,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_private_realtime_metrics() { let exporter = InMemoryMetricsExporter::default(); - let meter_provider = FilterMeterProvider::private_realtime( + let meter_provider = FilterMeterProvider::apollo_realtime( MeterProviderBuilder::default() .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build()) .build(), diff --git a/apollo-router/src/metrics/mod.rs b/apollo-router/src/metrics/mod.rs index 58ac54c5ee..67bd757165 100644 --- a/apollo-router/src/metrics/mod.rs +++ b/apollo-router/src/metrics/mod.rs @@ -169,11 +169,11 @@ pub(crate) mod test_utils { meter_provider.set( MeterProviderType::Public, - Some(FilterMeterProvider::all( + FilterMeterProvider::all( MeterProviderBuilder::default() .with_reader(reader.clone()) .build(), - )), + ), ); (meter_provider, reader) @@ -1480,7 +1480,6 @@ mod test { use opentelemetry::metrics::MeterProvider; use crate::metrics::FutureMetricsExt; - use crate::metrics::aggregation::MeterProviderType; use 
crate::metrics::meter_provider; use crate::metrics::meter_provider_internal; @@ -1730,7 +1729,7 @@ mod test { assert_eq!(meter_provider_internal().registered_instruments(), 1); // Force invalidation of instruments - meter_provider_internal().set(MeterProviderType::PublicPrometheus, None); + meter_provider_internal().invalidate(); assert_eq!(meter_provider_internal().registered_instruments(), 0); // Slow path diff --git a/apollo-router/src/plugins/license_enforcement/mod.rs b/apollo-router/src/plugins/license_enforcement/mod.rs index 27af0d6493..7b1e3d17bc 100644 --- a/apollo-router/src/plugins/license_enforcement/mod.rs +++ b/apollo-router/src/plugins/license_enforcement/mod.rs @@ -170,7 +170,7 @@ mod test { ); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn it_emits_metrics_when_tps_enforced() { async { // GIVEN diff --git a/apollo-router/src/plugins/telemetry/apollo.rs b/apollo-router/src/plugins/telemetry/apollo.rs index 503c52a27e..57890116fa 100644 --- a/apollo-router/src/plugins/telemetry/apollo.rs +++ b/apollo-router/src/plugins/telemetry/apollo.rs @@ -54,7 +54,7 @@ pub(crate) fn router_id() -> String { ROUTER_ID.get_or_init(Uuid::new_v4).to_string() } -#[derive(Clone, Deserialize, JsonSchema, Debug)] +#[derive(Clone, Deserialize, JsonSchema, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] #[schemars(rename = "ApolloTelemetryConfig")] pub(crate) struct Config { @@ -133,14 +133,14 @@ pub(crate) struct Config { pub(crate) preview_subgraph_metrics: bool, } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct TracingConfiguration { /// Configuration for tracing batch processor. 
pub(crate) batch_processor: BatchProcessorConfig, } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct MetricsConfiguration { /// Configuration for exporting metrics via OTLP. @@ -149,14 +149,14 @@ pub(crate) struct MetricsConfiguration { pub(crate) usage_reports: UsageReportsMetricsConfiguration, } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct OtlpMetricsConfiguration { /// Batch processor config for OTLP metrics. pub(crate) batch_processor: OtlpMetricsBatchProcessorConfiguration, } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct UsageReportsMetricsConfiguration { /// Batch processor config for Apollo usage report metrics. @@ -164,7 +164,7 @@ pub(crate) struct UsageReportsMetricsConfiguration { } // This config copies the relevant values from BatchProcessorConfig. -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(default)] pub(crate) struct OtlpMetricsBatchProcessorConfiguration { #[serde(deserialize_with = "humantime_serde::deserialize")] @@ -200,7 +200,7 @@ impl Display for OtlpMetricsBatchProcessorConfiguration { } // This config copies the relevant values from BatchProcessorConfig. 
-#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(default)] pub(crate) struct ApolloUsageReportsBatchProcessorConfiguration { /// The delay interval in milliseconds between two consecutive processing @@ -249,7 +249,7 @@ impl Display for ApolloUsageReportsBatchProcessorConfiguration { } } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct ErrorsConfiguration { /// Handling of errors coming from subgraph @@ -259,7 +259,7 @@ pub(crate) struct ErrorsConfiguration { pub(crate) preview_extended_error_metrics: ExtendedErrorMetricsMode, } -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct SubgraphErrorConfig { /// Handling of errors coming from all subgraphs @@ -268,7 +268,7 @@ pub(crate) struct SubgraphErrorConfig { pub(crate) subgraphs: HashMap, } -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct ErrorConfiguration { /// Send subgraph errors to Apollo Studio @@ -301,7 +301,7 @@ impl SubgraphErrorConfig { } /// Extended Open Telemetry error metrics mode -#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy, PartialEq)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub(crate) enum ExtendedErrorMetricsMode { /// Do not send extended OTLP error metrics @@ -313,7 +313,7 @@ pub(crate) enum ExtendedErrorMetricsMode { } /// Allow some error fields to be send to Apollo Studio even when `redact` is true. 
-#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy, PartialEq)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub(crate) enum ErrorRedactionPolicy { /// Applies redaction to all error details. @@ -399,7 +399,7 @@ schemar_fn!( ); /// Forward headers -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum ForwardHeaders { /// Don't send any headers @@ -438,7 +438,7 @@ schemar_fn!( ); /// Forward GraphQL variables -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum ForwardValues { /// Dont send any variables diff --git a/apollo-router/src/plugins/telemetry/config.rs b/apollo-router/src/plugins/telemetry/config.rs index 046689f77d..e692868756 100644 --- a/apollo-router/src/plugins/telemetry/config.rs +++ b/apollo-router/src/plugins/telemetry/config.rs @@ -64,7 +64,7 @@ pub(crate) struct Conf { } /// Exporter configuration -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct Exporters { /// Logging configuration @@ -96,7 +96,7 @@ impl Instrumentation { } /// Metrics configuration -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct Metrics { /// Common metrics configuration across all exporters @@ -107,7 +107,7 @@ pub(crate) struct Metrics { pub(crate) prometheus: metrics::prometheus::Config, } -#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct 
MetricsCommon { /// Set a service.name resource in your metrics @@ -196,7 +196,7 @@ pub(crate) enum MetricAggregation { } /// Tracing configuration -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct Tracing { // TODO: when deleting the `experimental_` prefix, check the usage when enabling dev mode @@ -216,7 +216,7 @@ pub(crate) struct Tracing { pub(crate) datadog: tracing::datadog::Config, } -#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct ExposeTraceId { /// Expose the trace_id in response headers @@ -281,7 +281,7 @@ pub(crate) enum ApolloSignatureNormalizationAlgorithm { } /// Apollo usage report reference generation modes. -#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy, PartialEq)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub(crate) enum ApolloMetricsReferenceMode { /// Use the extended mode to report input object fields and enum value references as well as object fields. @@ -293,7 +293,7 @@ pub(crate) enum ApolloMetricsReferenceMode { /// Configure propagation of traces. In general you won't have to do this as these are automatically configured /// along with any exporter you configure. 
-#[derive(Clone, Default, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct Propagation { /// Select a custom request header to set your own trace_id (header value must be convertible from hexadecimal to set a correct trace_id) @@ -312,7 +312,7 @@ pub(crate) struct Propagation { pub(crate) aws_xray: bool, } -#[derive(Clone, Debug, Deserialize, JsonSchema, Default)] +#[derive(Clone, Debug, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields)] pub(crate) struct RequestPropagation { /// Choose the header name to expose trace_id (default: apollo-trace-id) @@ -325,7 +325,7 @@ pub(crate) struct RequestPropagation { pub(crate) format: TraceIdFormat, } -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] #[non_exhaustive] pub(crate) struct TracingCommon { @@ -617,7 +617,7 @@ impl From for AttributeArray { } } -#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, untagged)] pub(crate) enum SamplerOption { /// Sample a given fraction. Fractions >= 1 will always sample. 
@@ -625,7 +625,7 @@ pub(crate) enum SamplerOption { Always(Sampler), } -#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Sampler { /// Always sample diff --git a/apollo-router/src/plugins/telemetry/config_new/graphql/mod.rs b/apollo-router/src/plugins/telemetry/config_new/graphql/mod.rs index ae3758cc1b..b6868471ac 100644 --- a/apollo-router/src/plugins/telemetry/config_new/graphql/mod.rs +++ b/apollo-router/src/plugins/telemetry/config_new/graphql/mod.rs @@ -208,7 +208,7 @@ pub(crate) mod test { use crate::plugins::telemetry::Telemetry; use crate::plugins::test::PluginTestHarness; - #[test_log::test(tokio::test)] + #[tokio::test(flavor = "multi_thread")] async fn basic_metric_publishing() { async { let schema_str = include_str!( @@ -256,7 +256,7 @@ pub(crate) mod test { .await; } - #[test_log::test(tokio::test)] + #[tokio::test(flavor = "multi_thread")] async fn multiple_fields_metric_publishing() { async { let schema_str = include_str!( @@ -311,7 +311,7 @@ pub(crate) mod test { .await; } - #[test_log::test(tokio::test)] + #[tokio::test(flavor = "multi_thread")] async fn disabled_metric_publishing() { async { let schema_str = include_str!( @@ -353,7 +353,7 @@ pub(crate) mod test { .await; } - #[test_log::test(tokio::test)] + #[tokio::test(flavor = "multi_thread")] async fn filtered_metric_publishing() { async { let schema_str = include_str!( diff --git a/apollo-router/src/plugins/telemetry/config_new/logging.rs b/apollo-router/src/plugins/telemetry/config_new/logging.rs index 7f2c5ff8f2..3faf6601f1 100644 --- a/apollo-router/src/plugins/telemetry/config_new/logging.rs +++ b/apollo-router/src/plugins/telemetry/config_new/logging.rs @@ -16,7 +16,7 @@ use crate::plugins::telemetry::config::TraceIdFormat; use crate::plugins::telemetry::resource::ConfigResource; /// Logging configuration. 
-#[derive(Deserialize, JsonSchema, Clone, Default, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Default, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct Logging { /// Common configuration @@ -28,7 +28,7 @@ pub(crate) struct Logging { pub(crate) file: File, } -#[derive(Clone, Debug, Deserialize, JsonSchema, Default)] +#[derive(Clone, Debug, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct LoggingCommon { /// Set a service.name resource in your metrics @@ -53,7 +53,7 @@ impl ConfigResource for LoggingCommon { } } -#[derive(Deserialize, JsonSchema, Clone, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct StdOut { /// Set to true to log to stdout. @@ -77,7 +77,7 @@ impl Default for StdOut { } } -#[derive(Deserialize, JsonSchema, Clone, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct RateLimit { /// Set to true to limit the rate of log messages @@ -102,7 +102,7 @@ impl Default for RateLimit { /// Log to a file #[allow(dead_code)] -#[derive(Deserialize, JsonSchema, Clone, Default, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Default, Debug, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct File { /// Set to true to log to a file. @@ -388,7 +388,7 @@ impl Default for TextFormat { /// The period to rollover the log file. #[allow(dead_code)] -#[derive(Deserialize, JsonSchema, Clone, Default, Debug)] +#[derive(Deserialize, JsonSchema, Clone, Default, Debug, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Rollover { /// Roll over every hour. 
diff --git a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs index 6fd7b616eb..04688ebc64 100644 --- a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs +++ b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs @@ -14,7 +14,7 @@ use super::formatters::APOLLO_PRIVATE_PREFIX; use super::otel::OtelData; use super::otel::layer::str_to_span_kind; use super::otel::layer::str_to_status; -use super::reload::IsSampled; +use crate::plugins::telemetry::reload::otel::IsSampled; #[derive(Debug, Default)] pub(crate) struct LogAttributes { diff --git a/apollo-router/src/plugins/telemetry/error_counter/tests.rs b/apollo-router/src/plugins/telemetry/error_counter/tests.rs index 023df21308..d752988e6e 100644 --- a/apollo-router/src/plugins/telemetry/error_counter/tests.rs +++ b/apollo-router/src/plugins/telemetry/error_counter/tests.rs @@ -682,7 +682,7 @@ async fn test_count_operation_errors_with_duplicate_errors_and_extended_config_e .await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_subgraph_error_counting() { async { let operation_name = "operationName"; @@ -798,7 +798,7 @@ async fn test_subgraph_error_counting() { .await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_execution_error_counting() { async { let operation_name = "operationName"; @@ -909,7 +909,7 @@ async fn test_execution_error_counting() { .await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_supergraph_error_counting() { async { let query = "query operationName { __typename }"; @@ -1031,7 +1031,7 @@ async fn test_supergraph_error_counting() { .await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_router_error_counting() { async { let operation_name = "operationName"; @@ -1141,7 +1141,7 @@ async fn test_router_error_counting() { .await; } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn 
test_operation_errors_emitted_when_config_is_enabled() { async { let query = "query operationName { __typename }"; diff --git a/apollo-router/src/plugins/telemetry/fmt_layer.rs b/apollo-router/src/plugins/telemetry/fmt_layer.rs index 012e0f55f9..dcdc09821a 100644 --- a/apollo-router/src/plugins/telemetry/fmt_layer.rs +++ b/apollo-router/src/plugins/telemetry/fmt_layer.rs @@ -19,7 +19,6 @@ use super::config_new::ToOtelValue; use super::dynamic_attribute::LogAttributes; use super::formatters::EXCLUDED_ATTRIBUTES; use super::formatters::EventFormatter; -use super::reload::IsSampled; use crate::plugins::telemetry::config; use crate::plugins::telemetry::config_new::logging::Format; use crate::plugins::telemetry::config_new::logging::StdOut; @@ -27,7 +26,8 @@ use crate::plugins::telemetry::consts::EVENT_ATTRIBUTE_OMIT_LOG; use crate::plugins::telemetry::formatters::RateLimitFormatter; use crate::plugins::telemetry::formatters::json::Json; use crate::plugins::telemetry::formatters::text::Text; -use crate::plugins::telemetry::reload::LayeredTracer; +use crate::plugins::telemetry::reload::otel::IsSampled; +use crate::plugins::telemetry::reload::otel::LayeredTracer; use crate::plugins::telemetry::resource::ConfigResource; pub(crate) fn create_fmt_layer( diff --git a/apollo-router/src/plugins/telemetry/formatters/mod.rs b/apollo-router/src/plugins/telemetry/formatters/mod.rs index 48d0eb0610..7d4dd9671a 100644 --- a/apollo-router/src/plugins/telemetry/formatters/mod.rs +++ b/apollo-router/src/plugins/telemetry/formatters/mod.rs @@ -24,8 +24,8 @@ use tracing_subscriber::registry::SpanRef; use super::config_new::logging::RateLimit; use super::dynamic_attribute::LogAttributes; -use super::reload::SampledSpan; use crate::plugins::telemetry::otel::OtelData; +use crate::plugins::telemetry::reload::otel::SampledSpan; pub(crate) const APOLLO_PRIVATE_PREFIX: &str = "apollo_private."; // FIXME: this is a temporary solution to avoid exposing hardcoded attributes in connector spans 
instead of using the custom telemetry features. diff --git a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs index d6669554f6..ec19e479be 100644 --- a/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs +++ b/apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs @@ -16,6 +16,7 @@ use tonic::transport::ClientTlsConfig; use tower::BoxError; use url::Url; +use crate::metrics::aggregation::MeterProviderType; use crate::plugins::telemetry::apollo::ApolloUsageReportsBatchProcessorConfiguration; use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::apollo::OtlpMetricsBatchProcessorConfiguration; @@ -23,14 +24,14 @@ use crate::plugins::telemetry::apollo::router_id; use crate::plugins::telemetry::apollo_exporter::ApolloExporter; use crate::plugins::telemetry::apollo_exporter::get_uname; use crate::plugins::telemetry::config::ApolloMetricsReferenceMode; -use crate::plugins::telemetry::config::MetricsCommon; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::metrics::CustomAggregationSelector; -use crate::plugins::telemetry::metrics::MetricsBuilder; -use crate::plugins::telemetry::metrics::MetricsConfigurator; use crate::plugins::telemetry::otlp::CustomTemporalitySelector; use crate::plugins::telemetry::otlp::Protocol; use crate::plugins::telemetry::otlp::TelemetryDataKind; use crate::plugins::telemetry::otlp::process_endpoint; +use crate::plugins::telemetry::reload::metrics::MetricsBuilder; +use crate::plugins::telemetry::reload::metrics::MetricsConfigurator; pub(crate) mod histogram; pub(crate) mod studio; @@ -42,79 +43,74 @@ fn default_buckets() -> Vec { } impl MetricsConfigurator for Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.apollo + } + + fn is_enabled(&self) -> bool { self.apollo_key.is_some() && self.apollo_graph_ref.is_some() } - fn apply( - &self, - mut builder: MetricsBuilder, - _metrics_config: 
&MetricsCommon, - ) -> Result { + fn configure(&self, builder: &mut MetricsBuilder) -> Result<(), BoxError> { tracing::debug!("configuring Apollo metrics"); static ENABLED: AtomicBool = AtomicBool::new(false); - Ok(match self { - Config { + if let Config { + endpoint, + experimental_otlp_endpoint: otlp_endpoint, + experimental_otlp_metrics_protocol: otlp_metrics_protocol, + apollo_key: Some(key), + apollo_graph_ref: Some(reference), + schema_id, + metrics, + metrics_reference_mode, + .. + } = self + { + if !ENABLED.swap(true, Ordering::Relaxed) { + tracing::info!( + "Apollo Studio usage reporting is enabled. See https://go.apollo.dev/o/data for details" + ); + } + + Self::configure_apollo_metrics( + builder, endpoint, - experimental_otlp_endpoint: otlp_endpoint, - experimental_otlp_metrics_protocol: otlp_metrics_protocol, - apollo_key: Some(key), - apollo_graph_ref: Some(reference), + key, + reference, schema_id, - metrics, - metrics_reference_mode, - .. - } => { - if !ENABLED.swap(true, Ordering::Relaxed) { - tracing::info!( - "Apollo Studio usage reporting is enabled. 
See https://go.apollo.dev/o/data for details" - ); - } - - builder = Self::configure_apollo_metrics( + &metrics.usage_reports.batch_processor, + *metrics_reference_mode, + )?; + // env variable EXPERIMENTAL_APOLLO_OTLP_METRICS_ENABLED will disappear without warning in future + if std::env::var("EXPERIMENTAL_APOLLO_OTLP_METRICS_ENABLED") + .unwrap_or_else(|_| "true".to_string()) + == "true" + { + Self::configure_apollo_otlp_metrics( builder, - endpoint, + otlp_endpoint, + otlp_metrics_protocol, key, reference, schema_id, - &metrics.usage_reports.batch_processor, - *metrics_reference_mode, + &metrics.otlp.batch_processor, )?; - // env variable EXPERIMENTAL_APOLLO_OTLP_METRICS_ENABLED will disappear without warning in future - if std::env::var("EXPERIMENTAL_APOLLO_OTLP_METRICS_ENABLED") - .unwrap_or_else(|_| "true".to_string()) - == "true" - { - builder = Self::configure_apollo_otlp_metrics( - builder, - otlp_endpoint, - otlp_metrics_protocol, - key, - reference, - schema_id, - &metrics.otlp.batch_processor, - )?; - } - builder } - _ => { - ENABLED.swap(false, Ordering::Relaxed); - builder - } - }) + } + Ok(()) } } impl Config { fn configure_apollo_otlp_metrics( - mut builder: MetricsBuilder, + builder: &mut MetricsBuilder, endpoint: &Url, otlp_protocol: &Protocol, key: &str, reference: &str, schema_id: &str, batch_config: &OtlpMetricsBatchProcessorConfiguration, - ) -> Result { + ) -> Result<(), BoxError> { tracing::info!("configuring Apollo OTLP metrics: {}", batch_config); let mut metadata = MetadataMap::new(); metadata.insert("apollo.api.key", key.parse()?); @@ -224,27 +220,25 @@ impl Config { KeyValue::new("apollo.client.uname", get_uname()?), ]); - builder.apollo_meter_provider_builder = builder - .apollo_meter_provider_builder - .with_reader(default_reader) - .with_resource(resource.clone()); + builder + .with_reader(MeterProviderType::Apollo, default_reader) + .with_resource(MeterProviderType::Apollo, resource.clone()); - 
builder.apollo_realtime_meter_provider_builder = builder - .apollo_realtime_meter_provider_builder - .with_reader(realtime_reader) - .with_resource(resource.clone()); - Ok(builder) + builder + .with_reader(MeterProviderType::ApolloRealtime, realtime_reader) + .with_resource(MeterProviderType::ApolloRealtime, resource.clone()); + Ok(()) } fn configure_apollo_metrics( - mut builder: MetricsBuilder, + builder: &mut MetricsBuilder, endpoint: &Url, key: &str, reference: &str, schema_id: &str, batch_config: &ApolloUsageReportsBatchProcessorConfiguration, metrics_reference_mode: ApolloMetricsReferenceMode, - ) -> Result { + ) -> Result<(), BoxError> { tracing::info!("configuring Apollo usage report metrics: {}", batch_config); let exporter = ApolloExporter::new( endpoint, @@ -256,8 +250,8 @@ impl Config { metrics_reference_mode, )?; - builder.apollo_metrics_sender = exporter.start(); - Ok(builder) + builder.with_apollo_metrics_sender(exporter.start()); + Ok(()) } } @@ -288,7 +282,7 @@ mod test { use crate::query_planner::OperationKind; use crate::services::SupergraphRequest; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn apollo_metrics_disabled() -> Result<(), BoxError> { let config = r#" telemetry: diff --git a/apollo-router/src/plugins/telemetry/metrics/mod.rs b/apollo-router/src/plugins/telemetry/metrics/mod.rs index 5ec8e54d87..b8d0fac56d 100644 --- a/apollo-router/src/plugins/telemetry/metrics/mod.rs +++ b/apollo-router/src/plugins/telemetry/metrics/mod.rs @@ -1,61 +1,11 @@ -use multimap::MultiMap; -use opentelemetry_sdk::Resource; use opentelemetry_sdk::metrics::Aggregation; use opentelemetry_sdk::metrics::InstrumentKind; use opentelemetry_sdk::metrics::reader::AggregationSelector; -use tower::BoxError; - -use crate::ListenAddr; -use crate::plugins::telemetry::apollo_exporter::Sender; -use crate::plugins::telemetry::config::Conf; -use crate::plugins::telemetry::config::MetricsCommon; -use crate::plugins::telemetry::resource::ConfigResource; -use 
crate::router_factory::Endpoint; - pub(crate) mod apollo; pub(crate) mod local_type_stats; pub(crate) mod otlp; pub(crate) mod prometheus; -pub(crate) struct MetricsBuilder { - pub(crate) public_meter_provider_builder: opentelemetry_sdk::metrics::MeterProviderBuilder, - pub(crate) apollo_meter_provider_builder: opentelemetry_sdk::metrics::MeterProviderBuilder, - pub(crate) apollo_realtime_meter_provider_builder: - opentelemetry_sdk::metrics::MeterProviderBuilder, - pub(crate) prometheus_meter_provider: Option, - pub(crate) custom_endpoints: MultiMap, - pub(crate) apollo_metrics_sender: Sender, - pub(crate) resource: Resource, -} - -impl MetricsBuilder { - pub(crate) fn new(config: &Conf) -> Self { - let resource = config.exporters.metrics.common.to_resource(); - - Self { - resource: resource.clone(), - public_meter_provider_builder: opentelemetry_sdk::metrics::SdkMeterProvider::builder() - .with_resource(resource.clone()), - apollo_meter_provider_builder: opentelemetry_sdk::metrics::SdkMeterProvider::builder(), - apollo_realtime_meter_provider_builder: - opentelemetry_sdk::metrics::SdkMeterProvider::builder(), - prometheus_meter_provider: None, - custom_endpoints: MultiMap::new(), - apollo_metrics_sender: Sender::default(), - } - } -} - -pub(crate) trait MetricsConfigurator { - fn enabled(&self) -> bool; - - fn apply( - &self, - builder: MetricsBuilder, - metrics_config: &MetricsCommon, - ) -> Result; -} - #[derive(Clone, Default, Debug)] pub(crate) struct CustomAggregationSelector { boundaries: Vec, diff --git a/apollo-router/src/plugins/telemetry/metrics/otlp.rs b/apollo-router/src/plugins/telemetry/metrics/otlp.rs index 92e62768c6..5fc34882da 100644 --- a/apollo-router/src/plugins/telemetry/metrics/otlp.rs +++ b/apollo-router/src/plugins/telemetry/metrics/otlp.rs @@ -1,49 +1,43 @@ use opentelemetry_otlp::MetricsExporterBuilder; use opentelemetry_sdk::metrics::PeriodicReader; -use opentelemetry_sdk::metrics::View; use opentelemetry_sdk::runtime; use 
tower::BoxError; -use crate::plugins::telemetry::config::MetricsCommon; +use crate::metrics::aggregation::MeterProviderType; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::metrics::CustomAggregationSelector; -use crate::plugins::telemetry::metrics::MetricsBuilder; -use crate::plugins::telemetry::metrics::MetricsConfigurator; use crate::plugins::telemetry::otlp::TelemetryDataKind; +use crate::plugins::telemetry::reload::metrics::MetricsBuilder; +use crate::plugins::telemetry::reload::metrics::MetricsConfigurator; impl MetricsConfigurator for super::super::otlp::Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.exporters.metrics.otlp + } + + fn is_enabled(&self) -> bool { self.enabled } - fn apply( - &self, - mut builder: MetricsBuilder, - metrics_config: &MetricsCommon, - ) -> Result { - if !self.enabled { - return Ok(builder); - } + fn configure(&self, builder: &mut MetricsBuilder) -> Result<(), BoxError> { let exporter_builder: MetricsExporterBuilder = self.exporter(TelemetryDataKind::Metrics)?; let exporter = exporter_builder.build_metrics_exporter( (&self.temporality).into(), Box::new( CustomAggregationSelector::builder() - .boundaries(metrics_config.buckets.clone()) + .boundaries(builder.metrics_common().buckets.clone()) .build(), ), )?; - builder.public_meter_provider_builder = builder.public_meter_provider_builder.with_reader( + builder.with_reader( + MeterProviderType::Public, PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(self.batch_processor.scheduled_delay) .with_timeout(self.batch_processor.max_export_timeout) .build(), ); - for metric_view in metrics_config.views.clone() { - let view: Box = metric_view.try_into()?; - builder.public_meter_provider_builder = - builder.public_meter_provider_builder.with_view(view); - } - Ok(builder) + + Ok(()) } } diff --git a/apollo-router/src/plugins/telemetry/metrics/prometheus.rs b/apollo-router/src/plugins/telemetry/metrics/prometheus.rs 
index 04988095a0..125ff7c1e9 100644 --- a/apollo-router/src/plugins/telemetry/metrics/prometheus.rs +++ b/apollo-router/src/plugins/telemetry/metrics/prometheus.rs @@ -3,32 +3,25 @@ use std::task::Poll; use futures::future::BoxFuture; use http::StatusCode; -use once_cell::sync::Lazy; use opentelemetry_prometheus::ResourceSelector; -use opentelemetry_sdk::Resource; -use opentelemetry_sdk::metrics::SdkMeterProvider; -use opentelemetry_sdk::metrics::View; -use parking_lot::Mutex; use prometheus::Encoder; use prometheus::Registry; use prometheus::TextEncoder; use schemars::JsonSchema; use serde::Deserialize; use tower::BoxError; -use tower::ServiceExt; use tower_service::Service; use crate::ListenAddr; -use crate::plugins::telemetry::config::MetricView; -use crate::plugins::telemetry::config::MetricsCommon; +use crate::metrics::aggregation::MeterProviderType; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::metrics::CustomAggregationSelector; -use crate::plugins::telemetry::metrics::MetricsBuilder; -use crate::plugins::telemetry::metrics::MetricsConfigurator; -use crate::router_factory::Endpoint; +use crate::plugins::telemetry::reload::metrics::MetricsBuilder; +use crate::plugins::telemetry::reload::metrics::MetricsConfigurator; use crate::services::router; /// Prometheus configuration -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] #[schemars(rename = "PrometheusMetricsConfig")] pub(crate) struct Config { @@ -42,7 +35,7 @@ pub(crate) struct Config { pub(crate) path: String, } -#[derive(Debug, Clone, Copy, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Copy, Deserialize, JsonSchema, Default, PartialEq)] #[serde(rename_all = "snake_case")] pub(crate) enum ResourceSelectorConfig { /// Export all resource attributes with every metrics. @@ -72,82 +65,22 @@ impl Default for Config { } } -// Prometheus metrics are special. 
We want them to persist between restarts if possible. -// This means reusing the existing registry and meter provider if we can. -// These statics will keep track of new registry for commit when the telemetry plugin is activated. -static EXISTING_PROMETHEUS: Lazy>> = - Lazy::new(Default::default); -static NEW_PROMETHEUS: Lazy>> = - Lazy::new(Default::default); - -#[derive(PartialEq, Clone)] -struct PrometheusConfig { - resource: Resource, - buckets: Vec, - views: Vec, -} - -pub(crate) fn commit_prometheus() { - if let Some(prometheus) = NEW_PROMETHEUS.lock().take() { - tracing::debug!("committing prometheus registry"); - EXISTING_PROMETHEUS.lock().replace(prometheus); +impl MetricsConfigurator for Config { + fn config(conf: &Conf) -> &Self { + &conf.exporters.metrics.prometheus } -} -impl MetricsConfigurator for Config { - fn enabled(&self) -> bool { + fn is_enabled(&self) -> bool { self.enabled } - fn apply( - &self, - mut builder: MetricsBuilder, - metrics_config: &MetricsCommon, - ) -> Result { - // Prometheus metrics are special, they must persist between reloads. This means that we only want to create something new if the resources have changed. - // The prometheus exporter, and the associated registry are linked, so replacing one means replacing the other. - - let prometheus_config = PrometheusConfig { - resource: builder.resource.clone(), - buckets: metrics_config.buckets.clone(), - views: metrics_config.views.clone(), - }; - - // Check the last registry to see if the resources are the same, if they are we can use it as is. - // Otherwise go with the new controller and store it so that it can be committed during telemetry activation. - // Note that during tests the prom registry cannot be reused as we have a different meter provider for each test. - // Prom reloading IS tested in an integration test. 
- #[cfg(not(test))] - if let Some((last_config, last_registry)) = EXISTING_PROMETHEUS.lock().clone() { - if prometheus_config == last_config { - tracing::debug!("prometheus registry can be reused"); - builder.custom_endpoints.insert( - self.listen.clone(), - Endpoint::from_router_service( - self.path.clone(), - PrometheusService { - registry: last_registry.clone(), - } - .boxed(), - ), - ); - tracing::info!( - "Prometheus endpoint exposed at {}{}", - self.listen, - self.path - ); - return Ok(builder); - } else { - tracing::debug!("prometheus registry cannot be reused"); - } - } - - let registry = prometheus::Registry::new(); + fn configure(&self, builder: &mut MetricsBuilder) -> Result<(), BoxError> { + let registry = Registry::new(); let exporter = opentelemetry_prometheus::exporter() .with_aggregation_selector( CustomAggregationSelector::builder() - .boundaries(metrics_config.buckets.clone()) + .boundaries(builder.metrics_common().buckets.clone()) .record_min_max(true) .build(), ) @@ -155,41 +88,14 @@ impl MetricsConfigurator for Config { .with_registry(registry.clone()) .build()?; - let mut meter_provider_builder = SdkMeterProvider::builder() - .with_reader(exporter) - .with_resource(builder.resource.clone()); - for metric_view in metrics_config.views.clone() { - let view: Box = metric_view.try_into()?; - meter_provider_builder = meter_provider_builder.with_view(view); - } - let meter_provider = meter_provider_builder.build(); - builder.custom_endpoints.insert( - self.listen.clone(), - Endpoint::from_router_service( - self.path.clone(), - PrometheusService { - registry: registry.clone(), - } - .boxed(), - ), - ); - builder.prometheus_meter_provider = Some(meter_provider.clone()); - - NEW_PROMETHEUS.lock().replace((prometheus_config, registry)); - - tracing::info!( - "Prometheus endpoint exposed at {}{}", - self.listen, - self.path - ); - - Ok(builder) + builder.with_reader(MeterProviderType::Public, exporter); + builder.with_prometheus_registry(registry); + 
Ok(()) } } -#[derive(Clone)] pub(crate) struct PrometheusService { - registry: Registry, + pub(crate) registry: Registry, } impl Service for PrometheusService { diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index f738a0f700..55412a1747 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -30,12 +30,10 @@ use metrics::local_type_stats::LocalTypeStatRecorder; use multimap::MultiMap; use opentelemetry::Key; use opentelemetry::KeyValue; -use opentelemetry::global::GlobalTracerProvider; use opentelemetry::metrics::MeterProvider; use opentelemetry::metrics::ObservableGauge; use opentelemetry::propagation::Extractor; use opentelemetry::propagation::Injector; -use opentelemetry::propagation::TextMapCompositePropagator; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::propagation::text_map_propagator::FieldIter; use opentelemetry::trace::SpanContext; @@ -44,17 +42,16 @@ use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceFlags; use opentelemetry::trace::TraceId; use opentelemetry::trace::TraceState; -use opentelemetry::trace::TracerProvider; -use opentelemetry_sdk::trace::Builder; use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; use parking_lot::Mutex; use parking_lot::RwLock; use rand::Rng; +use reload::activation::Activation; +use reload::tracing::TracingConfigurator; use serde_json_bytes::ByteString; use serde_json_bytes::Map; use serde_json_bytes::Value; use serde_json_bytes::json; -use tokio::runtime::Handle; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; @@ -71,12 +68,10 @@ use self::config::TraceIdFormat; use self::config_new::instruments::Instrumented; use self::config_new::router::events::RouterEvents; use self::config_new::router::instruments::RouterInstruments; -use self::config_new::spans::Spans; use self::config_new::subgraph::events::SubgraphEvents; use 
self::config_new::subgraph::instruments::SubgraphInstruments; use self::config_new::supergraph::events::SupergraphEvents; use self::metrics::apollo::studio::SingleTypeStat; -use self::reload::reload_fmt; pub(crate) use self::span_factory::SpanMode; use self::tracing::apollo_telemetry::APOLLO_PRIVATE_DURATION_NS; use self::tracing::apollo_telemetry::CLIENT_NAME_KEY; @@ -91,18 +86,13 @@ use crate::context::OPERATION_NAME; use crate::graphql::ResponseVisitor; use crate::layers::ServiceBuilderExt; use crate::layers::instrument::InstrumentLayer; -use crate::metrics::aggregation::MeterProviderType; -use crate::metrics::filter::FilterMeterProvider; use crate::metrics::meter_provider; -use crate::metrics::meter_provider_internal; use crate::plugin::PluginInit; use crate::plugin::PluginPrivate; use crate::plugins::telemetry::apollo::ForwardHeaders; use crate::plugins::telemetry::apollo_exporter::proto::reports::StatsContext; use crate::plugins::telemetry::apollo_exporter::proto::reports::trace::node::Id::ResponseName; use crate::plugins::telemetry::config::AttributeValue; -use crate::plugins::telemetry::config::MetricsCommon; -use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::config_new::DatadogId; use crate::plugins::telemetry::config_new::apollo::instruments::ApolloConnectorInstruments; use crate::plugins::telemetry::config_new::apollo::instruments::ApolloSubgraphInstruments; @@ -123,9 +113,6 @@ use crate::plugins::telemetry::error_counter::count_execution_errors; use crate::plugins::telemetry::error_counter::count_router_errors; use crate::plugins::telemetry::error_counter::count_subgraph_errors; use crate::plugins::telemetry::error_counter::count_supergraph_errors; -use crate::plugins::telemetry::fmt_layer::create_fmt_layer; -use crate::plugins::telemetry::metrics::MetricsBuilder; -use crate::plugins::telemetry::metrics::MetricsConfigurator; use crate::plugins::telemetry::metrics::apollo::histogram::ListLengthHistogram; use 
crate::plugins::telemetry::metrics::apollo::studio::LocalTypeStat; use crate::plugins::telemetry::metrics::apollo::studio::SingleContextualizedStats; @@ -133,10 +120,8 @@ use crate::plugins::telemetry::metrics::apollo::studio::SinglePathErrorStats; use crate::plugins::telemetry::metrics::apollo::studio::SingleQueryLatencyStats; use crate::plugins::telemetry::metrics::apollo::studio::SingleStats; use crate::plugins::telemetry::metrics::apollo::studio::SingleStatsReport; -use crate::plugins::telemetry::metrics::prometheus::commit_prometheus; use crate::plugins::telemetry::otel::OpenTelemetrySpanExt; -use crate::plugins::telemetry::reload::OPENTELEMETRY_TRACER_HANDLE; -use crate::plugins::telemetry::tracing::TracingConfigurator; +use crate::plugins::telemetry::reload::metrics::MetricsConfigurator; use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_OPERATION_SIGNATURE; use crate::plugins::telemetry::tracing::apollo_telemetry::decode_ftv1_trace; use crate::query_planner::OperationKind; @@ -222,62 +207,10 @@ pub(crate) struct Telemetry { apollo_metrics_sender: apollo_exporter::Sender, field_level_instrumentation_ratio: f64, builtin_instruments: RwLock, - activation: Mutex, + activation: Mutex>, enabled_features: EnabledFeatures, } -struct TelemetryActivation { - tracer_provider: Option, - // We have to have separate meter providers for prometheus metrics so that they don't get zapped on router reload. 
- public_meter_provider: Option, - public_prometheus_meter_provider: Option, - private_meter_provider: Option, - private_realtime_meter_provider: Option, - is_active: bool, -} - -fn setup_tracing( - mut builder: Builder, - configurator: &T, - tracing_config: &TracingCommon, - spans_config: &Spans, -) -> Result { - if configurator.enabled() { - builder = configurator.apply(builder, tracing_config, spans_config)?; - } - Ok(builder) -} - -fn setup_metrics_exporter( - mut builder: MetricsBuilder, - configurator: &T, - metrics_common: &MetricsCommon, -) -> Result { - if configurator.enabled() { - builder = configurator.apply(builder, metrics_common)?; - } - Ok(builder) -} - -impl Drop for Telemetry { - fn drop(&mut self) { - let mut activation = self.activation.lock(); - let metrics_providers: [Option; 4] = [ - activation.private_realtime_meter_provider.take(), - activation.private_meter_provider.take(), - activation.public_meter_provider.take(), - activation.public_prometheus_meter_provider.take(), - ]; - let tracer_provider = activation.tracer_provider.take(); - drop(activation); - TelemetryActivation::checked_meter_shutdown(metrics_providers); - - if let Some(tracer_provider) = tracer_provider { - Self::checked_tracer_shutdown(tracer_provider); - } - } -} - /// When observed, it reports the most recently stored value (give or take atomicity looseness). 
/// /// This *could* be generalised to any kind of gauge, but we should ideally have gauges that can just @@ -396,9 +329,11 @@ impl PluginPrivate for Telemetry { let field_level_instrumentation_ratio = config.calculate_field_level_instrumentation_ratio()?; - let metrics_builder = Self::create_metrics_builder(&config)?; - let tracer_provider = Self::create_tracer_provider(&config)?; + let (activation, custom_endpoints, apollo_metrics_sender) = + reload::prepare(&init.previous_config, &config)?; + + ::tracing::info!("custom endpoints {:?}", custom_endpoints); if config.instrumentation.spans.mode == SpanMode::Deprecated { ::tracing::warn!( "telemetry.instrumentation.spans.mode is currently set to 'deprecated', either explicitly or via defaulting. Set telemetry.instrumentation.spans.mode explicitly in your router.yaml to 'spec_compliant' for log and span attributes that follow OpenTelemetry semantic conventions. This option will be defaulted to 'spec_compliant' in a future release and eventually removed altogether" @@ -414,28 +349,11 @@ impl PluginPrivate for Telemetry { ::tracing::debug!("Enabled scale features: {:?}", enabled_features); Ok(Telemetry { - custom_endpoints: metrics_builder.custom_endpoints, - apollo_metrics_sender: metrics_builder.apollo_metrics_sender, + custom_endpoints, + apollo_metrics_sender, supergraph_schema_id: init.supergraph_schema_id, field_level_instrumentation_ratio, - activation: Mutex::new(TelemetryActivation { - tracer_provider: Some(tracer_provider), - public_meter_provider: Some(FilterMeterProvider::public( - metrics_builder.public_meter_provider_builder.build(), - )), - private_meter_provider: Some(FilterMeterProvider::private( - metrics_builder.apollo_meter_provider_builder.build(), - )), - private_realtime_meter_provider: Some(FilterMeterProvider::private_realtime( - metrics_builder - .apollo_realtime_meter_provider_builder - .build(), - )), - public_prometheus_meter_provider: metrics_builder - .prometheus_meter_provider - 
.map(FilterMeterProvider::public), - is_active: false, - }), + activation: Mutex::new(Some(activation)), builtin_instruments: RwLock::new(create_builtin_instruments( &config.instrumentation.instruments, )), @@ -1182,118 +1100,20 @@ impl PluginPrivate for Telemetry { } fn activate(&self) { - let mut activation = self.activation.lock(); - if activation.is_active { - return; + // activation called multiple times during startup due to telemetry needed to be initialized before + // plugins are initialized + if let Some(activation) = self.activation.lock().take() { + activation.commit(); + // The reason this exist here is that these instruments use the global meter provider when created. + // In future, we should directly use the meter provider from activation rather than the global + // meter provider, this will eliminate the brittle sequencing of instrument creation. + *self.builtin_instruments.write() = + create_builtin_instruments(&self.config.instrumentation.instruments); } - - // Only apply things if we were executing in the context of a vanilla the Apollo executable. - // Users that are rolling their own routers will need to set up telemetry themselves. - if let Some(hot_tracer) = OPENTELEMETRY_TRACER_HANDLE.get() { - // The reason that this has to happen here is that we are interacting with global state. - // If we do this logic during plugin init then if a subsequent plugin fails to init then we - // will already have set the new tracer provider and we will be in an inconsistent state. - // activate is infallible, so if we get here we know the new pipeline is ready to go. 
- let tracer_provider = activation - .tracer_provider - .take() - .expect("must have new tracer_provider"); - - let tracer = tracer_provider - .tracer_builder(GLOBAL_TRACER_NAME) - .with_version(env!("CARGO_PKG_VERSION")) - .build(); - hot_tracer.reload(tracer); - - let last_provider = opentelemetry::global::set_tracer_provider(tracer_provider); - - Self::checked_global_tracer_shutdown(last_provider); - - let propagator = Self::create_propagator(&self.config); - opentelemetry::global::set_text_map_propagator(propagator); - } - - activation.reload_metrics(); - - *self.builtin_instruments.write() = - create_builtin_instruments(&self.config.instrumentation.instruments); - reload_fmt(create_fmt_layer(&self.config)); - activation.is_active = true; } } impl Telemetry { - fn create_propagator(config: &config::Conf) -> TextMapCompositePropagator { - let propagation = &config.exporters.tracing.propagation; - - let tracing = &config.exporters.tracing; - - let mut propagators: Vec> = Vec::new(); - // TLDR the jaeger propagator MUST BE the first one because the version of opentelemetry_jaeger is buggy. - // It overrides the current span context with an empty one if it doesn't find the corresponding headers. - // Waiting for the >=0.16.1 release - if propagation.jaeger { - propagators.push(Box::::default()); - } - if propagation.baggage { - propagators.push(Box::::default()); - } - if propagation.trace_context || tracing.otlp.enabled { - propagators - .push(Box::::default()); - } - if propagation.zipkin || tracing.zipkin.enabled { - propagators.push(Box::::default()); - } - if propagation.datadog || tracing.datadog.enabled() { - propagators.push(Box::::default()); - } - if propagation.aws_xray { - propagators.push(Box::::default()); - } - - // This propagator MUST come last because the user is trying to override the default behavior of the - // other propagators. 
- if let Some(from_request_header) = &propagation.request.header_name { - propagators.push(Box::new(CustomTraceIdPropagator::new( - from_request_header.to_string(), - propagation.request.format.clone(), - ))); - } - - TextMapCompositePropagator::new(propagators) - } - - fn create_tracer_provider( - config: &config::Conf, - ) -> Result { - let tracing_config = &config.exporters.tracing; - let spans_config = &config.instrumentation.spans; - let common = &tracing_config.common; - - let mut builder = - opentelemetry_sdk::trace::TracerProvider::builder().with_config((common).into()); - - builder = setup_tracing(builder, &tracing_config.zipkin, common, spans_config)?; - builder = setup_tracing(builder, &tracing_config.datadog, common, spans_config)?; - builder = setup_tracing(builder, &tracing_config.otlp, common, spans_config)?; - builder = setup_tracing(builder, &config.apollo, common, spans_config)?; - - let tracer_provider = builder.build(); - Ok(tracer_provider) - } - - fn create_metrics_builder(config: &config::Conf) -> Result { - let metrics_config = &config.exporters.metrics; - let metrics_common_config = &metrics_config.common; - let mut builder = MetricsBuilder::new(config); - builder = setup_metrics_exporter(builder, &config.apollo, metrics_common_config)?; - builder = - setup_metrics_exporter(builder, &metrics_config.prometheus, metrics_common_config)?; - builder = setup_metrics_exporter(builder, &metrics_config.otlp, metrics_common_config)?; - Ok(builder) - } - fn filter_variables_values( variables: &Map, forward_rules: &ForwardValues, @@ -1825,19 +1645,19 @@ impl Telemetry { fn plugin_metrics(config: &Arc) { let mut attributes = Vec::new(); - if MetricsConfigurator::enabled(&config.exporters.metrics.otlp) { + if MetricsConfigurator::is_enabled(&config.exporters.metrics.otlp) { attributes.push(KeyValue::new("telemetry.metrics.otlp", true)); } if config.exporters.metrics.prometheus.enabled { attributes.push(KeyValue::new("telemetry.metrics.prometheus", 
true)); } - if TracingConfigurator::enabled(&config.exporters.tracing.otlp) { + if TracingConfigurator::is_enabled(&config.exporters.tracing.otlp) { attributes.push(KeyValue::new("telemetry.tracing.otlp", true)); } - if config.exporters.tracing.datadog.enabled() { + if config.exporters.tracing.datadog.is_enabled() { attributes.push(KeyValue::new("telemetry.tracing.datadog", true)); } - if config.exporters.tracing.zipkin.enabled() { + if config.exporters.tracing.zipkin.is_enabled() { attributes.push(KeyValue::new("telemetry.tracing.zipkin", true)); } @@ -1851,41 +1671,6 @@ impl Telemetry { } } - fn checked_tracer_shutdown(tracer_provider: opentelemetry_sdk::trace::TracerProvider) { - Self::checked_spawn_task(Box::new(move || { - drop(tracer_provider); - })); - } - - fn checked_global_tracer_shutdown(global_tracer_provider: GlobalTracerProvider) { - Self::checked_spawn_task(Box::new(move || { - drop(global_tracer_provider); - })); - } - - fn checked_spawn_task(task: Box) { - // If we are in an tokio async context, use `spawn_blocking()`, if not just execute the - // task. - // Note: - // - If we use spawn_blocking, then tokio looks after waiting for the task to - // terminate - // - We could spawn a thread to execute the task, but if the process terminated that would - // cause the thread to terminate which isn't ideal. Let's just run it in the current - // thread. This won't affect router performance since that will always be within the - // context of tokio. - match Handle::try_current() { - Ok(hdl) => { - hdl.spawn_blocking(move || { - task(); - }); - // We don't join here since we can't await or block_on() - } - Err(_err) => { - task(); - } - } - } - fn extract_enabled_features(full_config: &serde_json::Value) -> EnabledFeatures { EnabledFeatures { // The APQ cache enabled config defaults to true. 
@@ -1906,44 +1691,6 @@ impl Telemetry { } } -impl TelemetryActivation { - fn reload_metrics(&mut self) { - let meter_provider = meter_provider_internal(); - commit_prometheus(); - let mut old_meter_providers: [Option; 4] = Default::default(); - - old_meter_providers[0] = meter_provider.set( - MeterProviderType::PublicPrometheus, - self.public_prometheus_meter_provider.take(), - ); - - old_meter_providers[1] = meter_provider.set( - MeterProviderType::Apollo, - self.private_meter_provider.take(), - ); - - old_meter_providers[2] = meter_provider.set( - MeterProviderType::ApolloRealtime, - self.private_realtime_meter_provider.take(), - ); - - old_meter_providers[3] = - meter_provider.set(MeterProviderType::Public, self.public_meter_provider.take()); - - Self::checked_meter_shutdown(old_meter_providers); - } - - fn checked_meter_shutdown(meters: [Option; 4]) { - for meter_provider in meters.into_iter().flatten() { - Telemetry::checked_spawn_task(Box::new(move || { - if let Err(e) = meter_provider.shutdown() { - ::tracing::error!(error = %e, "failed to shutdown meter provider") - } - })); - } - } -} - fn filter_headers(headers: &HeaderMap, forward_rules: &ForwardHeaders) -> String { if let ForwardHeaders::None = forward_rules { return String::from("{}"); @@ -2229,21 +1976,12 @@ mod tests { .with_deserialized_config() .expect("unable to deserialize telemetry config"); - let plugin = crate::plugin::plugins() + crate::plugin::plugins() .find(|factory| factory.name == "apollo.telemetry") .expect("Plugin not found") .create_instance(init) .await - .expect("unable to create telemetry plugin"); - - let downcast = plugin - .as_any() - .downcast_ref::() - .expect("Telemetry plugin expected"); - if downcast.config.exporters.metrics.prometheus.enabled { - downcast.activation.lock().reload_metrics(); - } - plugin + .expect("unable to create telemetry plugin") } async fn get_prometheus_metrics(plugin: &dyn DynPlugin) -> String { @@ -2325,12 +2063,12 @@ mod tests { .unwrap(); } - 
#[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn config_serialization() { create_plugin_with_config(include_str!("testdata/config.router.yaml")).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_enabled_features() { // Explicitly enabled let plugin = create_plugin_with_config(include_str!( @@ -2408,7 +2146,7 @@ mod tests { .enabled_features } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_supergraph_metrics_ok() { async { let plugin = @@ -2430,7 +2168,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_supergraph_metrics_bad_request() { async { let plugin = @@ -2480,7 +2218,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_custom_router_instruments() { async { let plugin = @@ -2556,7 +2294,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_custom_router_instruments_with_requirement_level() { async { let plugin = create_plugin_with_config(include_str!( @@ -2646,7 +2384,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_custom_supergraph_instruments() { async { let plugin = @@ -2749,7 +2487,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_custom_subgraph_instruments_level() { async { let plugin = create_plugin_with_config(include_str!( @@ -2851,7 +2589,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_custom_subgraph_instruments() { async { let plugin = Box::new( @@ -2952,7 +2690,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_field_instrumentation_sampler_with_preview_datadog_agent_sampling() { let plugin = create_plugin_with_config(include_str!( "testdata/config.field_instrumentation_sampler.router.yaml" @@ -3009,7 +2747,7 @@ mod tests { 
assert_eq!(ftv1_counter.load(Ordering::Relaxed), 10); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_subgraph_metrics_ok() { async { let plugin = @@ -3083,7 +2821,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_subgraph_metrics_http_error() { async { let plugin = @@ -3142,7 +2880,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn it_test_prometheus_wrong_endpoint() { async { let plugin = @@ -3182,6 +2920,7 @@ mod tests { async { let plugin = create_plugin_with_config(include_str!("testdata/prometheus.router.yaml")).await; + plugin.activate(); u64_histogram!("apollo.test.histo", "it's a test", 1u64); make_supergraph_request(plugin.as_ref()).await; @@ -3198,6 +2937,7 @@ mod tests { "testdata/prometheus_custom_buckets.router.yaml" )) .await; + plugin.activate(); u64_histogram!("apollo.test.histo", "it's a test", 1u64); make_supergraph_request(plugin.as_ref()).await; @@ -3214,6 +2954,7 @@ mod tests { "testdata/prometheus_custom_buckets_specific_metrics.router.yaml" )) .await; + plugin.activate(); make_supergraph_request(plugin.as_ref()).await; u64_histogram!("apollo.test.histo", "it's a test", 1u64); assert_prometheus_metrics!(plugin); @@ -3241,9 +2982,9 @@ mod tests { async { let plugin = create_plugin_with_config(include_str!("testdata/prometheus.router.yaml")).await; + plugin.activate(); u64_histogram_with_unit!("apollo.test.histo1", "no unit", "{request}", 1u64); f64_histogram_with_unit!("apollo.test.histo2", "unit", "s", 1f64); - make_supergraph_request(plugin.as_ref()).await; assert_prometheus_metrics!(plugin); } @@ -3421,7 +3162,7 @@ mod tests { .unwrap(); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_demand_control_delta_filter() { async { let plugin = create_plugin_with_config(include_str!( @@ -3445,7 +3186,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn 
test_demand_control_result_filter() { async { let plugin = create_plugin_with_config(include_str!( @@ -3469,7 +3210,7 @@ mod tests { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_demand_control_result_attributes() { async { let plugin = create_plugin_with_config(include_str!( diff --git a/apollo-router/src/plugins/telemetry/otel/layer.rs b/apollo-router/src/plugins/telemetry/otel/layer.rs index 369d542f61..9e8f623656 100644 --- a/apollo-router/src/plugins/telemetry/otel/layer.rs +++ b/apollo-router/src/plugins/telemetry/otel/layer.rs @@ -38,8 +38,8 @@ use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; use crate::plugins::telemetry::consts::OTEL_STATUS_MESSAGE; use crate::plugins::telemetry::consts::REQUEST_SPAN_NAME; use crate::plugins::telemetry::consts::ROUTER_SPAN_NAME; -use crate::plugins::telemetry::reload::IsSampled; -use crate::plugins::telemetry::reload::SampledSpan; +use crate::plugins::telemetry::reload::otel::IsSampled; +use crate::plugins::telemetry::reload::otel::SampledSpan; use crate::query_planner::subscription::SUBSCRIPTION_EVENT_SPAN_NAME; use crate::router_factory::STARTING_SPAN_NAME; diff --git a/apollo-router/src/plugins/telemetry/otlp.rs b/apollo-router/src/plugins/telemetry/otlp.rs index 1cc11eb441..ce80e06333 100644 --- a/apollo-router/src/plugins/telemetry/otlp.rs +++ b/apollo-router/src/plugins/telemetry/otlp.rs @@ -20,7 +20,7 @@ use url::Url; use crate::plugins::telemetry::tracing::BatchProcessorConfig; -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields)] #[schemars(rename = "OTLPConfig")] pub(crate) struct Config { @@ -203,14 +203,14 @@ impl Config { } } -#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] +#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct HttpExporter { /// Headers to 
send on report requests pub(crate) headers: HashMap, } -#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] +#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, default)] pub(crate) struct GrpcExporter { /// The optional domain name for tls config. @@ -272,7 +272,7 @@ impl GrpcExporter { } } -#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Protocol { #[default] @@ -280,7 +280,7 @@ pub(crate) enum Protocol { Http, } -#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "snake_case")] pub(crate) enum Temporality { /// Export cumulative metrics. diff --git a/apollo-router/src/plugins/telemetry/reload/activation.rs b/apollo-router/src/plugins/telemetry/reload/activation.rs new file mode 100644 index 0000000000..82e993ba60 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/reload/activation.rs @@ -0,0 +1,259 @@ +//! Telemetry activation state container +//! +//! This module provides the [`Activation`] type which acts as a container for all telemetry +//! components that will be activated when the router is ready to commit configuration changes. +//! +//! ## Purpose +//! +//! The [`Activation`] struct collects new telemetry components during the preparation phase: +//! - Meter providers (for metrics) +//! - Tracer provider (for distributed tracing) +//! - Trace propagation configuration +//! - Prometheus registry (if enabled) +//! - Logging format layer +//! +//! ## Safe Resource Management +//! +//! OpenTelemetry providers perform blocking I/O during shutdown, which can deadlock if executed +//! on async runtime threads. This module ensures safety by: +//! +//! 1. 
**During commit**: Old providers being replaced are moved to blocking tasks for safe shutdown +//! 2. **During drop**: Any uncommitted providers are moved to blocking tasks for cleanup +//! +//! This prevents blocking the async runtime while ensuring all resources are properly cleaned up. + +use std::collections::HashMap; +use std::sync::LazyLock; + +use opentelemetry::propagation::TextMapCompositePropagator; +use opentelemetry::trace::TracerProvider; +use parking_lot::Mutex; +use prometheus::Registry; +use tokio::task::spawn_blocking; +use tracing_subscriber::Layer; + +use crate::metrics::aggregation::MeterProviderType; +use crate::metrics::filter::FilterMeterProvider; +use crate::metrics::meter_provider_internal; +use crate::plugins::telemetry::GLOBAL_TRACER_NAME; +use crate::plugins::telemetry::reload::otel::LayeredTracer; +use crate::plugins::telemetry::reload::otel::OPENTELEMETRY_TRACER_HANDLE; +use crate::plugins::telemetry::reload::otel::reload_fmt; + +/// State container for telemetry components to be activated. +/// +/// Collects new telemetry providers and configuration during the preparation phase, +/// then atomically applies them during the activation phase via [`Activation::commit()`]. +pub(crate) struct Activation { + /// The new tracer provider. None means leave the existing one + new_trace_provider: Option, + + /// The new tracer propagator. None means leave the existing one + new_trace_propagator: Option, + + /// The new metrics providers. Absent entry for a particular meter provider type + /// means leave the existing one as is + new_meter_providers: HashMap, + + /// The registry that backs prometheus + /// Unlike the other fields in this struct there is no noop implementation + /// Therefore if this is None then Prometheus is not active + /// This will be defaulted to the last applied registry via static unfortunately + /// We can remove this static if eventually we have a facility for plugins to maintain state across reloads. 
+ prometheus_registry: Option, + + /// The new format layer + new_logging_fmt_layer: Option + Send + Sync>>, + + /// Test instrumentation to track what components were set + #[cfg(test)] + test_instrumentation: TestInstrumentation, +} + +#[cfg(test)] +#[derive(Default, Debug, Clone)] +pub(crate) struct TestInstrumentation { + pub(crate) tracer_provider_set: bool, + pub(crate) tracer_propagator_set: bool, + pub(crate) meter_providers_added: std::collections::HashSet, + pub(crate) prometheus_registry_set: bool, + pub(crate) logging_layer_set: bool, +} + +/// Allows us to keep track of the last registry that was used. Not ideal. Plugins would be better to have state +/// that can be maintained across reloads. +static REGISTRY: LazyLock>> = LazyLock::new(Default::default); + +impl Activation { + pub(crate) fn new() -> Self { + Self { + new_trace_provider: None, + new_trace_propagator: None, + new_meter_providers: HashMap::default(), + // We can remove this if we allow state to be maintained across plugin reloads + prometheus_registry: REGISTRY.lock().clone(), + new_logging_fmt_layer: None, + #[cfg(test)] + test_instrumentation: TestInstrumentation::default(), + } + } + + pub(crate) fn with_logging( + &mut self, + logging_layer: Box + Send + Sync>, + ) { + self.new_logging_fmt_layer = Some(logging_layer); + #[cfg(test)] + { + self.test_instrumentation.logging_layer_set = true; + } + } + + pub(crate) fn with_tracer_propagator(&mut self, tracer_propagator: TextMapCompositePropagator) { + self.new_trace_propagator = Some(tracer_propagator); + #[cfg(test)] + { + self.test_instrumentation.tracer_propagator_set = true; + } + } + + pub(crate) fn add_meter_providers( + &mut self, + meter_providers: impl IntoIterator, + ) { + for (meter_provider_type, meter_provider) in meter_providers { + self.new_meter_providers + .insert(meter_provider_type, meter_provider); + #[cfg(test)] + { + self.test_instrumentation + .meter_providers_added + .insert(meter_provider_type); + } + } + } + +
pub(crate) fn with_tracer_provider( + &mut self, + tracer_provider: opentelemetry_sdk::trace::TracerProvider, + ) { + self.new_trace_provider = Some(tracer_provider); + #[cfg(test)] + { + self.test_instrumentation.tracer_provider_set = true; + } + } + + pub(crate) fn with_prometheus_registry(&mut self, prometheus_registry: Option) { + self.prometheus_registry = prometheus_registry; + #[cfg(test)] + { + self.test_instrumentation.prometheus_registry_set = true; + } + } + + pub(crate) fn prometheus_registry(&self) -> Option { + self.prometheus_registry.clone() + } + + #[cfg(test)] + pub(crate) fn test_instrumentation(&self) -> &TestInstrumentation { + &self.test_instrumentation + } +} + +impl Activation { + /// Commits the prepared telemetry state to global OpenTelemetry providers (Phase 2 of reload lifecycle). + /// + /// This method atomically updates all global telemetry state: + /// 1. Swaps in new tracer provider and updates the hot-reload handle + /// 2. Updates trace context propagation configuration + /// 3. Swaps in new meter providers for metrics collection + /// 4. Updates logging format layer + /// 5. Stores Prometheus registry for future endpoint creation + /// + /// Old providers are safely shut down in blocking tasks to avoid deadlocking the async runtime. + /// + /// This method cannot fail - by the time we reach activation, all plugins have been + /// successfully initialized and we are committed to applying the new configuration. + pub(crate) fn commit(mut self) { + self.reload_tracing(); + self.reload_trace_propagation(); + self.reload_metrics(); + self.reload_logging(); + *REGISTRY.lock() = self.prometheus_registry.clone(); + } + + fn reload_tracing(&mut self) { + // Only apply things if we were executing in the context of a vanilla Apollo executable. + // Users that are rolling their own routers will need to set up telemetry themselves.
+ if let Some(hot_tracer) = OPENTELEMETRY_TRACER_HANDLE.get() + && let Some(tracer_provider) = self.new_trace_provider.take() + { + // Build a new tracer from the provider and hot-swap it into the tracing subscriber + let tracer = tracer_provider + .tracer_builder(GLOBAL_TRACER_NAME) + .with_version(env!("CARGO_PKG_VERSION")) + .build(); + hot_tracer.reload(tracer); + + // Install the new provider globally and safely drop the old one in a blocking task + let last_provider = opentelemetry::global::set_tracer_provider(tracer_provider); + spawn_blocking(move || { + drop(last_provider); + }); + } + } + + /// Reloads metrics providers, installing new ones and storing the old ones for safe shutdown on drop. + /// + /// This performs an atomic swap: new providers are installed and old providers are stored back + /// in `self.new_meter_providers`. The old providers will be safely dropped when this `Activation` + /// is dropped (using blocking tasks to avoid runtime deadlocks). + pub(crate) fn reload_metrics(&mut self) { + let global_meter_provider = meter_provider_internal(); + // Swap new meter providers with old ones. Old providers stored here will be + // safely dropped in the Drop implementation using blocking tasks. + for (meter_provider_type, meter_provider) in std::mem::take(&mut self.new_meter_providers) { + self.new_meter_providers.insert( + meter_provider_type, + global_meter_provider.set(meter_provider_type, meter_provider), + ); + } + } + + fn reload_logging(&mut self) { + if let Some(fmt_layer) = self.new_logging_fmt_layer.take() { + reload_fmt(fmt_layer); + } + } + + fn reload_trace_propagation(&mut self) { + if let Some(propagator) = self.new_trace_propagator.take() { + opentelemetry::global::set_text_map_propagator(propagator); + } + } +} + +/// Safely drops OpenTelemetry providers using blocking tasks (Phase 3 of reload lifecycle). +/// +/// OpenTelemetry providers perform blocking I/O during shutdown (flushing buffers, closing connections). 
+/// If dropped on an async runtime thread, this can deadlock the runtime. This Drop implementation ensures +/// all providers are moved to blocking tasks for safe cleanup. +/// +/// This runs in two scenarios: +/// 1. **After commit**: Drops the old providers that were replaced +/// 2. **If preparation fails**: Drops the new providers that were never activated +impl Drop for Activation { + fn drop(&mut self) { + // Drop all meter providers in blocking tasks to avoid runtime deadlocks + for meter_provider in std::mem::take(&mut self.new_meter_providers).into_values() { + spawn_blocking(move || drop(meter_provider)); + } + + // Drop tracer provider in blocking task if present + if let Some(tracer_provider) = self.new_trace_provider.take() { + spawn_blocking(move || drop(tracer_provider)); + } + } +} diff --git a/apollo-router/src/plugins/telemetry/reload/builder.rs b/apollo-router/src/plugins/telemetry/reload/builder.rs new file mode 100644 index 0000000000..0b7395fd03 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/reload/builder.rs @@ -0,0 +1,581 @@ +//! Telemetry configuration change detection and provider construction +//! +//! This module provides the [`Builder`] which orchestrates the preparation phase of telemetry reloading. +//! +//! ## Purpose +//! +//! The builder is responsible for: +//! 1. **Change detection** - Comparing previous and current configurations to determine what needs reloading +//! 2. **Provider construction** - Building new OpenTelemetry providers only when necessary +//! 3. **State collection** - Gathering all prepared components into an [`Activation`] for the activation phase +//! +//! ## Change Detection +//! +//! The builder uses trait-based configuration access ([`MetricsConfigurator`], [`TracingConfigurator`]) +//! to detect changes in specific exporter configurations. This allows it to: +//! - Reload only the components that have changed +//! - Preserve existing providers when configuration is unchanged +//! 
- Handle both common settings (service name, resource attributes) and exporter-specific settings +//! +//! ## Construction Safety +//! +//! External exporters may perform blocking I/O during construction, so the entire build process +//! runs in [`block_in_place`] to avoid blocking the async runtime. + +use multimap::MultiMap; +use tokio::task::block_in_place; +use tower::BoxError; +use tower::ServiceExt; + +use crate::Endpoint; +use crate::ListenAddr; +use crate::metrics::aggregation::MeterProviderType; +use crate::plugins::telemetry::apollo; +use crate::plugins::telemetry::apollo_exporter::Sender; +use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::fmt_layer::create_fmt_layer; +use crate::plugins::telemetry::metrics; +use crate::plugins::telemetry::metrics::prometheus::PrometheusService; +use crate::plugins::telemetry::otlp; +use crate::plugins::telemetry::reload::activation::Activation; +use crate::plugins::telemetry::reload::metrics::MetricsBuilder; +use crate::plugins::telemetry::reload::metrics::MetricsConfigurator; +use crate::plugins::telemetry::reload::tracing::TracingBuilder; +use crate::plugins::telemetry::reload::tracing::TracingConfigurator; +use crate::plugins::telemetry::reload::tracing::create_propagator; +use crate::plugins::telemetry::tracing::datadog; +use crate::plugins::telemetry::tracing::zipkin; + +/// Orchestrates telemetry reload preparation by detecting configuration changes +/// and constructing new providers as needed. 
+pub(super) struct Builder<'a> { + previous_config: &'a Option, + config: &'a Conf, + activation: Activation, + endpoints: MultiMap, + apollo_sender: Sender, +} + +impl<'a> Builder<'a> { + pub(super) fn new(previous_config: &'a Option, config: &'a Conf) -> Self { + Self { + previous_config, + config, + activation: Activation::new(), + endpoints: Default::default(), + apollo_sender: Sender::Noop, + } + } + + pub(super) fn build( + mut self, + ) -> Result<(Activation, MultiMap, Sender), BoxError> { + // We can't guarantee that exporters from external libraries will not perform blocking io during or after construction. + // Use block_in_place to avoid any chance of blocking the main rt threads + block_in_place(|| { + self.setup_logging(); + self.setup_public_tracing()?; + self.setup_public_metrics()?; + self.setup_apollo_metrics()?; + self.setup_propagation(); + Ok((self.activation, self.endpoints, self.apollo_sender)) + }) + } + + fn setup_public_metrics(&mut self) -> Result<(), BoxError> { + if self.is_metrics_config_changed::() + || self.is_metrics_config_changed::() + { + ::tracing::debug!("configuring metrics"); + let mut builder = MetricsBuilder::new(self.config); + builder.configure(&self.config.exporters.metrics.prometheus)?; + builder.configure(&self.config.exporters.metrics.otlp)?; + builder.configure_views(MeterProviderType::Public)?; + + let (prometheus_registry, meter_providers, _) = builder.build(); + self.activation + .with_prometheus_registry(prometheus_registry); + self.activation.add_meter_providers(meter_providers); + } + // Always create Prometheus endpoint if we have a registry (either new or existing). + // The activation holds either the newly created registry or the previous one if unchanged. + // This will be None only if Prometheus has never been configured. 
+ if let Some(prometheus_registry) = self.activation.prometheus_registry() { + let listen = self.config.exporters.metrics.prometheus.listen.clone(); + let path = self.config.exporters.metrics.prometheus.path.clone(); + self.endpoints.insert( + listen, + Endpoint::from_router_service( + path, + PrometheusService { + registry: prometheus_registry.clone(), + } + .boxed(), + ), + ); + } + Ok(()) + } + + fn setup_public_tracing(&mut self) -> Result<(), BoxError> { + if self.is_tracing_config_changed::() + || self.is_tracing_config_changed::() + || self.is_tracing_config_changed::() + || self.is_tracing_config_changed::() + { + ::tracing::debug!("configuring tracing"); + let mut builder = TracingBuilder::new(self.config); + builder.configure(&self.config.exporters.tracing.otlp)?; + builder.configure(&self.config.exporters.tracing.zipkin)?; + builder.configure(&self.config.exporters.tracing.datadog)?; + builder.configure(&self.config.apollo)?; + + self.activation.with_tracer_provider(builder.build()) + } + Ok(()) + } + + fn setup_apollo_metrics(&mut self) -> Result<(), BoxError> { + ::tracing::debug!("configuring Apollo metrics"); + // Apollo metrics are always rebuilt (no change detection) because the sender + // needs to be populated on every reload. The sender cannot be stored globally + // and must be returned from the prepare phase. + let mut builder = MetricsBuilder::new(self.config); + builder.configure(&self.config.apollo)?; + let (_, meter_providers, sender) = builder.build(); + self.activation.add_meter_providers(meter_providers); + self.apollo_sender = sender; + Ok(()) + } + + /// Detects if metrics config has changed for a specific exporter. + /// + /// Returns `true` if: + /// - This is the first run (no previous config) + /// - The exporter-specific config has changed + /// - Common metrics settings (service name, resource attributes, etc.) 
have changed + fn is_metrics_config_changed(&self) -> bool { + let Some(previous_config) = self.previous_config else { + return true; + }; + T::config(previous_config) != T::config(self.config) + || previous_config.exporters.metrics.common != self.config.exporters.metrics.common + } + + /// Detects if tracing config has changed for a specific exporter. + /// + /// Returns `true` if: + /// - This is the first run (no previous config) + /// - The exporter-specific config has changed + /// - Common tracing settings (service name, sampler, span limits, etc.) have changed + fn is_tracing_config_changed(&self) -> bool { + let Some(previous_config) = self.previous_config else { + return true; + }; + T::config(previous_config) != T::config(self.config) + || previous_config.exporters.tracing.common != self.config.exporters.tracing.common + } + + fn setup_propagation(&mut self) { + let propagators = create_propagator( + &self.config.exporters.tracing.propagation, + &self.config.exporters.tracing, + ); + self.activation.with_tracer_propagator(propagators); + } + + fn setup_logging(&mut self) { + ::tracing::debug!("configuring logging"); + self.activation.with_logging(create_fmt_layer(self.config)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::plugins::telemetry::apollo; + use crate::plugins::telemetry::config::Exporters; + use crate::plugins::telemetry::config::Instrumentation; + use crate::plugins::telemetry::config::Metrics; + use crate::plugins::telemetry::config::Tracing; + + fn create_default_config() -> Conf { + Conf { + apollo: apollo::Config::default(), + exporters: Exporters { + metrics: Metrics { + common: Default::default(), + otlp: Default::default(), + prometheus: Default::default(), + }, + tracing: Tracing::default(), + logging: Default::default(), + }, + instrumentation: Instrumentation::default(), + } + } + + fn create_config_with_prometheus_enabled() -> Conf { + let mut config = create_default_config(); + 
config.exporters.metrics.prometheus.enabled = true; + config + } + + fn create_config_with_otlp_metrics_enabled() -> Conf { + let mut config = create_default_config(); + config.exporters.metrics.otlp.enabled = true; + config + } + + fn create_config_with_otlp_tracing_enabled() -> Conf { + let mut config = create_default_config(); + config.exporters.tracing.otlp.enabled = true; + config + } + + fn create_config_with_apollo_enabled() -> Conf { + let mut config = create_default_config(); + config.apollo.apollo_key = Some("test-key".to_string()); + config.apollo.apollo_graph_ref = Some("test@current".to_string()); + config + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_no_reload_when_configs_identical() { + let config = create_default_config(); + let previous_config = Some(config.clone()); + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + // When configs are identical, only certain things should be set + let instr = activation.test_instrumentation(); + assert!( + !instr.tracer_provider_set, + "Tracer provider should not reload when configs identical" + ); + assert!( + !instr.prometheus_registry_set, + "Prometheus registry should not reload when configs identical" + ); + // Apollo metrics should not be added when not configured (no apollo key/graph ref) + assert!( + instr.meter_providers_added.is_empty(), + "No meter providers should be added when configs identical and apollo not configured" + ); + // Logging and propagation always get set + assert!(instr.logging_layer_set, "Logging should always be set"); + assert!( + instr.tracer_propagator_set, + "Propagator should always be set" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_reload_on_prometheus_change() { + let previous_config = Some(create_default_config()); + let config = create_config_with_prometheus_enabled(); + + let builder = Builder::new(&previous_config, &config); + let 
(activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // Prometheus config changed, so metrics should reload + assert!( + instr.prometheus_registry_set, + "Prometheus registry should be set when config changes" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should be added" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_reload_on_otlp_change() { + let previous_config = Some(create_default_config()); + let config = create_config_with_otlp_metrics_enabled(); + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // OTLP metrics config changed, so metrics should reload + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should be added when OTLP metrics changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tracing_reload_on_otlp_change() { + let previous_config = Some(create_default_config()); + let config = create_config_with_otlp_tracing_enabled(); + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // OTLP tracing config changed, so tracing should reload + assert!( + instr.tracer_provider_set, + "Tracer provider should be set when OTLP tracing changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_apollo_metrics_always_rebuild_when_enabled() { + let config = create_config_with_apollo_enabled(); + let previous_config = Some(config.clone()); + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // Apollo metrics should always rebuild 
when apollo is configured + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Apollo) + || instr + .meter_providers_added + .contains(&MeterProviderType::ApolloRealtime), + "Apollo metrics should always rebuild when apollo is configured" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_first_run_builds_everything() { + let config = create_default_config(); + let previous_config = None; // First run, no previous config + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // First run should build everything + assert!( + instr.tracer_provider_set, + "First run should build tracer provider" + ); + assert!( + instr.tracer_propagator_set, + "First run should set tracer propagator" + ); + assert!( + instr.logging_layer_set, + "First run should set logging layer" + ); + // But no meter providers get added if nothing is configured + assert!( + instr.meter_providers_added.is_empty(), + "No meter providers added on first run when nothing enabled" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_first_run_with_apollo_enabled() { + let config = create_config_with_apollo_enabled(); + let previous_config = None; // First run, no previous config + + let builder = Builder::new(&previous_config, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // First run with apollo enabled should build apollo meters + assert!( + instr.tracer_provider_set, + "First run should build tracer provider" + ); + assert!( + instr.tracer_propagator_set, + "First run should set tracer propagator" + ); + assert!( + instr.logging_layer_set, + "First run should set logging layer" + ); + // Apollo meter providers should be added on first run when apollo is enabled + assert!( + instr + .meter_providers_added + 
.contains(&MeterProviderType::Apollo) + || instr + .meter_providers_added + .contains(&MeterProviderType::ApolloRealtime), + "First run should add apollo meter providers when apollo enabled" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_common_change_triggers_reload() { + let previous_config = create_config_with_prometheus_enabled(); + let mut config = create_config_with_prometheus_enabled(); + config.exporters.metrics.common.service_name = Some("new-service".to_string()); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // Common config changed, so metrics should reload even when only common settings change + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should reload when common config changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tracing_common_change_triggers_reload() { + let previous_config = create_config_with_otlp_tracing_enabled(); + let mut config = create_config_with_otlp_tracing_enabled(); + config.exporters.tracing.common.service_name = Some("new-service".to_string()); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + // Common config changed, so tracing should reload even when only common settings change + assert!( + instr.tracer_provider_set, + "Tracer provider should reload when common config changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_common_service_namespace_change() { + let previous_config = create_config_with_prometheus_enabled(); + let mut config = create_config_with_prometheus_enabled(); + 
config.exporters.metrics.common.service_namespace = Some("new-namespace".to_string()); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should reload when service_namespace changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_common_resource_change() { + let previous_config = create_config_with_prometheus_enabled(); + let mut config = create_config_with_prometheus_enabled(); + config.exporters.metrics.common.resource.insert( + "deployment.environment".to_string(), + crate::plugins::telemetry::config::AttributeValue::String("staging".to_string()), + ); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should reload when resource attributes change" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_metrics_common_buckets_change() { + let previous_config = create_config_with_prometheus_enabled(); + let mut config = create_config_with_prometheus_enabled(); + config.exporters.metrics.common.buckets = vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0]; + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Public meter provider should reload when histogram buckets change" + ); + } + + 
#[tokio::test(flavor = "multi_thread")] + async fn test_tracing_common_service_namespace_change() { + let previous_config = create_config_with_otlp_tracing_enabled(); + let mut config = create_config_with_otlp_tracing_enabled(); + config.exporters.tracing.common.service_namespace = Some("new-namespace".to_string()); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr.tracer_provider_set, + "Tracer provider should reload when service_namespace changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tracing_common_sampler_change() { + let previous_config = create_config_with_otlp_tracing_enabled(); + let mut config = create_config_with_otlp_tracing_enabled(); + config.exporters.tracing.common.sampler = + crate::plugins::telemetry::config::SamplerOption::TraceIdRatioBased(0.5); + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr.tracer_provider_set, + "Tracer provider should reload when sampler changes" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_tracing_common_parent_based_sampler_change() { + let previous_config = create_config_with_otlp_tracing_enabled(); + let mut config = create_config_with_otlp_tracing_enabled(); + config.exporters.tracing.common.parent_based_sampler = false; + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr.tracer_provider_set, + "Tracer provider should reload when parent_based_sampler changes" + ); + } + + 
#[tokio::test(flavor = "multi_thread")] + async fn test_tracing_common_span_limits_change() { + let previous_config = create_config_with_otlp_tracing_enabled(); + let mut config = create_config_with_otlp_tracing_enabled(); + config.exporters.tracing.common.max_events_per_span = 256; + config.exporters.tracing.common.max_attributes_per_span = 64; + + let previous_config_opt = Some(previous_config); + let builder = Builder::new(&previous_config_opt, &config); + let (activation, _endpoints, _sender) = builder.build().unwrap(); + + let instr = activation.test_instrumentation(); + assert!( + instr.tracer_provider_set, + "Tracer provider should reload when span limits change" + ); + } +} diff --git a/apollo-router/src/plugins/telemetry/reload/metrics.rs b/apollo-router/src/plugins/telemetry/reload/metrics.rs new file mode 100644 index 0000000000..8526540574 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/reload/metrics.rs @@ -0,0 +1,184 @@ +//! Metrics provider construction +//! +//! This module provides tools for building OpenTelemetry meter providers from router configuration. +//! +//! ## Purpose +//! +//! The [`MetricsBuilder`] constructs meter providers for different telemetry backends: +//! - **Public metrics** - Prometheus and OTLP exporters for general observability +//! - **Apollo metrics** - Special meter providers for Apollo Studio reporting +//! +//! ## Configurator Pattern +//! +//! The [`MetricsConfigurator`] trait allows different metric exporters to contribute to the +//! builder in a uniform way. Each exporter (Prometheus, OTLP, Apollo) implements this trait +//! to extract its configuration and add appropriate readers and views to the builder. +//! +//! ## Provider Types +//! +//! Multiple meter providers are created to serve different purposes: +//! - `Public` - For standard metrics exposed via Prometheus or sent to OTLP collectors +//! 
- `Apollo`/`ApolloRealtime` - For metrics sent to Apollo Studio with specific filtering + +use ahash::HashMap; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::metrics::MeterProviderBuilder; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::metrics::View; +use prometheus::Registry; +use tower::BoxError; + +use crate::_private::telemetry::ConfigResource; +use crate::metrics::aggregation::MeterProviderType; +use crate::metrics::filter::FilterMeterProvider; +use crate::plugins::telemetry::apollo_exporter::Sender; +use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::config::MetricsCommon; + +/// Trait for metric exporters to contribute to meter provider construction +pub(crate) trait MetricsConfigurator { + fn config(conf: &Conf) -> &Self; + fn is_enabled(&self) -> bool; + fn configure<'a>(&self, builder: &mut MetricsBuilder<'a>) -> Result<(), BoxError>; +} + +/// Builder for constructing OpenTelemetry meter providers. +/// +/// Accumulates configuration from multiple exporters and builds the final meter providers +/// with appropriate readers, views, and resource attributes. 
+pub(crate) struct MetricsBuilder<'a> { + meter_provider_builders: + HashMap, + apollo_metrics_sender: Sender, + prometheus_registry: Option, + metrics_common: &'a MetricsCommon, + resource: Resource, +} + +impl<'a> MetricsBuilder<'a> { + pub(crate) fn build( + self, + ) -> ( + Option, + HashMap, + Sender, + ) { + ( + self.prometheus_registry, + self.meter_provider_builders + .into_iter() + .map(|(k, v)| { + ( + k, + match k { + MeterProviderType::Public => FilterMeterProvider::public(v.build()), + MeterProviderType::OtelDefault => { + FilterMeterProvider::public(v.build()) + } + MeterProviderType::Apollo => FilterMeterProvider::apollo(v.build()), + MeterProviderType::ApolloRealtime => { + FilterMeterProvider::apollo_realtime(v.build()) + } + }, + ) + }) + .collect(), + self.apollo_metrics_sender, + ) + } + pub(crate) fn configure(&mut self, config: &T) -> Result<(), BoxError> { + if config.is_enabled() { + return config.configure(self); + } + Ok(()) + } + + pub(crate) fn new(config: &'a Conf) -> Self { + let resource = config.exporters.metrics.common.to_resource(); + + Self { + meter_provider_builders: HashMap::default(), + resource, + apollo_metrics_sender: Sender::default(), + prometheus_registry: None, + metrics_common: &config.exporters.metrics.common, + } + } + pub(crate) fn metrics_common(&self) -> &MetricsCommon { + self.metrics_common + } + pub(crate) fn with_prometheus_registry(&mut self, prometheus_registry: Registry) -> &mut Self { + self.prometheus_registry = Some(prometheus_registry); + self + } + pub(crate) fn with_apollo_metrics_sender( + &mut self, + apollo_metrics_sender: Sender, + ) -> &mut Self { + self.apollo_metrics_sender = apollo_metrics_sender; + self + } + pub(crate) fn with_reader( + &mut self, + meter_provider_type: MeterProviderType, + reader: T, + ) -> &mut Self { + let meter_provider = self.meter_provider(meter_provider_type); + *meter_provider = std::mem::take(meter_provider).with_reader(reader); + self + } + + pub(crate) fn 
with_view( + &mut self, + meter_provider_type: MeterProviderType, + view: Box, + ) -> &mut Self { + let meter_provider = self.meter_provider(meter_provider_type); + *meter_provider = std::mem::take(meter_provider).with_view(view); + self + } + + pub(crate) fn with_resource( + &mut self, + meter_provider_type: MeterProviderType, + resource: Resource, + ) -> &mut Self { + let meter_provider = self.meter_provider(meter_provider_type); + *meter_provider = std::mem::take(meter_provider).with_resource(resource); + self + } + + /// Gets or creates a meter provider builder for a specific type. + /// + /// Note: Only Public and OtelDefault providers include resource attributes. + /// Apollo providers omit resources as they're not sent to Apollo Studio. + fn meter_provider( + &mut self, + meter_provider_type: MeterProviderType, + ) -> &mut MeterProviderBuilder { + self.meter_provider_builders + .entry(meter_provider_type) + .or_insert_with(|| match meter_provider_type { + // Public and default providers include resource attributes (service name, etc.) 
+ MeterProviderType::Public => { + SdkMeterProvider::builder().with_resource(self.resource.clone()) + } + MeterProviderType::OtelDefault => { + SdkMeterProvider::builder().with_resource(self.resource.clone()) + } + // Apollo providers omit resource attributes - not sent to Apollo Studio + MeterProviderType::Apollo => SdkMeterProvider::builder(), + MeterProviderType::ApolloRealtime => SdkMeterProvider::builder(), + }) + } + + pub(crate) fn configure_views( + &mut self, + meter_provider_type: MeterProviderType, + ) -> Result<(), BoxError> { + for metric_view in self.metrics_common().views.clone() { + self.with_view(meter_provider_type, metric_view.try_into()?); + } + Ok(()) + } +} diff --git a/apollo-router/src/plugins/telemetry/reload/mod.rs b/apollo-router/src/plugins/telemetry/reload/mod.rs new file mode 100644 index 0000000000..c46edcb4e6 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/reload/mod.rs @@ -0,0 +1,330 @@ +//! Reload support for telemetry +//! +//! Telemetry reloading is complex because it modifies global OpenTelemetry state that must remain +//! consistent across the entire application. The challenge is that plugins may fail during initialization, +//! so we cannot safely modify global state at that time without risking leaving the system in an +//! inconsistent state if a later plugin fails. +//! +//! ## Lifecycle Overview +//! +//! The reload process follows a three-phase lifecycle: +//! +//! ### 1. Preparation Phase (`prepare`) +//! - Called during plugin initialization with the current and previous configurations +//! - The [`Builder`] detects what has changed by comparing configurations +//! - For each changed component (metrics, tracing, etc.), new providers are constructed +//! - An [`Activation`] object is created containing all the prepared state +//! - This phase is fallible - errors will prevent the reload +//! +//! ### 2. Activation Phase (`activate`) +//! 
- Called via `PluginPrivate::activate()` after all plugins are successfully initialized +//! - The [`Activation::commit()`] method atomically updates global state +//! - Old providers are safely shut down using blocking tasks +//! - This phase is infallible - by this point we're committed to the new configuration +//! +//! ### 3. Cleanup Phase (Drop) +//! - When activation is dropped (or if preparation fails), OpenTelemetry resources are cleaned up +//! - Meter and tracer providers perform blocking I/O in their Drop implementations +//! - These are moved to blocking tasks to avoid blocking async runtime threads +//! +//! ## Module Structure +//! +//! * [`otel`] - Global state management and initialization of the tracing subscriber +//! * [`activation`] - State container for the activation phase, handles safe provider replacement +//! * [`builder`] - Configuration change detection and construction of new providers +//! * [`metrics`] - Building meter providers from configuration +//! * [`tracing`] - Building trace providers from configuration +use multimap::MultiMap; +use tower::BoxError; + +use crate::Endpoint; +use crate::ListenAddr; +use crate::plugins::telemetry::apollo_exporter; +use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::reload::activation::Activation; +use crate::plugins::telemetry::reload::builder::Builder; + +pub(crate) mod activation; +pub(crate) mod builder; +pub(crate) mod metrics; +pub(crate) mod otel; +pub(crate) mod tracing; + +/// Prepares telemetry components for activation (Phase 1 of reload lifecycle). +/// +/// This is the entry point for the preparation phase. It: +/// 1. Detects configuration changes by comparing `previous_config` with `config` +/// 2. Constructs new providers only for components that have changed +/// 3. 
Returns an `Activation` object containing the prepared state +/// +/// # Returns +/// +/// A tuple containing: +/// - [`Activation`] - Prepared telemetry state to be activated later +/// - Prometheus endpoints (if Prometheus is enabled) +/// - Apollo metrics sender (if Apollo is configured) +/// +/// # Errors +/// +/// Returns an error if any provider construction fails. This prevents +/// the reload from proceeding to the activation phase. +pub(crate) fn prepare( + previous_config: &Option, + config: &Conf, +) -> Result< + ( + Activation, + MultiMap, + apollo_exporter::Sender, + ), + BoxError, +> { + Builder::new(previous_config, config).build() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::aggregation::MeterProviderType; + use crate::plugins::telemetry::apollo; + use crate::plugins::telemetry::config::Exporters; + use crate::plugins::telemetry::config::Instrumentation; + use crate::plugins::telemetry::config::Metrics; + use crate::plugins::telemetry::config::Tracing; + + fn create_default_config() -> Conf { + Conf { + apollo: apollo::Config::default(), + exporters: Exporters { + metrics: Metrics { + common: Default::default(), + otlp: Default::default(), + prometheus: Default::default(), + }, + tracing: Tracing::default(), + logging: Default::default(), + }, + instrumentation: Instrumentation::default(), + } + } + + fn create_config_with_prometheus() -> Conf { + let mut config = create_default_config(); + config.exporters.metrics.prometheus.enabled = true; + config.exporters.metrics.prometheus.listen = + crate::ListenAddr::SocketAddr("127.0.0.1:9090".parse().unwrap()); + config.exporters.metrics.prometheus.path = "/metrics".to_string(); + config + } + + fn create_config_with_apollo() -> Conf { + let mut config = create_default_config(); + config.apollo.apollo_key = Some("test-key".to_string()); + config.apollo.apollo_graph_ref = Some("test@current".to_string()); + config + } + + #[tokio::test(flavor = "multi_thread")] + async fn 
test_prepare_with_no_previous_config() { + let config = create_default_config(); + + let result = prepare(&None, &config); + assert!(result.is_ok(), "prepare should succeed with default config"); + + let (activation, endpoints, _sender) = result.unwrap(); + let instr = activation.test_instrumentation(); + + // First run should set up basic telemetry + assert!( + instr.tracer_provider_set, + "First run should set tracer provider" + ); + assert!( + instr.tracer_propagator_set, + "First run should set propagator" + ); + assert!(instr.logging_layer_set, "First run should set logging"); + + // No endpoints should be created with default config + assert!( + endpoints.is_empty(), + "No endpoints should be created with default config" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_prepare_with_prometheus_creates_endpoint() { + let config = create_config_with_prometheus(); + + let result = prepare(&None, &config); + assert!( + result.is_ok(), + "prepare should succeed with prometheus config" + ); + + let (activation, endpoints, _sender) = result.unwrap(); + let instr = activation.test_instrumentation(); + + // Should set up prometheus + assert!( + instr.prometheus_registry_set, + "Should set prometheus registry" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Should add public meter provider" + ); + + // Should create prometheus endpoint + assert!(!endpoints.is_empty(), "Should create prometheus endpoint"); + let listen_addr = crate::ListenAddr::SocketAddr("127.0.0.1:9090".parse().unwrap()); + assert!( + endpoints.contains_key(&listen_addr), + "Should create endpoint on correct address" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_prepare_with_apollo_creates_apollo_metrics() { + let config = create_config_with_apollo(); + + let result = prepare(&None, &config); + assert!(result.is_ok(), "prepare should succeed with apollo config"); + + let (activation, _endpoints, sender) = 
result.unwrap(); + let instr = activation.test_instrumentation(); + + // Should set up apollo metrics and tracing + assert!( + instr.tracer_provider_set, + "Should set tracer provider for apollo" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Apollo) + || instr + .meter_providers_added + .contains(&MeterProviderType::ApolloRealtime), + "Should add apollo meter providers" + ); + + // Should have apollo sender (not noop) + if let apollo_exporter::Sender::Noop = sender { + panic!("Should not be noop sender when apollo configured") + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_prepare_detects_config_changes() { + let previous_config = create_default_config(); + let config = create_config_with_prometheus(); + + let result = prepare(&Some(previous_config), &config); + assert!(result.is_ok(), "prepare should succeed with config change"); + + let (activation, endpoints, _sender) = result.unwrap(); + let instr = activation.test_instrumentation(); + + // Should detect prometheus config change and reload metrics + assert!( + instr.prometheus_registry_set, + "Should reload prometheus when config changes" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Should reload public meter provider" + ); + + // Should create new endpoint + assert!( + !endpoints.is_empty(), + "Should create prometheus endpoint when enabled" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_prepare_no_reload_when_configs_identical() { + let config = create_default_config(); + let previous_config = Some(config.clone()); + + let result = prepare(&previous_config, &config); + assert!( + result.is_ok(), + "prepare should succeed with identical configs" + ); + + let (activation, endpoints, _sender) = result.unwrap(); + let instr = activation.test_instrumentation(); + + // Should not reload anything when configs are identical + assert!( + !instr.tracer_provider_set, + "Should not reload tracer 
when configs identical" + ); + assert!( + !instr.prometheus_registry_set, + "Should not reload prometheus when configs identical" + ); + assert!( + instr.meter_providers_added.is_empty(), + "Should not add meter providers when configs identical" + ); + + // But always set logging and propagation + assert!(instr.logging_layer_set, "Should always set logging"); + assert!(instr.tracer_propagator_set, "Should always set propagation"); + + // No endpoints with default config + assert!(endpoints.is_empty(), "No endpoints with default config"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_prepare_multiple_config_changes() { + let previous_config = create_default_config(); + let mut config = create_config_with_prometheus(); + config.apollo.apollo_key = Some("test-key".to_string()); + config.apollo.apollo_graph_ref = Some("test@current".to_string()); + + let result = prepare(&Some(previous_config), &config); + assert!( + result.is_ok(), + "prepare should succeed with multiple changes" + ); + + let (activation, endpoints, sender) = result.unwrap(); + let instr = activation.test_instrumentation(); + + // Should set up both prometheus and apollo + assert!(instr.tracer_provider_set, "Should set tracer provider"); + assert!( + instr.prometheus_registry_set, + "Should set prometheus registry" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Public), + "Should add public meter provider" + ); + assert!( + instr + .meter_providers_added + .contains(&MeterProviderType::Apollo) + || instr + .meter_providers_added + .contains(&MeterProviderType::ApolloRealtime), + "Should add apollo meter providers" + ); + + // Should create prometheus endpoint and apollo sender + assert!(!endpoints.is_empty(), "Should create prometheus endpoint"); + if let apollo_exporter::Sender::Noop = sender { + panic!("Should not be noop sender when apollo configured") + } + } +} diff --git a/apollo-router/src/plugins/telemetry/reload.rs 
b/apollo-router/src/plugins/telemetry/reload/otel.rs similarity index 77% rename from apollo-router/src/plugins/telemetry/reload.rs rename to apollo-router/src/plugins/telemetry/reload/otel.rs index c3445b27ce..62f44f7c26 100644 --- a/apollo-router/src/plugins/telemetry/reload.rs +++ b/apollo-router/src/plugins/telemetry/reload/otel.rs @@ -1,6 +1,32 @@ +//! OpenTelemetry global state management +//! +//! This module manages the global OpenTelemetry state and tracing subscriber initialization. +//! It provides the foundation for hot-reloading telemetry configuration without restarting the router. +//! +//! ## Global State +//! +//! OpenTelemetry requires global state for tracer providers and propagators. This module maintains: +//! - **Tracer handle** ([`OPENTELEMETRY_TRACER_HANDLE`]) - Allows hot-swapping the active tracer +//! - **Format layer handle** ([`FMT_LAYER_HANDLE`]) - Allows hot-swapping the logging format +//! +//! These handles are set once during initialization and then used to reload components when +//! configuration changes. +//! +//! ## Initialization +//! +//! The [`init_telemetry`] function sets up the tracing subscriber stack with: +//! - Dynamic attribute layer for request-scoped attributes +//! - OpenTelemetry layer for distributed tracing +//! - Format layer for structured logging (JSON or text based on TTY) +//! - Environment filter for log level control +//! +//! ## Reloading +//! +//! The reload handles enable the activation phase to update telemetry without recreating the +//! entire subscriber stack, which would require restarting the application. 
+ use std::io::IsTerminal; -use anyhow::Result; use anyhow::anyhow; use once_cell::sync::OnceCell; use opentelemetry::Context; @@ -13,17 +39,17 @@ use opentelemetry::trace::TracerProvider; use opentelemetry_sdk::trace::Tracer; use tower::BoxError; use tracing_subscriber::EnvFilter; +use tracing_subscriber::Layer; use tracing_subscriber::Registry; -use tracing_subscriber::layer::Layer; use tracing_subscriber::layer::Layered; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::SpanRef; use tracing_subscriber::reload::Handle; use tracing_subscriber::util::SubscriberInitExt; -use super::dynamic_attribute::DynAttributeLayer; -use super::fmt_layer::FmtLayer; -use super::formatters::json::Json; +use crate::plugins::telemetry::dynamic_attribute::DynAttributeLayer; +use crate::plugins::telemetry::fmt_layer::FmtLayer; +use crate::plugins::telemetry::formatters::json::Json; use crate::plugins::telemetry::formatters::text::Text; use crate::plugins::telemetry::otel; use crate::plugins::telemetry::otel::OpenTelemetryLayer; @@ -32,21 +58,26 @@ use crate::plugins::telemetry::tracing::reload::ReloadTracer; use crate::tracer::TraceId; pub(crate) type LayeredRegistry = Layered; - -pub(super) type LayeredTracer = +pub(in crate::plugins::telemetry) type LayeredTracer = Layered>, LayeredRegistry>; -// These handles allow hot tracing of layers. They have complex type definitions because tracing has -// generic types in the layer definition. -pub(super) static OPENTELEMETRY_TRACER_HANDLE: OnceCell< +/// Global handle for hot-reloading the OpenTelemetry tracer +/// +/// This handle allows the activation phase to swap in a new tracer without rebuilding +/// the entire tracing subscriber stack. 
+pub(in crate::plugins::telemetry) static OPENTELEMETRY_TRACER_HANDLE: OnceCell< ReloadTracer, > = OnceCell::new(); +/// Global handle for hot-reloading the logging format layer +/// +/// This handle allows the activation phase to change logging format (e.g., JSON vs text) +/// without rebuilding the entire tracing subscriber stack. static FMT_LAYER_HANDLE: OnceCell< Handle + Send + Sync>, LayeredTracer>, > = OnceCell::new(); -pub(crate) fn init_telemetry(log_level: &str) -> Result<()> { +pub(crate) fn init_telemetry(log_level: &str) -> anyhow::Result<()> { let hot_tracer = ReloadTracer::new( opentelemetry_sdk::trace::TracerProvider::default() .tracer_builder("noop") @@ -89,7 +120,9 @@ pub(crate) fn init_telemetry(log_level: &str) -> Result<()> { Ok(()) } -pub(super) fn reload_fmt(layer: Box + Send + Sync>) { +pub(in crate::plugins::telemetry) fn reload_fmt( + layer: Box + Send + Sync>, +) { if let Some(handle) = FMT_LAYER_HANDLE.get() { handle.reload(layer).expect("fmt layer reload must succeed"); } @@ -147,11 +180,9 @@ where fn is_sampled(&self) -> bool { // if this extension is set, that means the parent span was accepted, and so the // entire trace is accepted - let extensions = self.extensions(); - extensions + self.extensions() .get::() - .map(|s| matches!(s, SampledSpan::Sampled(_, _))) - .unwrap_or_default() + .is_some_and(|s| matches!(s, SampledSpan::Sampled(_, _))) } fn get_trace_id(&self) -> Option { @@ -169,6 +200,7 @@ const LEGACY_METRIC_PREFIX_COUNTER: &str = "counter."; const LEGACY_METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const LEGACY_METRIC_PREFIX_VALUE: &str = "value."; +/// REMOVE in 3.0 /// Detects use of the 1.x `tracing`-based metrics events, which are no longer supported in 2.x. 
struct WarnLegacyMetricsLayer; diff --git a/apollo-router/src/plugins/telemetry/reload/tracing.rs b/apollo-router/src/plugins/telemetry/reload/tracing.rs new file mode 100644 index 0000000000..4eb0426eff --- /dev/null +++ b/apollo-router/src/plugins/telemetry/reload/tracing.rs @@ -0,0 +1,120 @@ +//! Trace provider construction +//! +//! This module provides tools for building OpenTelemetry tracer providers from router configuration. +//! +//! ## Purpose +//! +//! The [`TracingBuilder`] constructs a tracer provider that handles distributed tracing across +//! multiple backends (OTLP, Datadog, Zipkin, Apollo). It also configures trace propagation to +//! ensure trace context is properly propagated across service boundaries. +//! +//! ## Configurator Pattern +//! +//! The [`TracingConfigurator`] trait allows different trace exporters to contribute span processors +//! to the builder. Each exporter (OTLP, Datadog, Zipkin, Apollo) implements this trait to add its +//! specific span processing logic. +//! +//! ## Propagation +//! +//! The [`create_propagator`] function builds a composite propagator supporting multiple trace +//! context formats (W3C Trace Context, Jaeger, Zipkin, Datadog, AWS X-Ray). This allows the router +//! to interoperate with services using different tracing systems. 
+ +use opentelemetry::propagation::TextMapCompositePropagator; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry_sdk::trace::SpanProcessor; +use opentelemetry_sdk::trace::TracerProvider; +use tower::BoxError; + +use crate::plugins::telemetry::CustomTraceIdPropagator; +use crate::plugins::telemetry::config::Conf; +use crate::plugins::telemetry::config::Propagation; +use crate::plugins::telemetry::config::Tracing; +use crate::plugins::telemetry::config::TracingCommon; +use crate::plugins::telemetry::config_new::spans::Spans; + +/// Builder for constructing OpenTelemetry tracer providers with multiple exporters +pub(crate) struct TracingBuilder<'a> { + common: &'a TracingCommon, + spans: &'a Spans, + builder: opentelemetry_sdk::trace::Builder, +} + +impl<'a> TracingBuilder<'a> { + pub(crate) fn new(config: &'a Conf) -> Self { + Self { + common: &config.exporters.tracing.common, + spans: &config.instrumentation.spans, + builder: opentelemetry_sdk::trace::TracerProvider::builder() + .with_config((&config.exporters.tracing.common).into()), + } + } + + pub(crate) fn configure(&mut self, config: &T) -> Result<(), BoxError> { + if config.is_enabled() { + return config.configure(self); + } + Ok(()) + } + + pub(crate) fn tracing_common(&self) -> &TracingCommon { + self.common + } + + pub(crate) fn spans(&self) -> &Spans { + self.spans + } + + pub(crate) fn with_span_processor(&mut self, span_processor: T) { + let builder = std::mem::take(&mut self.builder); + self.builder = builder.with_span_processor(span_processor); + } + + pub(crate) fn build(self) -> TracerProvider { + self.builder.build() + } +} + +pub(crate) fn create_propagator( + propagation: &Propagation, + tracing: &Tracing, +) -> TextMapCompositePropagator { + let mut propagators: Vec> = Vec::new(); + if propagation.jaeger { + propagators.push(Box::::default()); + } + if propagation.baggage { + propagators.push(Box::::default()); + } + if propagation.trace_context || tracing.otlp.enabled { + 
propagators.push(Box::::default()); + } + if propagation.zipkin || tracing.zipkin.enabled { + propagators.push(Box::::default()); + } + if propagation.datadog || tracing.datadog.enabled { + propagators.push(Box::< + crate::plugins::telemetry::tracing::datadog_exporter::DatadogPropagator, + >::default()); + } + if propagation.aws_xray { + propagators.push(Box::::default()); + } + + // This propagator MUST come last because the user is trying to override the default behavior of the + // other propagators. + if let Some(from_request_header) = &propagation.request.header_name { + propagators.push(Box::new(CustomTraceIdPropagator::new( + from_request_header.to_string(), + propagation.request.format.clone(), + ))); + } + TextMapCompositePropagator::new(propagators) +} + +/// Trait for trace exporters to contribute to tracer provider construction +pub(crate) trait TracingConfigurator { + fn config(conf: &Conf) -> &Self; + fn is_enabled(&self) -> bool; + fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError>; +} diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo.rs b/apollo-router/src/plugins/telemetry/tracing/apollo.rs index cfbbe50a37..796f95b490 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo.rs @@ -1,30 +1,28 @@ //! Tracing configuration for apollo telemetry. 
use opentelemetry_sdk::trace::BatchSpanProcessor; -use opentelemetry_sdk::trace::Builder; use serde::Serialize; use tower::BoxError; use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::apollo::router_id; use crate::plugins::telemetry::apollo_exporter::proto::reports::Trace; -use crate::plugins::telemetry::config; -use crate::plugins::telemetry::config_new::spans::Spans; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; +use crate::plugins::telemetry::reload::tracing::TracingBuilder; +use crate::plugins::telemetry::reload::tracing::TracingConfigurator; use crate::plugins::telemetry::span_factory::SpanMode; -use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::plugins::telemetry::tracing::apollo_telemetry; impl TracingConfigurator for Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.apollo + } + + fn is_enabled(&self) -> bool { self.apollo_key.is_some() && self.apollo_graph_ref.is_some() } - fn apply( - &self, - builder: Builder, - _common: &config::TracingCommon, - spans_config: &Spans, - ) -> Result { + fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> { tracing::debug!("configuring Apollo tracing"); let exporter = apollo_telemetry::Exporter::builder() .endpoint(&self.endpoint) @@ -47,14 +45,15 @@ impl TracingConfigurator for Config { .field_execution_sampler(&self.field_level_instrumentation_sampler) .batch_processor_config(&self.tracing.batch_processor) .errors_configuration(&self.errors) - .use_legacy_request_span(matches!(spans_config.mode, SpanMode::Deprecated)) + .use_legacy_request_span(matches!(builder.spans().mode, SpanMode::Deprecated)) .metrics_reference_mode(self.metrics_reference_mode) .build()?; - Ok(builder.with_span_processor( + builder.with_span_processor( BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("apollo-tracing")) 
.with_batch_config(self.tracing.batch_processor.clone().into()) .build(), - )) + ); + Ok(()) } } diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs index e3532cc610..6eb09721d4 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs @@ -21,7 +21,6 @@ use opentelemetry_sdk::Resource; use opentelemetry_sdk::export::trace::ExportResult; use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::export::trace::SpanExporter; -use opentelemetry_sdk::trace::Builder; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use opentelemetry_semantic_conventions::resource::SERVICE_VERSION; use schemars::JsonSchema; @@ -29,9 +28,8 @@ use serde::Deserialize; pub(crate) use span_processor::DatadogSpanProcessor; use tower::BoxError; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::config::GenericWith; -use crate::plugins::telemetry::config::TracingCommon; -use crate::plugins::telemetry::config_new::spans::Spans; use crate::plugins::telemetry::consts::BUILT_IN_SPAN_NAMES; use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME; use crate::plugins::telemetry::consts::OTEL_ORIGINAL_NAME; @@ -43,9 +41,10 @@ use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME; use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME; use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; +use crate::plugins::telemetry::reload::tracing::TracingBuilder; +use crate::plugins::telemetry::reload::tracing::TracingConfigurator; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; -use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::plugins::telemetry::tracing::datadog_exporter; use 
crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; @@ -66,12 +65,12 @@ fn default_resource_mappings() -> HashMap { const ENV_KEY: Key = Key::from_static_str("env"); const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:8126"; -#[derive(Debug, Clone, Deserialize, JsonSchema, serde_derive_default::Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, serde_derive_default::Default, PartialEq)] #[serde(deny_unknown_fields)] #[schemars(rename = "DatadogConfig")] pub(crate) struct Config { /// Enable datadog - enabled: bool, + pub(crate) enabled: bool, /// The endpoint to send to #[serde(default)] @@ -118,18 +117,17 @@ fn default_true() -> bool { } impl TracingConfigurator for Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.exporters.tracing.datadog + } + + fn is_enabled(&self) -> bool { self.enabled } - fn apply( - &self, - builder: Builder, - trace: &TracingCommon, - _spans_config: &Spans, - ) -> Result { + fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> { tracing::info!("Configuring Datadog tracing: {}", self.batch_processor); - let common: opentelemetry_sdk::trace::Config = trace.into(); + let common: opentelemetry_sdk::trace::Config = builder.tracing_common().into(); // Precompute representation otel Keys for the mappings so that we don't do heap allocation for each span let resource_mappings = self.enable_span_mapping.then(|| { @@ -231,13 +229,16 @@ impl TracingConfigurator for Config { .build() .filtered(); - Ok( - if trace.preview_datadog_agent_sampling.unwrap_or_default() { - builder.with_span_processor(batch_processor.always_sampled()) - } else { - builder.with_span_processor(batch_processor) - }, - ) + if builder + .tracing_common() + .preview_datadog_agent_sampling + .unwrap_or_default() + { + builder.with_span_processor(batch_processor.always_sampled()) + } else { + builder.with_span_processor(batch_processor) + } + Ok(()) } } diff --git 
a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index 9a4f44e2ae..c689bf8d47 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -8,17 +8,13 @@ use opentelemetry_sdk::Resource; use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::trace::BatchConfig; use opentelemetry_sdk::trace::BatchConfigBuilder; -use opentelemetry_sdk::trace::Builder; use opentelemetry_sdk::trace::Span; use opentelemetry_sdk::trace::SpanProcessor; use schemars::JsonSchema; use serde::Deserialize; -use tower::BoxError; -use super::config_new::spans::Spans; use super::formatters::APOLLO_CONNECTOR_PREFIX; use super::formatters::APOLLO_PRIVATE_PREFIX; -use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::tracing::datadog::DatadogSpanProcessor; pub(crate) mod apollo; @@ -30,16 +26,6 @@ pub(crate) mod otlp; pub(crate) mod reload; pub(crate) mod zipkin; -pub(crate) trait TracingConfigurator { - fn enabled(&self) -> bool; - fn apply( - &self, - builder: Builder, - common: &TracingCommon, - spans: &Spans, - ) -> Result; -} - #[derive(Debug)] struct ApolloFilterSpanProcessor { delegate: T, @@ -110,7 +96,7 @@ where } /// Batch processor configuration -#[derive(Debug, Clone, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)] #[serde(default)] pub(crate) struct BatchProcessorConfig { #[serde(deserialize_with = "humantime_serde::deserialize")] diff --git a/apollo-router/src/plugins/telemetry/tracing/otlp.rs b/apollo-router/src/plugins/telemetry/tracing/otlp.rs index 51b785fded..64c6db1169 100644 --- a/apollo-router/src/plugins/telemetry/tracing/otlp.rs +++ b/apollo-router/src/plugins/telemetry/tracing/otlp.rs @@ -3,27 +3,25 @@ use std::result::Result; use opentelemetry_otlp::SpanExporterBuilder; use opentelemetry_sdk::trace::BatchSpanProcessor; -use opentelemetry_sdk::trace::Builder; use 
tower::BoxError; -use crate::plugins::telemetry::config::TracingCommon; -use crate::plugins::telemetry::config_new::spans::Spans; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::otlp::TelemetryDataKind; +use crate::plugins::telemetry::reload::tracing::TracingBuilder; +use crate::plugins::telemetry::reload::tracing::TracingConfigurator; use crate::plugins::telemetry::tracing::SpanProcessorExt; -use crate::plugins::telemetry::tracing::TracingConfigurator; impl TracingConfigurator for super::super::otlp::Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.exporters.tracing.otlp + } + + fn is_enabled(&self) -> bool { self.enabled } - fn apply( - &self, - builder: Builder, - common: &TracingCommon, - _spans_config: &Spans, - ) -> Result { + fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> { let exporter: SpanExporterBuilder = self.exporter(TelemetryDataKind::Traces)?; let batch_span_processor = BatchSpanProcessor::builder( exporter.build_span_exporter()?, @@ -32,12 +30,17 @@ impl TracingConfigurator for super::super::otlp::Config { .with_batch_config(self.batch_processor.clone().into()) .build() .filtered(); - Ok( - if common.preview_datadog_agent_sampling.unwrap_or_default() { - builder.with_span_processor(batch_span_processor.always_sampled()) - } else { - builder.with_span_processor(batch_span_processor) - }, - ) + + if builder + .tracing_common() + .preview_datadog_agent_sampling + .unwrap_or_default() + { + builder.with_span_processor(batch_span_processor.always_sampled()) + } else { + builder.with_span_processor(batch_span_processor) + } + + Ok(()) } } diff --git a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs index 802115cb1e..f7c97dd2c4 100644 --- a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs +++ 
b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs @@ -3,25 +3,24 @@ use std::sync::LazyLock; use http::Uri; use opentelemetry_sdk::trace::BatchSpanProcessor; -use opentelemetry_sdk::trace::Builder; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use schemars::JsonSchema; use serde::Deserialize; use tower::BoxError; +use crate::plugins::telemetry::config::Conf; use crate::plugins::telemetry::config::GenericWith; -use crate::plugins::telemetry::config::TracingCommon; -use crate::plugins::telemetry::config_new::spans::Spans; use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; +use crate::plugins::telemetry::reload::tracing::TracingBuilder; +use crate::plugins::telemetry::reload::tracing::TracingConfigurator; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; -use crate::plugins::telemetry::tracing::TracingConfigurator; static DEFAULT_ENDPOINT: LazyLock = LazyLock::new(|| Uri::from_static("http://127.0.0.1:9411/api/v2/spans")); -#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)] #[serde(deny_unknown_fields)] #[schemars(rename = "ZipkinConfig")] pub(crate) struct Config { @@ -38,18 +37,17 @@ pub(crate) struct Config { } impl TracingConfigurator for Config { - fn enabled(&self) -> bool { + fn config(conf: &Conf) -> &Self { + &conf.exporters.tracing.zipkin + } + + fn is_enabled(&self) -> bool { self.enabled } - fn apply( - &self, - builder: Builder, - trace: &TracingCommon, - _spans_config: &Spans, - ) -> Result { + fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> { tracing::info!("configuring Zipkin tracing: {}", self.batch_processor); - let common: opentelemetry_sdk::trace::Config = trace.into(); + let common: opentelemetry_sdk::trace::Config = builder.tracing_common().into(); let endpoint = 
&self.endpoint.to_full_uri(&DEFAULT_ENDPOINT); let exporter = opentelemetry_zipkin::new_pipeline() .with_collector_endpoint(endpoint.to_string()) @@ -64,11 +62,12 @@ impl TracingConfigurator for Config { .with_trace_config(common) .init_exporter()?; - Ok(builder.with_span_processor( + builder.with_span_processor( BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("zipkin-tracing")) .with_batch_config(self.batch_processor.clone().into()) .build() .filtered(), - )) + ); + Ok(()) } } diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 4f22485c19..753e826206 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -30,7 +30,7 @@ use crate::plugin::PluginFactory; use crate::plugin::PluginInit; use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN; use crate::plugins::subscription::Subscription; -use crate::plugins::telemetry::reload::apollo_opentelemetry_initialized; +use crate::plugins::telemetry::reload::otel::apollo_opentelemetry_initialized; use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING; use crate::plugins::traffic_shaping::TrafficShaping; use crate::query_planner::QueryPlannerService; diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 2ff4fcc874..55e795d506 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -26,7 +26,7 @@ use super::subgraph::SubgraphRequestId; use crate::Context; use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME; use crate::plugins::telemetry::otel::OpenTelemetrySpanExt; -use crate::plugins::telemetry::reload::prepare_context; +use crate::plugins::telemetry::reload::otel::prepare_context; use crate::query_planner::QueryPlan; use crate::services::router; use crate::services::router::body::RouterBody; diff --git a/apollo-router/src/services/http/service.rs b/apollo-router/src/services/http/service.rs index 1e07c2d69b..7d1866dede 100644 
--- a/apollo-router/src/services/http/service.rs +++ b/apollo-router/src/services/http/service.rs @@ -38,7 +38,7 @@ use crate::error::FetchError; use crate::plugins::authentication::subgraph::SigningParamsConfig; use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME; use crate::plugins::telemetry::otel::OpenTelemetrySpanExt; -use crate::plugins::telemetry::reload::prepare_context; +use crate::plugins::telemetry::reload::otel::prepare_context; use crate::plugins::traffic_shaping::Http2Config; use crate::services::hickory_dns_connector::AsyncHyperResolver; use crate::services::hickory_dns_connector::new_async_http_connector; diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs index 29d7d65ef2..c09ca3a485 100644 --- a/apollo-router/src/state_machine.rs +++ b/apollo-router/src/state_machine.rs @@ -34,7 +34,7 @@ use crate::configuration::Configuration; use crate::configuration::Discussed; use crate::configuration::ListenAddr; use crate::configuration::metrics::Metrics; -use crate::plugins::telemetry::reload::apollo_opentelemetry_initialized; +use crate::plugins::telemetry::reload::otel::apollo_opentelemetry_initialized; use crate::router::Event::UpdateLicense; use crate::router_factory::RouterFactory; use crate::router_factory::RouterSuperServiceFactory; diff --git a/apollo-router/src/test_harness.rs b/apollo-router/src/test_harness.rs index 2ac5f1d9b8..769b614977 100644 --- a/apollo-router/src/test_harness.rs +++ b/apollo-router/src/test_harness.rs @@ -27,7 +27,7 @@ use crate::plugin::PluginPrivate; use crate::plugin::PluginUnstable; use crate::plugin::test::MockSubgraph; use crate::plugin::test::canned; -use crate::plugins::telemetry::reload::init_telemetry; +use crate::plugins::telemetry::reload::otel::init_telemetry; use crate::router_factory::YamlRouterFactory; use crate::services::HasSchema; use crate::services::SupergraphCreator; diff --git a/apollo-router/src/tracer.rs b/apollo-router/src/tracer.rs index 
55348d04b4..13da01fd0d 100644 --- a/apollo-router/src/tracer.rs +++ b/apollo-router/src/tracer.rs @@ -11,7 +11,7 @@ use tracing_subscriber::Registry; use tracing_subscriber::registry::LookupSpan; use crate::plugins::telemetry::otel::OpenTelemetrySpanExt; -use crate::plugins::telemetry::reload::IsSampled; +use crate::plugins::telemetry::reload::otel::IsSampled; /// Trace ID #[cfg_attr(test, derive(Default))] diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index db8b31cede..52f95e7f37 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -869,7 +869,7 @@ impl IntegrationTest { yaml, &self._subgraph_overrides, &self._apollo_otlp_server.uri().to_string(), - Some(self.bind_address()), + None, &self.redis_namespace, Some(&self.port_replacements), ), diff --git a/apollo-router/tests/integration/lifecycle.rs b/apollo-router/tests/integration/lifecycle.rs index 0d5a953647..6d708feaa3 100644 --- a/apollo-router/tests/integration/lifecycle.rs +++ b/apollo-router/tests/integration/lifecycle.rs @@ -149,7 +149,6 @@ async fn test_force_config_reload_via_chaos() -> Result<(), BoxError> { .await; router.start().await; router.assert_started().await; - tokio::time::sleep(Duration::from_secs(2)).await; router.assert_reloaded().await; router.graceful_shutdown().await; Ok(()) @@ -166,7 +165,6 @@ async fn test_force_schema_reload_via_chaos() -> Result<(), BoxError> { .await; router.start().await; router.assert_started().await; - tokio::time::sleep(Duration::from_secs(2)).await; router.assert_reloaded().await; router.graceful_shutdown().await; Ok(()) diff --git a/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs b/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs index 0f3965aad7..bec159b926 100644 --- a/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs +++ b/apollo-router/tests/integration/telemetry/apollo_otel_metrics.rs @@ -201,7 +201,7 @@ async fn 
test_subgraph_layer_error_emits_metric() { .await; let metrics = router - .wait_for_emitted_otel_metrics(Duration::from_millis(20)) + .wait_for_emitted_otel_metrics(Duration::from_millis(200)) .await; assert!(!metrics.is_empty()); diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing.router.yaml new file mode 100644 index 0000000000..f65ce62f1e --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing.router.yaml @@ -0,0 +1,24 @@ +telemetry: + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + resource: + env: local1 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + max_export_batch_size: 512 + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing_reload.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing_reload.router.yaml new file mode 100644 index 0000000000..f6a4e93bcd --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_tracing_reload.router.yaml @@ -0,0 +1,24 @@ +telemetry: + exporters: + tracing: + experimental_response_trace_id: + enabled: true + header_name: apollo-custom-trace-id + common: + service_name: router + resource: + env: local2 + otlp: + enabled: true + protocol: http + endpoint: + batch_processor: + scheduled_delay: 10ms + max_export_batch_size: 1024 + + instrumentation: + spans: + mode: spec_compliant + supergraph: + attributes: + graphql.operation.name: true \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/prometheus_no_reload.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/prometheus_no_reload.router.yaml new 
file mode 100644 index 0000000000..d8a48722fc --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/prometheus_no_reload.router.yaml @@ -0,0 +1,52 @@ +limits: + http_max_request_bytes: 200 +telemetry: + instrumentation: + instruments: + default_requirement_level: required + router: + http.server.request.duration: + attributes: + server.port: false + server.address: false + http.response.status_code: + alias: status + error: + error: reason + exporters: + metrics: + prometheus: + listen: 127.0.0.1:4000 + enabled: true + path: /metrics + common: + views: + - name: apollo_router_http_request_duration_seconds + aggregation: + histogram: + buckets: + - 0.1 + - 0.5 + - 1 + - 2 + - 3 + - 4 + - 5 + - 100 +headers: + all: + request: + - insert: + name: "x-custom-header" + value: "test_custom" +override_subgraph_url: + products: http://localhost:4005 +include_subgraph_errors: + all: true +supergraph: + introspection: true +apq: + router: + cache: + in_memory: + limit: 10000 \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/fixtures/prometheus_reload.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/prometheus_reload.router.yaml new file mode 100644 index 0000000000..eb6c71cf46 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/prometheus_reload.router.yaml @@ -0,0 +1,52 @@ +limits: + http_max_request_bytes: 200 +telemetry: + instrumentation: + instruments: + default_requirement_level: required + router: + http.server.request.duration: + attributes: + server.port: false + server.address: false + http.response.status_code: + alias: status + error: + error: reason + exporters: + metrics: + prometheus: + listen: 127.0.0.1:4000 + enabled: true + path: /metrics + common: + views: + - name: apollo_router_http_request_duration_seconds + aggregation: + histogram: + buckets: + - 0.1 + - 0.5 + - 1 + - 2 + - 3 + - 4 + - 5 + - 1000 +headers: + all: + request: + - insert: + name: "x-custom-header" + 
value: "test_custom" +override_subgraph_url: + products: http://localhost:4005 +include_subgraph_errors: + all: true +supergraph: + introspection: true +apq: + router: + cache: + in_memory: + limit: 1000 \ No newline at end of file diff --git a/apollo-router/tests/integration/telemetry/metrics.rs b/apollo-router/tests/integration/telemetry/metrics.rs index a107b471d4..722a3342ca 100644 --- a/apollo-router/tests/integration/telemetry/metrics.rs +++ b/apollo-router/tests/integration/telemetry/metrics.rs @@ -352,3 +352,50 @@ async fn test_gauges_on_reload() { ) .await; } + +#[tokio::test(flavor = "multi_thread")] +async fn test_prom_reset_on_reload() { + let mut router = IntegrationTest::builder() + .config(include_str!("fixtures/prometheus.router.yaml")) + .build() + .await; + + router.start().await; + router.assert_started().await; + + router.execute_default_query().await; + router.execute_default_query().await; + + router + .assert_metrics_contains( + r#"http_server_request_duration_seconds_count{http_request_method="POST",status="200",otel_scope_name="apollo/router"} 2"#, + None, + ) + .await; + + // This config will NOT reload prometheus as the config did not change + router + .update_config(include_str!("fixtures/prometheus.router.yaml")) + .await; + router.assert_reloaded().await; + router + .assert_metrics_contains( + r#"http_server_request_duration_seconds_count{http_request_method="POST",status="200",otel_scope_name="apollo/router"} 2"#, + None, + ) + .await; + + // This config will force a reload as it changes the prometheus buckets + router + .update_config(include_str!("fixtures/prometheus_reload.router.yaml")) + .await; + router.assert_reloaded().await; + router.execute_default_query().await; + router + .assert_metrics_contains( + r#"http_server_request_duration_seconds_count{http_request_method="POST",status="200",otel_scope_name="apollo/router"} 1"#, + None, + ) + .await; + router.graceful_shutdown().await; +} diff --git 
a/apollo-router/tests/integration/telemetry/otlp.rs b/apollo-router/tests/integration/telemetry/otlp.rs index 66e2c5494d..a2d965e29e 100644 --- a/apollo-router/tests/integration/telemetry/otlp.rs +++ b/apollo-router/tests/integration/telemetry/otlp.rs @@ -140,6 +140,65 @@ async fn test_resources() -> Result<(), BoxError> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_otlp_tracing_reload() -> Result<(), BoxError> { + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server(0..).await; + let config_initial = include_str!("fixtures/otlp_tracing.router.yaml") + .replace("", &mock_server.uri()); + + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(&config_initial) + .build() + .await; + + router.start().await; + router.assert_started().await; + + // Verify initial resource env=local1 + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .resource("env", "local1") + .build() + .validate_otlp_trace(&mut router, &mock_server, Query::default()) + .await?; + + // This config will NOT reload tracing as the config did not change + router.update_config(&config_initial).await; + router.assert_reloaded().await; + router.assert_log_not_contained("OpenTelemetry trace error occurred"); + + // Execute query and verify resource is still local1 + TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .resource("env", "local1") + .build() + .validate_otlp_trace(&mut router, &mock_server, Query::default()) + .await?; + + // This config will force a reload as it changes the resource env value + let config_reload = include_str!("fixtures/otlp_tracing_reload.router.yaml") + .replace("", &mock_server.uri()); + router.update_config(&config_reload).await; + router.assert_reloaded().await; + + // Execute query and verify resource changed to local2 + 
TraceSpec::builder() + .services(["client", "router", "subgraph"].into()) + .resource("env", "local2") + .build() + .validate_otlp_trace(&mut router, &mock_server, Query::default()) + .await?; + + router.graceful_shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_otlp_request_with_datadog_propagator() -> Result<(), BoxError> { if !graph_os_enabled() {