Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dccc21b
Implement PartialEq for exporter config
Sep 19, 2025
33d9170
Move activation into lifecycle.rs
Sep 19, 2025
bbcb396
Remove Option when setting a meter provider on Aggregate meter provider.
Sep 23, 2025
4923eb0
Initialization rework
Sep 24, 2025
1eacd98
Renames and move stuff around
Sep 24, 2025
6faaab8
Clippy
Sep 24, 2025
96e5fc9
Move creation of logging layer into builder and apply during activation
Sep 24, 2025
323cf7e
Make propagation reload uniform
Sep 24, 2025
f4c5269
Move stuff around
Sep 24, 2025
316cd3b
Fix change detection for apollo trace exporting
Sep 24, 2025
cc970f0
Move TracingConfigurator
Sep 24, 2025
520c4e8
Move prometheus logging
Sep 24, 2025
300abcf
Fix shutdown test
Sep 24, 2025
21ed7ca
Fix apollo metrics config
Sep 24, 2025
116a5f7
Fix some tests, but there are still some failures
Sep 25, 2025
9a51f05
Prevent schema reload from choosing a different port for prom
Sep 25, 2025
6039eb5
Wait a little longer for metrics
Sep 25, 2025
93ab64e
Changelog
Sep 25, 2025
f65de82
Move view logic to builder
Sep 25, 2025
78f3706
Add test for metrics reloading to show that reloads don't always happen
Sep 25, 2025
3ab26e9
Add some unit testing for the reload functionality
Sep 25, 2025
b882be9
Improve logging
Sep 26, 2025
2e990e4
Improve logic around spawning a blocking safe task.
Sep 26, 2025
dd037c4
Use block_in_place during configuration just to be safe.
Sep 26, 2025
984c233
Merge branch 'dev' into bryn/otel-reload-simplification
Sep 30, 2025
6ddfce9
Add tracing reload test
Sep 30, 2025
f5d684d
Update apollo-router/src/metrics/aggregation.rs
BrynCooke Oct 1, 2025
1feb276
(refactor) create_registered_instrument logic moved to inner
Oct 1, 2025
6f72675
(docs) Clarify activation fields
Oct 1, 2025
b25b296
(refactor) Remove unused fns and convert others to test
Oct 1, 2025
c77255a
(refactor) Rename fields in activation
Oct 1, 2025
a7d51c6
(refactor) Rename boolean fns with is_ prefix
Oct 1, 2025
1cb9e12
(docs) deadlock explanation
Oct 1, 2025
31d9956
(refactor) Renames based on sync review
Oct 2, 2025
9f26ac4
(docs) Code comments and docs
Oct 2, 2025
3faef4f
Lints
Oct 2, 2025
469a12a
Use spawn_blocking instead of block in place to prevent blocking relo…
Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changesets/config_bryn_otel_reload_simplification.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### Only reload telemetry when needed ([PR #8328](https://github.com/apollographql/router/pull/8328))

Previously, whenever a schema or configuration reload took place, telemetry was always reloaded. This dropped the existing exporters
and created new ones.

Telemetry exporters are now recreated only if the relevant configuration has changed.

By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/8328
2 changes: 1 addition & 1 deletion apollo-router/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::configuration::schema::Mode;
use crate::configuration::validate_yaml_configuration;
use crate::metrics::meter_provider_internal;
use crate::plugin::plugins;
use crate::plugins::telemetry::reload::init_telemetry;
use crate::plugins::telemetry::reload::otel::init_telemetry;
use crate::registry::OciConfig;
use crate::router::ConfigurationSource;
use crate::router::RouterHttpServer;
Expand Down
159 changes: 100 additions & 59 deletions apollo-router/src/metrics/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::mem;
use std::sync::Arc;

use derive_more::From;
Expand All @@ -25,7 +25,13 @@ use opentelemetry::metrics::SyncGauge;
use opentelemetry::metrics::SyncHistogram;
use opentelemetry::metrics::SyncUpDownCounter;
use opentelemetry::metrics::UpDownCounter;
use opentelemetry::metrics::noop::NoopMeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use parking_lot::Mutex;
use strum::EnumCount;
use strum_macros::Display;
use strum_macros::EnumCount;
use strum_macros::EnumIter;

