diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index df6d9743d4..763e5e59f0 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -33,6 +33,8 @@ use opentelemetry_api::metrics::MeterProvider as _; use opentelemetry_api::metrics::ObservableGauge; use serde::Serialize; use serde_json::json; +use sha1::Digest as _; +use sha2::Sha256; #[cfg(unix)] use tokio::net::UnixListener; use tokio::sync::mpsc; @@ -239,6 +241,7 @@ impl HttpServerFactory for AxumHttpServerFactory { &self, service_factory: RF, configuration: Arc, + schema_id: String, mut main_listener: Option, previous_listeners: Vec<(ListenAddr, Listener)>, extra_endpoints: MultiMap, @@ -326,10 +329,19 @@ impl HttpServerFactory for AxumHttpServerFactory { http_config.max_buf_size(max_buf_size.as_u64() as usize); } + let config_hash = { + let mut hasher = Sha256::new(); + serde_json::to_writer(&mut hasher, &configuration) + .expect("write to hasher can't fail"); + format!("{:x}", hasher.finalize()) + }; + let (main_server, main_shutdown_sender) = serve_router_on_listen_addr( main_listener, actual_main_listen_address.clone(), all_routers.main.1, + config_hash.clone(), + schema_id.clone(), true, http_config.clone(), all_connections_stopped_sender.clone(), @@ -370,6 +382,8 @@ impl HttpServerFactory for AxumHttpServerFactory { listener, listen_addr.clone(), router, + config_hash.clone(), + schema_id.clone(), false, http_config.clone(), all_connections_stopped_sender.clone(), diff --git a/apollo-router/src/axum_factory/listeners.rs b/apollo-router/src/axum_factory/listeners.rs index 17881d47bb..c5a105b926 100644 --- a/apollo-router/src/axum_factory/listeners.rs +++ b/apollo-router/src/axum_factory/listeners.rs @@ -15,6 +15,7 @@ use futures::prelude::*; use hyper::server::conn::Http; use multimap::MultiMap; use opentelemetry::metrics::MeterProvider; +use opentelemetry::metrics::ObservableGauge; use opentelemetry::KeyValue; #[cfg(unix)] use tokio::net::UnixListener; @@ -33,21 +34,61 @@ use crate::router::ApolloRouterError; use crate::router_factory::Endpoint; use crate::ListenAddr; -static TOTAL_SESSION_COUNT: AtomicU64 = AtomicU64::new(0); static MAX_FILE_HANDLES_WARN: AtomicBool = AtomicBool::new(false); -struct TotalSessionCountGuard; +/// Implements the `apollo_router_session_count_total` metric. +/// +/// Creating an instance with [TotalSessionCountInstrument::register] registers the metric with otel. +/// Instances can then be cheaply cloned to extend the lifetime of the instrument. +#[derive(Clone)] +struct TotalSessionCountInstrument { + value: Arc, + _instrument: ObservableGauge, +} + +impl TotalSessionCountInstrument { + /// Create a new instrument with the given attributes. + fn register(listener_address: String, config_hash: String, schema_id: String) -> Self { + let value = Arc::new(AtomicU64::new(0)); + let value_for_observation = Arc::clone(&value); + + let instrument = meter_provider() + .meter("apollo/router") + .u64_observable_gauge("apollo_router_session_count_total") + .with_description("Number of currently connected clients") + .with_callback(move |gauge| { + gauge.observe( + value_for_observation.load(Ordering::Relaxed), + &[ + KeyValue::new("listener", listener_address.clone()), + KeyValue::new("config_hash", config_hash.clone()), + KeyValue::new("schema_id", schema_id.clone()), + ], + ); + }) + .init(); -impl TotalSessionCountGuard { - fn start() -> Self { - TOTAL_SESSION_COUNT.fetch_add(1, Ordering::Acquire); - Self + Self { + value, + _instrument: instrument, + } + } + + /// Return a guard that increments the session count, and extends the instrument's lifetime + /// to ensure it continues to report on lingering sessions even after the server listener is detached. + /// + /// Care must be taken to keep the guard alive for the duration of the session, by moving it + /// into tokio tasks etc used to process requests. + fn start(&self) -> TotalSessionCountGuard { + self.value.fetch_add(1, Ordering::Acquire); + TotalSessionCountGuard(self.clone()) } } +struct TotalSessionCountGuard(TotalSessionCountInstrument); impl Drop for TotalSessionCountGuard { fn drop(&mut self) { - TOTAL_SESSION_COUNT.fetch_sub(1, Ordering::Acquire); + self.0.value.fetch_sub(1, Ordering::Acquire); } } @@ -215,10 +256,14 @@ pub(super) async fn get_extra_listeners( Ok(listeners_and_routers) } +// clippy's not wrong, but it's not clear that it would be improved by using a structure +#[allow(clippy::too_many_arguments)] pub(super) fn serve_router_on_listen_addr( mut listener: Listener, address: ListenAddr, router: axum::Router, + config_hash: String, + schema_id: String, main_graphql_port: bool, http_config: Http, all_connections_stopped_sender: mpsc::Sender<()>, @@ -232,17 +277,8 @@ pub(super) fn serve_router_on_listen_addr( let server = async move { tokio::pin!(shutdown_receiver); - let _total_session_count_instrument = meter_provider() - .meter("apollo/router") - .u64_observable_gauge("apollo_router_session_count_total") - .with_description("Number of currently connected clients") - .with_callback(move |gauge| { - gauge.observe( - TOTAL_SESSION_COUNT.load(Ordering::Relaxed), - &[KeyValue::new("listener", address.to_string())], - ); - }) - .init(); + let total_session_count_instrument = + TotalSessionCountInstrument::register(address.to_string(), config_hash, schema_id); let connection_shutdown = Arc::new(Notify::new()); @@ -264,12 +300,13 @@ pub(super) fn serve_router_on_listen_addr( } // We only want to recognise sessions if we are the main graphql port. - let _guard = main_graphql_port.then(TotalSessionCountGuard::start); + let session_count_guard = main_graphql_port.then(|| total_session_count_instrument.start()); let mut http_config = http_config.clone(); tokio::task::spawn(async move { // this sender must be moved into the session to track that it is still running let _connection_stop_signal = connection_stop_signal; + let _session_count_guard = session_count_guard; match res { NetworkStream::Tcp(stream) => { diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 0aced33790..be22af8b7e 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -214,6 +214,7 @@ async fn init( .build() .unwrap(), ), + "test_schema_id".into(), None, vec![], MultiMap::new(), @@ -272,6 +273,7 @@ pub(super) async fn init_with_config( inner: service.into_inner(), }, conf, + "test_schema_id".into(), None, vec![], web_endpoints, @@ -339,6 +341,7 @@ async fn init_unix( .build() .unwrap(), ), + "test_schema_id".into(), None, vec![], MultiMap::new(), @@ -1770,6 +1773,7 @@ async fn it_supports_server_restart() { .create( supergraph_service_factory.clone(), configuration, + "test_schema_id".into(), None, vec![], MultiMap::new(), @@ -1801,6 +1805,7 @@ async fn it_supports_server_restart() { &server_factory, supergraph_service_factory, new_configuration, + "test_schema_id".into(), MultiMap::new(), LicenseState::default(), ) diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index 3b83d9416a..81822e53a2 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -118,7 +118,7 @@ pub enum ConfigurationError { #[derivative(Debug)] // We can't put a global #[serde(default)] here because of the Default implementation using `from_str` which use deserialize pub struct Configuration { - /// The raw configuration string. + /// The configuration object as a dynamic JSON value. #[serde(skip)] pub(crate) validated_yaml: Option, diff --git a/apollo-router/src/http_server_factory.rs b/apollo-router/src/http_server_factory.rs index c66c9da0f0..4509e31420 100644 --- a/apollo-router/src/http_server_factory.rs +++ b/apollo-router/src/http_server_factory.rs @@ -28,6 +28,7 @@ pub(crate) trait HttpServerFactory { &self, service_factory: RF, configuration: Arc, + schema_id: String, main_listener: Option, previous_listeners: ExtraListeners, extra_endpoints: MultiMap, @@ -122,6 +123,7 @@ impl HttpServerHandle { factory: &SF, router: RF, configuration: Arc, + schema_id: String, web_endpoints: MultiMap, license: LicenseState, ) -> Result @@ -144,6 +146,7 @@ impl HttpServerHandle { .create( router, configuration, + schema_id, Some(main_listener), extra_listeners, web_endpoints, diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs index 669a70d9ef..b4edb123a2 100644 --- a/apollo-router/src/state_machine.rs +++ b/apollo-router/src/state_machine.rs @@ -358,6 +358,10 @@ impl State { license }; + // We want to give the schema ID to the HTTP server factory so it can be reported in + // session count metrics. + let schema_id = schema.schema_id.clone(); + let router_service_factory = state_machine .router_configurator .create( @@ -383,6 +387,7 @@ impl State { .create( router_service_factory.clone(), configuration.clone(), + schema_id.to_string(), Default::default(), Default::default(), web_endpoints, @@ -397,6 +402,7 @@ impl State { &state_machine.http_server_factory, router_service_factory.clone(), configuration.clone(), + schema_id.to_string(), web_endpoints, effective_license, ) @@ -1194,6 +1200,7 @@ mod tests { &self, _service_factory: RF, configuration: Arc, + _schema_id: String, main_listener: Option, _extra_listeners: Vec<(ListenAddr, Listener)>, _web_endpoints: MultiMap,