diff --git a/Cargo.toml b/Cargo.toml index 718b4315c..ce9fbf302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ x25519-dalek = "2.0.1" x509-parser = "0.17.0" yasna = "0.5.0" zeroize = "1.8.1" -yamux = "0.13.5" +yamux = "0.13.7" # Websocket related dependencies. tokio-tungstenite = { version = "0.27.0", features = ["rustls-tls-native-roots", "url"], optional = true } diff --git a/src/yamux/control.rs b/src/yamux/control.rs index 1114f7458..2eda5ca11 100644 --- a/src/yamux/control.rs +++ b/src/yamux/control.rs @@ -19,6 +19,8 @@ use std::{ task::{Context, Poll}, }; +const LOG_TARGET: &str = "litep2p::yamux::control"; + /// A Yamux [`Connection`] controller. /// /// This presents an alternative API for using a yamux [`Connection`]. @@ -81,7 +83,25 @@ where State::Idle(mut connection) => { match connection.poll_next_inbound(cx) { Poll::Ready(maybe_stream) => { - self.state = State::Idle(connection); + // Transport layers will close the connection on the first + // substream error. The `connection.poll_next_inbound` should + // not be called again after returning an error. Instead, we + // must close the connection gracefully. + match maybe_stream.as_ref() { + Some(Err(error)) => { + tracing::debug!(target: LOG_TARGET, ?error, "Inbound stream error, closing connection"); + + self.state = State::Closing { + reply: None, + inner: Closing::DrainingControlCommands { connection }, + }; + } + other => { + tracing::debug!(target: LOG_TARGET, ?other, "Inbound stream reset state to idle"); + self.state = State::Idle(connection) + } + } + return Poll::Ready(maybe_stream); } Poll::Pending => {} @@ -191,7 +211,7 @@ where return Poll::Pending; } }, - State::Poisoned => unreachable!(), + State::Poisoned => return Poll::Pending, } } }