Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use opentelemetry_api::metrics::MeterProvider as _;
use opentelemetry_api::metrics::ObservableGauge;
use serde::Serialize;
use serde_json::json;
use sha1::Digest as _;
use sha2::Sha256;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -239,6 +241,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
&self,
service_factory: RF,
configuration: Arc<Configuration>,
schema_id: String,
mut main_listener: Option<Listener>,
previous_listeners: Vec<(ListenAddr, Listener)>,
extra_endpoints: MultiMap<ListenAddr, Endpoint>,
Expand Down Expand Up @@ -326,10 +329,19 @@ impl HttpServerFactory for AxumHttpServerFactory {
http_config.max_buf_size(max_buf_size.as_u64() as usize);
}

let config_hash = {
let mut hasher = Sha256::new();
serde_json::to_writer(&mut hasher, &configuration)
.expect("write to hasher can't fail");
format!("{:x}", hasher.finalize())
};

let (main_server, main_shutdown_sender) = serve_router_on_listen_addr(
main_listener,
actual_main_listen_address.clone(),
all_routers.main.1,
config_hash.clone(),
schema_id.clone(),
true,
http_config.clone(),
all_connections_stopped_sender.clone(),
Expand Down Expand Up @@ -370,6 +382,8 @@ impl HttpServerFactory for AxumHttpServerFactory {
listener,
listen_addr.clone(),
router,
config_hash.clone(),
schema_id.clone(),
false,
http_config.clone(),
all_connections_stopped_sender.clone(),
Expand Down
75 changes: 56 additions & 19 deletions apollo-router/src/axum_factory/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::prelude::*;
use hyper::server::conn::Http;
use multimap::MultiMap;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::KeyValue;
#[cfg(unix)]
use tokio::net::UnixListener;
Expand All @@ -33,21 +34,61 @@ use crate::router::ApolloRouterError;
use crate::router_factory::Endpoint;
use crate::ListenAddr;

static TOTAL_SESSION_COUNT: AtomicU64 = AtomicU64::new(0);
static MAX_FILE_HANDLES_WARN: AtomicBool = AtomicBool::new(false);

struct TotalSessionCountGuard;
/// Implements the `apollo_router_session_count_total` metric.
///
/// The metric is an observable gauge described as the number of currently connected clients.
///
/// Creating an instance with [TotalSessionCountInstrument::register] registers the metric with otel.
/// Instances can then be cheaply cloned to extend the lifetime of the instrument.
#[derive(Clone)]
struct TotalSessionCountInstrument {
/// Current session count, shared between every clone of this instrument and the gauge callback
/// that reports it.
value: Arc<AtomicU64>,
/// The registered gauge. It is never read; it is held so that the gauge lives as long as any
/// clone of this instrument does.
_instrument: ObservableGauge<u64>,
}

impl TotalSessionCountInstrument {
/// Create a new instrument with the given attributes.
fn register(listener_address: String, config_hash: String, schema_id: String) -> Self {
let value = Arc::new(AtomicU64::new(0));
let value_for_observation = Arc::clone(&value);

let instrument = meter_provider()
.meter("apollo/router")
.u64_observable_gauge("apollo_router_session_count_total")
.with_description("Number of currently connected clients")
.with_callback(move |gauge| {
gauge.observe(
value_for_observation.load(Ordering::Relaxed),
&[
KeyValue::new("listener", listener_address.clone()),
KeyValue::new("config_hash", config_hash.clone()),
KeyValue::new("schema_id", schema_id.clone()),
],
);
})
.init();

impl TotalSessionCountGuard {
fn start() -> Self {
TOTAL_SESSION_COUNT.fetch_add(1, Ordering::Acquire);
Self
Self {
value,
_instrument: instrument,
}
}

/// Return a guard that increments the session count, and extends the instrument's lifetime
/// to ensure it continues to report on lingering sessions even after the server listener is detached.
///
/// Care must be taken to keep the guard alive for the duration of the session, by moving it
/// into tokio tasks etc used to process requests.
fn start(&self) -> TotalSessionCountGuard {
self.value.fetch_add(1, Ordering::Acquire);
TotalSessionCountGuard(self.clone())
}
}

struct TotalSessionCountGuard(TotalSessionCountInstrument);
impl Drop for TotalSessionCountGuard {
fn drop(&mut self) {
TOTAL_SESSION_COUNT.fetch_sub(1, Ordering::Acquire);
self.0.value.fetch_sub(1, Ordering::Acquire);
}
}

Expand Down Expand Up @@ -215,10 +256,14 @@ pub(super) async fn get_extra_listeners(
Ok(listeners_and_routers)
}

// Clippy isn't wrong, but it's not clear that bundling these arguments into a struct would be an improvement.
#[allow(clippy::too_many_arguments)]
pub(super) fn serve_router_on_listen_addr(
mut listener: Listener,
address: ListenAddr,
router: axum::Router,
config_hash: String,
schema_id: String,
main_graphql_port: bool,
http_config: Http,
all_connections_stopped_sender: mpsc::Sender<()>,
Expand All @@ -232,17 +277,8 @@ pub(super) fn serve_router_on_listen_addr(
let server = async move {
tokio::pin!(shutdown_receiver);

let _total_session_count_instrument = meter_provider()
.meter("apollo/router")
.u64_observable_gauge("apollo_router_session_count_total")
.with_description("Number of currently connected clients")
.with_callback(move |gauge| {
gauge.observe(
TOTAL_SESSION_COUNT.load(Ordering::Relaxed),
&[KeyValue::new("listener", address.to_string())],
);
})
.init();
let total_session_count_instrument =
TotalSessionCountInstrument::register(address.to_string(), config_hash, schema_id);

let connection_shutdown = Arc::new(Notify::new());

Expand All @@ -264,12 +300,13 @@ pub(super) fn serve_router_on_listen_addr(
}

// We only want to recognise sessions if we are the main graphql port.
let _guard = main_graphql_port.then(TotalSessionCountGuard::start);
let session_count_guard = main_graphql_port.then(|| total_session_count_instrument.start());

let mut http_config = http_config.clone();
tokio::task::spawn(async move {
// this sender must be moved into the session to track that it is still running
let _connection_stop_signal = connection_stop_signal;
let _session_count_guard = session_count_guard;

match res {
NetworkStream::Tcp(stream) => {
Expand Down
5 changes: 5 additions & 0 deletions apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ async fn init(
.build()
.unwrap(),
),
"test_schema_id".into(),
None,
vec![],
MultiMap::new(),
Expand Down Expand Up @@ -272,6 +273,7 @@ pub(super) async fn init_with_config(
inner: service.into_inner(),
},
conf,
"test_schema_id".into(),
None,
vec![],
web_endpoints,
Expand Down Expand Up @@ -339,6 +341,7 @@ async fn init_unix(
.build()
.unwrap(),
),
"test_schema_id".into(),
None,
vec![],
MultiMap::new(),
Expand Down Expand Up @@ -1770,6 +1773,7 @@ async fn it_supports_server_restart() {
.create(
supergraph_service_factory.clone(),
configuration,
"test_schema_id".into(),
None,
vec![],
MultiMap::new(),
Expand Down Expand Up @@ -1801,6 +1805,7 @@ async fn it_supports_server_restart() {
&server_factory,
supergraph_service_factory,
new_configuration,
"test_schema_id".into(),
MultiMap::new(),
LicenseState::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub enum ConfigurationError {
#[derivative(Debug)]
// We can't put a global #[serde(default)] here because the Default implementation uses `from_str`, which itself uses deserialize
pub struct Configuration {
/// The raw configuration string.
/// The configuration object as a dynamic JSON value.
#[serde(skip)]
pub(crate) validated_yaml: Option<Value>,

Expand Down
3 changes: 3 additions & 0 deletions apollo-router/src/http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) trait HttpServerFactory {
&self,
service_factory: RF,
configuration: Arc<Configuration>,
schema_id: String,
main_listener: Option<Listener>,
previous_listeners: ExtraListeners,
extra_endpoints: MultiMap<ListenAddr, Endpoint>,
Expand Down Expand Up @@ -122,6 +123,7 @@ impl HttpServerHandle {
factory: &SF,
router: RF,
configuration: Arc<Configuration>,
schema_id: String,
web_endpoints: MultiMap<ListenAddr, Endpoint>,
license: LicenseState,
) -> Result<Self, ApolloRouterError>
Expand All @@ -144,6 +146,7 @@ impl HttpServerHandle {
.create(
router,
configuration,
schema_id,
Some(main_listener),
extra_listeners,
web_endpoints,
Expand Down
7 changes: 7 additions & 0 deletions apollo-router/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ impl<FA: RouterSuperServiceFactory> State<FA> {
license
};

// We want to give the schema ID to the HTTP server factory so it can be reported in
// session count metrics.
let schema_id = schema.schema_id.clone();

let router_service_factory = state_machine
.router_configurator
.create(
Expand All @@ -383,6 +387,7 @@ impl<FA: RouterSuperServiceFactory> State<FA> {
.create(
router_service_factory.clone(),
configuration.clone(),
schema_id.to_string(),
Default::default(),
Default::default(),
web_endpoints,
Expand All @@ -397,6 +402,7 @@ impl<FA: RouterSuperServiceFactory> State<FA> {
&state_machine.http_server_factory,
router_service_factory.clone(),
configuration.clone(),
schema_id.to_string(),
web_endpoints,
effective_license,
)
Expand Down Expand Up @@ -1194,6 +1200,7 @@ mod tests {
&self,
_service_factory: RF,
configuration: Arc<Configuration>,
_schema_id: String,
main_listener: Option<Listener>,
_extra_listeners: Vec<(ListenAddr, Listener)>,
_web_endpoints: MultiMap<ListenAddr, Endpoint>,
Expand Down