Skip to content

Commit

Permalink
balance: Log and fail stuck discovery streams. (#2484)
Browse files Browse the repository at this point in the history
In 6d2abbc, we changed how outbound proxies process discovery updates.
The prior implementation used a watchdog timeout to bound the amount of
time an update stream could be full. With that change, when an update
channel fills, the backpressure can extend to the destination
controller's gRPC response stream.

To detect and avoid this harmful (and useless) backpressure, this change
modifies the balancer's discovery processing stream to exit when the
balancer has 1000 unprocessed discovery updates. When that happens, a
sufficiently scary warning is logged before the stream exits.
  • Loading branch information
olix0r authored and hawkw committed Oct 25, 2023
1 parent 0e843c9 commit 19b2951
Showing 1 changed file with 51 additions and 18 deletions.
69 changes: 51 additions & 18 deletions linkerd/proxy/balance/src/discover/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures_util::future::poll_fn;
use linkerd_error::Error;
use tokio::sync::mpsc;
use tower::discover;
use tracing::{debug, instrument::Instrument, trace};
use tracing::{debug, debug_span, instrument::Instrument, trace, warn};

pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;
Expand All @@ -16,48 +16,81 @@ where
{
let (tx, rx) = mpsc::channel(capacity);

// Offers a single discovery update to the balancer without waiting for
// channel capacity. Evaluates to `true` when the update was enqueued and to
// `false` when it could not be delivered, logging the reason in that case.
let send = |tx: &mpsc::Sender<_>, up| {
    let failure = match tx.try_send(up) {
        Ok(()) => return true,
        Err(failure) => failure,
    };

    match failure {
        // The receiving half is gone: the balancer has been dropped, so no
        // update can ever be delivered again.
        mpsc::error::TrySendError::Closed(_) => {
            debug!("Discovery receiver dropped");
        }

        // The channel is at capacity: the balancer is not draining updates
        // and nothing more can be buffered for it.
        mpsc::error::TrySendError::Full(_) => {
            warn!("The balancer is not processing discovery updates; aborting discovery stream");
        }
    }

    false
};

debug!(%capacity, "Spawning discovery buffer");
tokio::spawn(
async move {
tokio::pin!(inner);

loop {
let res = tokio::select! {
_ = tx.closed() => break,
biased;

_ = tx.closed() => {
debug!("Discovery receiver dropped");
return;
}

res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res,
};

let change = match res {
match res {
Some(Ok(change)) => {
trace!("Changed");
change
if !send(&tx, Ok(change)) {
// XXX(ver) We don't actually have a way to "blow
// up" the balancer in this situation. My
// understanding is that this will cause the
// balancer to get cut off from further updates,
// should it ever become available again. That needs
// to be fixed.
//
// One option would be to drop the discovery stream
// and rebuild it if the balancer ever becomes
// unblocked.
//
// Ultimately we need to track down how we're
// getting into this blocked/idle state.
return;
}
}
Some(Err(e)) => {
let error = e.into();
debug!(%error);
let _ = tx.send(Err(error)).await;
send(&tx, Err(error));
return;
}
None => {
debug!("Discovery stream closed");
return;
}
};

tokio::select! {
_ = tx.closed() => break,
res = tx.send(Ok(change)) => {
if res.is_err() {
break;
}
trace!("Change sent");
}
}
}

debug!("Discovery receiver dropped");
}
.in_current_span(),
.in_current_span()
.instrument(debug_span!("discover")),
);

Buffer::new(rx)
Expand Down

0 comments on commit 19b2951

Please sign in to comment.