From 93364f10a58c09bf2aff8261627272e3dbd493df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9e=20Kooi?= Date: Fri, 20 Dec 2024 15:19:33 +0100 Subject: [PATCH] Migrate session_count_total metric Somehow missed this in #6476 --- .../axum_factory/axum_http_server_factory.rs | 8 +-- apollo-router/src/axum_factory/listeners.rs | 53 ++++++++++++------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index bf53143815..df6d9743d4 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -88,16 +88,16 @@ fn session_count_instrument() -> ObservableGauge { .init() } -struct SessionCountGuard; +struct ActiveSessionCountGuard; -impl SessionCountGuard { +impl ActiveSessionCountGuard { fn start() -> Self { ACTIVE_SESSION_COUNT.fetch_add(1, Ordering::Acquire); Self } } -impl Drop for SessionCountGuard { +impl Drop for ActiveSessionCountGuard { fn drop(&mut self) { ACTIVE_SESSION_COUNT.fetch_sub(1, Ordering::Acquire); } @@ -655,7 +655,7 @@ async fn handle_graphql( experimental_log_on_broken_pipe: bool, http_request: Request>, ) -> impl IntoResponse { - let _guard = SessionCountGuard::start(); + let _guard = ActiveSessionCountGuard::start(); let (parts, body) = http_request.into_parts(); diff --git a/apollo-router/src/axum_factory/listeners.rs b/apollo-router/src/axum_factory/listeners.rs index 52ea352979..17881d47bb 100644 --- a/apollo-router/src/axum_factory/listeners.rs +++ b/apollo-router/src/axum_factory/listeners.rs @@ -14,6 +14,8 @@ use futures::channel::oneshot; use futures::prelude::*; use hyper::server::conn::Http; use multimap::MultiMap; +use opentelemetry::metrics::MeterProvider; +use opentelemetry::KeyValue; #[cfg(unix)] use tokio::net::UnixListener; use tokio::sync::mpsc; @@ -26,13 +28,29 @@ use crate::axum_factory::ENDPOINT_CALLBACK; use crate::configuration::Configuration; use crate::http_server_factory::Listener; use crate::http_server_factory::NetworkStream; +use crate::metrics::meter_provider; use crate::router::ApolloRouterError; use crate::router_factory::Endpoint; use crate::ListenAddr; -static SESSION_COUNT: AtomicU64 = AtomicU64::new(0); +static TOTAL_SESSION_COUNT: AtomicU64 = AtomicU64::new(0); static MAX_FILE_HANDLES_WARN: AtomicBool = AtomicBool::new(false); +struct TotalSessionCountGuard; + +impl TotalSessionCountGuard { + fn start() -> Self { + TOTAL_SESSION_COUNT.fetch_add(1, Ordering::Acquire); + Self + } +} + +impl Drop for TotalSessionCountGuard { + fn drop(&mut self) { + TOTAL_SESSION_COUNT.fetch_sub(1, Ordering::Acquire); + } +} + #[derive(Clone, Debug)] pub(crate) struct ListenAddrAndRouter(pub(crate) ListenAddr, pub(crate) Router); @@ -214,9 +232,19 @@ pub(super) fn serve_router_on_listen_addr( let server = async move { tokio::pin!(shutdown_receiver); - let connection_shutdown = Arc::new(Notify::new()); + let _total_session_count_instrument = meter_provider() + .meter("apollo/router") + .u64_observable_gauge("apollo_router_session_count_total") + .with_description("Number of currently connected clients") + .with_callback(move |gauge| { + gauge.observe( + TOTAL_SESSION_COUNT.load(Ordering::Relaxed), + &[KeyValue::new("listener", address.to_string())], + ); + }) + .init(); - let address = address.to_string(); + let connection_shutdown = Arc::new(Notify::new()); loop { tokio::select! { @@ -234,16 +262,10 @@ pub(super) fn serve_router_on_listen_addr( tracing::info!("can accept connections again"); MAX_FILE_HANDLES_WARN.store(false, Ordering::SeqCst); } + // We only want to recognise sessions if we are the main graphql port. - if main_graphql_port { - let session_count = SESSION_COUNT.fetch_add(1, Ordering::Acquire)+1; - tracing::info!( - value.apollo_router_session_count_total = session_count, - listener = &address - ); - } + let _guard = main_graphql_port.then(TotalSessionCountGuard::start); - let address = address.clone(); let mut http_config = http_config.clone(); tokio::task::spawn(async move { // this sender must be moved into the session to track that it is still running @@ -352,15 +374,6 @@ pub(super) fn serve_router_on_listen_addr( } } } - - // We only want to recognise sessions if we are the main graphql port. - if main_graphql_port { - let session_count = SESSION_COUNT.fetch_sub(1, Ordering::Acquire)-1; - tracing::info!( - value.apollo_router_session_count_total = session_count, - listener = &address - ); - } }); }