diff --git a/.changesets/fix_bryn_connection_cancellation_race.md b/.changesets/fix_bryn_connection_cancellation_race.md new file mode 100644 index 0000000000..7caf17e699 --- /dev/null +++ b/.changesets/fix_bryn_connection_cancellation_race.md @@ -0,0 +1,9 @@ +### Connection shutdown sometimes fails over hot reload ([PR #8169](https://github.com/apollographql/router/pull/8169)) + +A race in the way that connections were shutdown when a hot-reload is triggered meant that occasionally some connections +were left in active state and never entered terminating state. This could cause OOMs over time as multiple pipelines are +left active. + +This is now fixed and connections that are opening at the same time as shutdown will immediately terminate. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/8169 diff --git a/apollo-router/src/axum_factory/listeners.rs b/apollo-router/src/axum_factory/listeners.rs index 5cbe7891da..7c6cab16ad 100644 --- a/apollo-router/src/axum_factory/listeners.rs +++ b/apollo-router/src/axum_factory/listeners.rs @@ -19,8 +19,8 @@ use hyper_util::server::conn::auto::Builder; use multimap::MultiMap; #[cfg(unix)] use tokio::net::UnixListener; -use tokio::sync::Notify; use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use tokio_util::time::FutureExt; use tower_service::Service; @@ -219,7 +219,7 @@ macro_rules! handle_connection { // the shutdown receiver was triggered first, // so we tell the connection to do a graceful shutdown // on the next request, then we wait for it to finish - _ = connection_shutdown.notified() => { + _ = connection_shutdown.cancelled() => { connection_handle.shutdown(); connection.as_mut().graceful_shutdown(); // Only wait for the connection to close gracfully if we recieved a request. @@ -332,7 +332,7 @@ pub(super) fn serve_router_on_listen_addr( let server = async move { tokio::pin!(shutdown_receiver); - let connection_shutdown = Arc::new(Notify::new()); + let connection_shutdown = CancellationToken::new(); loop { tokio::select! { @@ -465,7 +465,7 @@ pub(super) fn serve_router_on_listen_addr( // the shutdown receiver was triggered so we break out of // the server loop, tell the currently active connections to stop // then return the TCP listen socket - connection_shutdown.notify_waiters(); + connection_shutdown.cancel(); listener }; (server, shutdown_sender) @@ -512,6 +512,7 @@ where mod tests { use std::net::SocketAddr; use std::str::FromStr; + use std::sync::Arc; use axum::BoxError; use tower::ServiceExt;