Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changesets/fix_garypen_fix_sessions_and_handle_reporting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Fix session counting and the reporting of file handle shortage ([PR #5834](https://github.com/apollographql/router/pull/5834))

Session counting incorrectly included connections to the health check or other non-graphql connections. This is now corrected so that only connections to the main graphql port are counted.

Warnings about file handle shortages are now handled correctly as a global resource.

The listening logic had its own custom rate limiting notifications. This has been removed and log notification is now controlled by the [standard router log rate limiting configuration](https://www.apollographql.com/docs/router/configuration/telemetry/exporters/logging/stdout/#rate_limit)

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/5834
2 changes: 2 additions & 0 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
main_listener,
actual_main_listen_address.clone(),
all_routers.main.1,
true,
all_connections_stopped_sender.clone(),
);

Expand Down Expand Up @@ -341,6 +342,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
listener,
listen_addr.clone(),
router,
false,
all_connections_stopped_sender.clone(),
);
(
Expand Down
52 changes: 25 additions & 27 deletions apollo-router/src/axum_factory/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use axum::response::*;
use axum::Router;
Expand All @@ -31,7 +30,8 @@ use crate::router::ApolloRouterError;
use crate::router_factory::Endpoint;
use crate::ListenAddr;

pub(crate) static SESSION_COUNT: AtomicU64 = AtomicU64::new(0);
static SESSION_COUNT: AtomicU64 = AtomicU64::new(0);
static MAX_FILE_HANDLES_WARN: AtomicBool = AtomicBool::new(false);

#[derive(Clone, Debug)]
pub(crate) struct ListenAddrAndRouter(pub(crate) ListenAddr, pub(crate) Router);
Expand Down Expand Up @@ -201,6 +201,7 @@ pub(super) fn serve_router_on_listen_addr(
mut listener: Listener,
address: ListenAddr,
router: axum::Router,
main_graphql_port: bool,
all_connections_stopped_sender: mpsc::Sender<()>,
) -> (impl Future<Output = Listener>, oneshot::Sender<()>) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
Expand All @@ -213,7 +214,6 @@ pub(super) fn serve_router_on_listen_addr(
tokio::pin!(shutdown_receiver);

let connection_shutdown = Arc::new(Notify::new());
let mut max_open_file_warning = None;

let address = address.to_string();

Expand All @@ -229,16 +229,18 @@ pub(super) fn serve_router_on_listen_addr(

match res {
Ok(res) => {
if max_open_file_warning.is_some(){
if MAX_FILE_HANDLES_WARN.load(Ordering::SeqCst) {
tracing::info!("can accept connections again");
max_open_file_warning = None;
MAX_FILE_HANDLES_WARN.store(false, Ordering::SeqCst);
}
// We only want to recognise sessions if we are the main graphql port.
if main_graphql_port {
let session_count = SESSION_COUNT.fetch_add(1, Ordering::Acquire)+1;
tracing::info!(
value.apollo_router_session_count_total = session_count,
listener = &address
);
}

let session_count = SESSION_COUNT.fetch_add(1, Ordering::Acquire)+1;
tracing::info!(
value.apollo_router_session_count_total = session_count,
listener = &address
);

let address = address.clone();
tokio::task::spawn(async move {
Expand Down Expand Up @@ -356,12 +358,14 @@ pub(super) fn serve_router_on_listen_addr(
}
}

let session_count = SESSION_COUNT.fetch_sub(1, Ordering::Acquire)-1;
tracing::info!(
value.apollo_router_session_count_total = session_count,
listener = &address
);

// We only want to recognise sessions if we are the main graphql port.
if main_graphql_port {
let session_count = SESSION_COUNT.fetch_sub(1, Ordering::Acquire)-1;
tracing::info!(
value.apollo_router_session_count_total = session_count,
listener = &address
);
}
});
}

Expand Down Expand Up @@ -419,16 +423,10 @@ pub(super) fn serve_router_on_listen_addr(
_ => {
match e.raw_os_error() {
Some(libc::EMFILE) | Some(libc::ENFILE) => {
match max_open_file_warning {
None => {
tracing::error!("reached the max open file limit, cannot accept any new connection");
max_open_file_warning = Some(Instant::now());
}
Some(last) => if Instant::now() - last > Duration::from_secs(60) {
tracing::error!("still at the max open file limit, cannot accept any new connection");
max_open_file_warning = Some(Instant::now());
}
}
tracing::error!(
"reached the max open file limit, cannot accept any new connection"
);
MAX_FILE_HANDLES_WARN.store(true, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(1)).await;
}
_ => {}
Expand Down