Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changesets/fix_bryn_connection_cancellation_race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Connection shutdown sometimes fails over hot reload ([PR #8169](https://github.com/apollographql/router/pull/8169))

A race in the way that connections were shutdown when a hot-reload is triggered meant that occasionally some connections
were left in active state and never entered terminating state. This could cause OOMs over time as multiple pipelines are
left active.

This is now fixed and connections that are opening at the same time as shutdown will immediately terminate.

By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/8169
9 changes: 5 additions & 4 deletions apollo-router/src/axum_factory/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use hyper_util::server::conn::auto::Builder;
use multimap::MultiMap;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tokio_util::time::FutureExt;
use tower_service::Service;

Expand Down Expand Up @@ -219,7 +219,7 @@ macro_rules! handle_connection {
// the shutdown receiver was triggered first,
// so we tell the connection to do a graceful shutdown
// on the next request, then we wait for it to finish
_ = connection_shutdown.notified() => {
_ = connection_shutdown.cancelled() => {
connection_handle.shutdown();
connection.as_mut().graceful_shutdown();
// Only wait for the connection to close gracfully if we recieved a request.
Expand Down Expand Up @@ -332,7 +332,7 @@ pub(super) fn serve_router_on_listen_addr(
let server = async move {
tokio::pin!(shutdown_receiver);

let connection_shutdown = Arc::new(Notify::new());
let connection_shutdown = CancellationToken::new();

loop {
tokio::select! {
Expand Down Expand Up @@ -465,7 +465,7 @@ pub(super) fn serve_router_on_listen_addr(
// the shutdown receiver was triggered so we break out of
// the server loop, tell the currently active connections to stop
// then return the TCP listen socket
connection_shutdown.notify_waiters();
connection_shutdown.cancel();
listener
};
(server, shutdown_sender)
Expand Down Expand Up @@ -512,6 +512,7 @@ where
mod tests {
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use axum::BoxError;
use tower::ServiceExt;
Expand Down