use crate::metrics::filter::FilterMeterProvider;

Expand All @@ -36,9 +42,11 @@ use crate::metrics::filter::FilterMeterProvider;
// This is within the spec: https://opentelemetry.io/docs/specs/otel/metrics/api/#get-a-meter
// `Meters are identified by name, version, and schema_url fields. When more than one Meter of the same name, version, and schema_url is created, it is unspecified whether or under which conditions the same or different Meter instances are returned. It is a user error to create Meters with different attributes but the same identity.`

#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
#[derive(
Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug, EnumCount, EnumIter, Display,
)]
#[repr(u8)]
pub(crate) enum MeterProviderType {
PublicPrometheus,
Apollo,
ApolloRealtime,
Public,
Expand All @@ -47,35 +55,48 @@ pub(crate) enum MeterProviderType {

#[derive(Clone)]
pub(crate) struct AggregateMeterProvider {
inner: Arc<Mutex<Inner>>,
inner: Arc<Mutex<Option<Inner>>>,
}

impl Default for AggregateMeterProvider {
fn default() -> Self {
let meter_provider = AggregateMeterProvider {
inner: Arc::new(Mutex::new(Inner::default())),
inner: Arc::new(Mutex::new(Some(Inner::default()))),
};

// If the regular global meter provider has been set then the aggregate meter provider will use it. Otherwise it'll default to a no-op.
// For this to work the global meter provider must be set before the aggregate meter provider is created.
// This functionality is not guaranteed to stay like this, so use at your own risk.
meter_provider.set(
MeterProviderType::OtelDefault,
Some(FilterMeterProvider::public(
opentelemetry::global::meter_provider(),
)),
FilterMeterProvider::public(opentelemetry::global::meter_provider()),
);

meter_provider
}
}

#[derive(Default)]
pub(crate) struct Inner {
providers: HashMap<MeterProviderType, (FilterMeterProvider, HashMap<MeterId, Meter>)>,
providers: Vec<(FilterMeterProvider, HashMap<MeterId, Meter>)>,
Comment on lines -75 to +80
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I fully understand the change to a `Vec`. Is it easier to iterate over?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have converted the code to always expect an entry for every meter provider type. A `HashMap` would allow a slot to be empty, which adds a layer of complexity. Now we always have a fixed set of meter providers.

registered_instruments: Vec<InstrumentWrapper>,
}

impl Default for Inner {
    /// Creates an `Inner` holding one placeholder slot per `MeterProviderType` variant.
    ///
    /// Each slot pairs a default `SdkMeterProvider` (wrapped by
    /// `FilterMeterProvider::public`) with an empty meter map, and no instruments are
    /// registered yet.
    fn default() -> Self {
        // Fill a slot for every provider type up front so the vector can be indexed by
        // `MeterProviderType as usize` without having to handle a missing entry.
        let providers = std::iter::repeat_with(|| {
            (
                FilterMeterProvider::public(SdkMeterProvider::default()),
                HashMap::new(),
            )
        })
        .take(MeterProviderType::COUNT)
        .collect();

        Self {
            providers,
            registered_instruments: Vec::new(),
        }
    }
}

/// Fields are never used directly but strong references here
/// keep weak references elsewhere upgradable.
#[derive(From)]
Expand Down Expand Up @@ -103,7 +124,7 @@ pub(crate) enum InstrumentWrapper {
},
}

#[derive(Eq, PartialEq, Hash)]
#[derive(Eq, PartialEq, Hash, Clone)]
struct MeterId {
name: Cow<'static, str>,
version: Option<Cow<'static, str>>,
Expand All @@ -119,53 +140,55 @@ impl AggregateMeterProvider {
pub(crate) fn set(
&self,
meter_provider_type: MeterProviderType,
meter_provider: Option<FilterMeterProvider>,
) -> Option<FilterMeterProvider> {
let mut inner = self.inner.lock();
meter_provider: FilterMeterProvider,
) -> FilterMeterProvider {
let mut guard = self.inner.lock();
let inner = guard
.as_mut()
.expect("cannot use meter provider after shutdown");
// As we are changing a meter provider we need to invalidate any registered instruments.
// Clearing these allows any weak references at callsites to be invalidated.
// This must be done BEFORE the old provider is dropped to ensure that metrics are not lost.
// Once invalidated all metrics callsites will try to obtain new instruments, but will be blocked on the mutex.
inner.registered_instruments.clear();
inner.invalidate();

//Now update the meter provider
let old = if let Some(meter_provider) = meter_provider {
inner
.providers
.insert(
meter_provider_type,
(meter_provider.clone(), HashMap::new()),
)
.map(|(old_provider, _)| old_provider)
} else {
None
};
let mut swap = (meter_provider, HashMap::new());
mem::swap(
&mut inner.providers[meter_provider_type as usize],
&mut swap,
);

// Important! The mutex MUST be dropped before the old meter provider is dropped to avoid deadlocks in the case that the export function has metrics.
// This implicitly happens by returning the old meter provider.
// However, to avoid a potential footgun where someone removes the return value of this function I will explicitly drop the mutex guard.
drop(inner);
drop(guard);

// Important! Now it is safe to drop the old meter provider, we return it, so we should be OK. If someone removes the return value of this function then
// this must instead be converted to a drop call.
old
swap.0
}

/// Invalidate all the cached instruments.
///
/// Test-only helper. If the inner state is `None` (for example once `shutdown` has
/// taken it) this is a no-op rather than a panic.
#[cfg(test)]
pub(crate) fn invalidate(&self) {
    let mut guard = self.inner.lock();
    if let Some(state) = guard.as_mut() {
        state.invalidate();
    }
}

/// Shutdown MUST be called from a blocking thread.
pub(crate) fn shutdown(&self) {
// Make sure that we don't deadlock by dropping the mutex guard before actual shutdown happens
// This means that if we have any misbehaving code that tries to access the meter provider during shutdown, e.g. for export metrics
// then we don't get stuck on the mutex.
let mut inner = self.inner.lock();
let mut swap = Inner::default();
std::mem::swap(&mut *inner, &mut swap);
drop(inner);

// Now that we have dropped the mutex guard we can safely shutdown the meter providers
for (meter_provider_type, (meter_provider, _)) in &swap.providers {
if let Err(e) = meter_provider.shutdown() {
::tracing::error!(error = %e, meter_provider_type = ?meter_provider_type, "failed to shutdown meter provider")
}
}
// For instance, the Apollo exporters have in the past had metrics for exporting: as
// they shut down, they try to increment a metric, which causes a new meter to be created.
// If we have not released the guard by then, we deadlock.
let mut guard = self.inner.lock();
let old = guard.take();
drop(guard);
drop(old);
}

/// Create a registered instrument. This enables caching at callsites and invalidation at the meter provider via weak reference.
Expand All @@ -177,18 +200,27 @@ impl AggregateMeterProvider {
Arc<T>: Into<InstrumentWrapper>,
{
let mut guard = self.inner.lock();
let instrument = Arc::new((create_fn)(guard.deref_mut()));
guard.registered_instruments.push(instrument.clone().into());
instrument
let inner = guard
.as_mut()
.expect("cannot use meter provider after shutdown");
inner.create_registered_instrument(create_fn)
}

#[cfg(test)]
pub(crate) fn registered_instruments(&self) -> usize {
self.inner.lock().registered_instruments.len()
self.inner
.lock()
.as_ref()
.expect("cannot use meter provider after shutdown")
.registered_instruments
.len()
}
}

impl Inner {
/// Invalidate all cached instruments by clearing `registered_instruments`.
///
/// The entries in that list are the strong references to registered instruments, so
/// clearing it is what allows weak references held at callsites to be invalidated.
pub(crate) fn invalidate(&mut self) {
self.registered_instruments.clear()
}
pub(crate) fn meter(&mut self, name: impl Into<Cow<'static, str>>) -> Meter {
self.versioned_meter(
name,
Expand All @@ -209,7 +241,7 @@ impl Inner {
let schema_url = schema_url.map(|v| v.into());
let mut meters = Vec::with_capacity(self.providers.len());

for (provider, existing_meters) in self.providers.values_mut() {
for (provider, existing_meters) in &mut self.providers {
meters.push(
existing_meters
.entry(MeterId {
Expand All @@ -231,6 +263,18 @@ impl Inner {

Meter::new(Arc::new(AggregateInstrumentProvider { meters }))
}

/// Create an instrument and register it with this `Inner`.
///
/// `create_fn` is invoked with `self` to build the instrument. The result is wrapped in an
/// `Arc`; one clone of that `Arc` is converted into an `InstrumentWrapper` and pushed onto
/// `registered_instruments`, and the other is returned to the caller.
pub(crate) fn create_registered_instrument<T>(
    &mut self,
    create_fn: impl Fn(&mut Inner) -> T,
) -> Arc<T>
where
    Arc<T>: Into<InstrumentWrapper>,
{
    let created = Arc::new(create_fn(self));
    // Keep a strong reference in the registry so it stays alive until invalidation.
    let registered: InstrumentWrapper = Arc::clone(&created).into();
    self.registered_instruments.push(registered);
    created
}
}

impl MeterProvider for AggregateMeterProvider {
Expand All @@ -242,7 +286,12 @@ impl MeterProvider for AggregateMeterProvider {
attributes: Option<Vec<KeyValue>>,
) -> Meter {
let mut inner = self.inner.lock();
inner.versioned_meter(name, version, schema_url, attributes)
if let Some(inner) = inner.as_mut() {
inner.versioned_meter(name, version, schema_url, attributes)
} else {
// The meter was used after shutdown. Default to Noop since the instrument cannot actually be used
NoopMeterProvider::default().versioned_meter(name, version, schema_url, attributes)
}
}
}

Expand Down Expand Up @@ -585,7 +634,7 @@ mod test {
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::Public,
Some(FilterMeterProvider::public(delegate)),
FilterMeterProvider::public(delegate),
);
let meter = meter_provider.meter("test");

Expand Down Expand Up @@ -636,7 +685,7 @@ mod test {
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::Public,
Some(FilterMeterProvider::public(delegate)),
FilterMeterProvider::public(delegate),
);
let meter = meter_provider.meter("test");

Expand Down Expand Up @@ -734,9 +783,7 @@ mod test {
let meter_provider = AggregateMeterProvider::default();
meter_provider.set(
MeterProviderType::OtelDefault,
Some(FilterMeterProvider::public(GlobalMeterProvider::new(
delegate,
))),
FilterMeterProvider::public(GlobalMeterProvider::new(delegate)),
);

let counter = meter_provider
Expand Down Expand Up @@ -814,9 +861,7 @@ mod test {

meter_provider.set(
MeterProviderType::OtelDefault,
Some(FilterMeterProvider::public(GlobalMeterProvider::new(
delegate,
))),
FilterMeterProvider::public(GlobalMeterProvider::new(delegate)),
);

tokio::time::sleep(Duration::from_millis(20)).await;
Expand All @@ -840,9 +885,7 @@ mod test {

meter_provider.set(
MeterProviderType::OtelDefault,
Some(FilterMeterProvider::public(GlobalMeterProvider::new(
delegate,
))),
FilterMeterProvider::public(GlobalMeterProvider::new(delegate)),
);

tokio::time::sleep(Duration::from_millis(20)).await;
Expand All @@ -856,9 +899,7 @@ mod test {
// Setting the meter provider should not deadlock.
meter_provider.set(
MeterProviderType::OtelDefault,
Some(FilterMeterProvider::public(GlobalMeterProvider::new(
delegate,
))),
FilterMeterProvider::public(GlobalMeterProvider::new(delegate)),
);

tokio::time::sleep(Duration::from_millis(20)).await;
Expand Down
Loading