From 19b2951e7fe8a10a033bfbbd4140654edd354de2 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 17 Oct 2023 11:01:19 -0700 Subject: [PATCH] balance: Log and fail stuck discovery streams. (#2484) In 6d2abbc, we changed how outbound proxies process discovery updates. The prior implementation used a watchdog timeout to bound the amount of time an update stream could be full. With that change, when an update channel fills, the backpressure can extend to the destination controller's gRPC response stream. To detect and avoid this harmful (and useless) backpressure, this change modifies the balancer's discovery processing stream to exit when the balancer has 1000 unprocessed discovery updates. A sufficiently scary warning is logged. --- linkerd/proxy/balance/src/discover/buffer.rs | 69 +++++++++++++++----- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/linkerd/proxy/balance/src/discover/buffer.rs b/linkerd/proxy/balance/src/discover/buffer.rs index c17b477a4d..d46787b4a5 100644 --- a/linkerd/proxy/balance/src/discover/buffer.rs +++ b/linkerd/proxy/balance/src/discover/buffer.rs @@ -2,7 +2,7 @@ use futures_util::future::poll_fn; use linkerd_error::Error; use tokio::sync::mpsc; use tower::discover; -use tracing::{debug, instrument::Instrument, trace}; +use tracing::{debug, debug_span, instrument::Instrument, trace, warn}; pub type Result = std::result::Result, Error>; pub type Buffer = tokio_stream::wrappers::ReceiverStream>; @@ -16,6 +16,29 @@ where { let (tx, rx) = mpsc::channel(capacity); + // Attempts to send an update to the balancer, returning `true` if sending + // was successful and `false` otherwise. + let send = |tx: &mpsc::Sender<_>, up| { + match tx.try_send(up) { + Ok(()) => true, + + // The balancer has been dropped (and will never be used again). + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("Discovery receiver dropped"); + false + } + + // The balancer is stalled and we can't continue to buffer + // updates for it. + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "The balancer is not processing discovery updates; aborting discovery stream" + ); + false + } + } + }; + debug!(%capacity, "Spawning discovery buffer"); tokio::spawn( async move { @@ -23,41 +46,51 @@ where loop { let res = tokio::select! { - _ = tx.closed() => break, + biased; + + _ = tx.closed() => { + debug!("Discovery receiver dropped"); + return; + } + res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res, }; - let change = match res { + match res { Some(Ok(change)) => { trace!("Changed"); - change + if !send(&tx, Ok(change)) { + // XXX(ver) We don't actually have a way to "blow + // up" the balancer in this situation. My + // understanding is that this will cause the + // balancer to get cut off from further updates, + // should it ever become available again. That needs + // to be fixed. + // + // One option would be to drop the discovery stream + // and rebuild it if the balancer ever becomes + // unblocked. + // + // Ultimately we need to track down how we're + // getting into this blocked/idle state + return; + } } Some(Err(e)) => { let error = e.into(); debug!(%error); - let _ = tx.send(Err(error)).await; + send(&tx, Err(error)); return; } None => { debug!("Discovery stream closed"); return; } - }; - - tokio::select! { - _ = tx.closed() => break, - res = tx.send(Ok(change)) => { - if res.is_err() { - break; - } - trace!("Change sent"); - } } } - - debug!("Discovery receiver dropped"); } - .in_current_span(), + .in_current_span() + .instrument(debug_span!("discover")), ); Buffer::new(rx)