From 8c674c3809e68ea6b3017c4d6285272d68e12dd1 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Tue, 28 Apr 2026 15:15:23 +0200 Subject: [PATCH 1/4] fix: harden `Hello` protocol handling --- CHANGELOG.md | 2 ++ src/libp2p/chain_exchange/mod.rs | 12 +++++++-- src/libp2p/hello/codec.rs | 5 +++- src/libp2p/rpc/mod.rs | 44 +++++++++++++++++--------------- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 424d67bf23c0..a264b68b708f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ - [#6972](https://github.com/ChainSafe/forest/pull/6972) `ChainExchange` hardening to limit the node memory usage. +- [#6976](https://github.com/ChainSafe/forest/pull/6976) `Hello` and `ChainExchange` response timeouts to prevent hanging connections. + ## Forest v0.33.1 "Paradyzja" Non-mandatory release for all node operators. It includes support for the NV28 _FireHorse_ network upgrade for devnets (not calibnet or mainnet yet), a number of significant performance improvements and bug fixes. diff --git a/src/libp2p/chain_exchange/mod.rs b/src/libp2p/chain_exchange/mod.rs index e9c16d2c32f3..ba8047e0410f 100644 --- a/src/libp2p/chain_exchange/mod.rs +++ b/src/libp2p/chain_exchange/mod.rs @@ -13,5 +13,13 @@ use super::rpc::CborRequestResponse; pub const CHAIN_EXCHANGE_PROTOCOL_NAME: &str = "/fil/chain/xchg/0.0.1"; /// `ChainExchange` protocol codec to be used within the RPC service. -pub type ChainExchangeCodec = - CborRequestResponse<&'static str, ChainExchangeRequest, ChainExchangeResponse>; +/// +/// Cap matches Lotus's [`maxExchangeMessageSize`] (15 blocks × 8 MiB messages). +/// +/// [`maxExchangeMessageSize`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30 +pub type ChainExchangeCodec = CborRequestResponse< + &'static str, + ChainExchangeRequest, + ChainExchangeResponse, + { 120 * 1024 * 1024 }, +>; diff --git a/src/libp2p/hello/codec.rs b/src/libp2p/hello/codec.rs index b54d2f85b6e2..dcc13325e2ae 100644 --- a/src/libp2p/hello/codec.rs +++ b/src/libp2p/hello/codec.rs @@ -5,4 +5,7 @@ use super::*; use crate::libp2p::rpc::CborRequestResponse; /// Hello protocol codec to be used within the RPC service. -pub type HelloCodec = CborRequestResponse<&'static str, HelloRequest, HelloResponse>; +/// +/// `HelloResponse` is `[u64, u64]` — at most **19 bytes** CBOR-encoded +/// (1-byte array header + two 9-byte `u64`s for `u64::MAX`). +pub type HelloCodec = CborRequestResponse<&'static str, HelloRequest, HelloResponse, 32>; diff --git a/src/libp2p/rpc/mod.rs b/src/libp2p/rpc/mod.rs index c79bc71bf42b..7852dc3d6c8e 100644 --- a/src/libp2p/rpc/mod.rs +++ b/src/libp2p/rpc/mod.rs @@ -13,14 +13,19 @@ use serde::{Serialize, de::DeserializeOwned}; /// Generic `Cbor` `RequestResponse` type. This is just needed to satisfy /// [`request_response::Codec`] for Hello and `ChainExchange` protocols without /// duplication. +/// +/// Pick a tight `MAX_RESPONSE_BYTES` for fixed-shape protocols (Hello) and a +/// generous one for bulk transfers (ChainExchange). #[derive(Clone)] -pub struct CborRequestResponse { +pub struct CborRequestResponse { protocol: PhantomData

, request: PhantomData, response: PhantomData, } -impl Default for CborRequestResponse { +impl Default + for CborRequestResponse +{ fn default() -> Self { Self { protocol: PhantomData::

, @@ -74,7 +79,8 @@ impl From for RequestResponseError { } #[async_trait] -impl request_response::Codec for CborRequestResponse +impl request_response::Codec + for CborRequestResponse where P: AsRef + Send + Clone, RQ: Serialize + DeserializeOwned + Send + Sync, @@ -99,14 +105,7 @@ where where T: AsyncRead + Unpin + Send, { - // Cap buffered bytes per response to bound memory exposure. Over-cap - // streams get cut off; the decode then fails and chain-sync retries - // another peer. Matches Lotus's `maxExchangeMessageSize`: - // https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30 - const MAX_RESPONSE_BYTES: u64 = 120 * 1024 * 1024; - let mut bytes = Vec::with_capacity(64 * 1024); - io.take(MAX_RESPONSE_BYTES).read_to_end(&mut bytes).await?; - serde_ipld_dagcbor::de::from_reader(bytes.as_slice()).map_err(io::Error::other) + timed_decode(io, MAX_RESPONSE_BYTES).await } async fn write_request( @@ -159,18 +158,23 @@ where IO: AsyncRead + Unpin, T: serde::de::DeserializeOwned, { - const MAX_BYTES_ALLOWED: usize = 2 * 1024 * 1024; // messages over 2MB are likely malicious - const TIMEOUT: Duration = Duration::from_secs(30); + // Requests over 2 MiB are likely malicious for both protocols we support. + const MAX_BYTES_ALLOWED: usize = 2 * 1024 * 1024; + timed_decode(io, MAX_BYTES_ALLOWED).await +} - // Currently the protocol does not send length encoded message, - // and we use `decode-success-with-no-trailing-data` to detect end of frame - // just like what `FramedRead` does, so it's possible to cause deadlock at - // `io.poll_ready` Adding timeout here to mitigate the issue - match tokio::time::timeout(TIMEOUT, DagCborDecodingReader::new(io, MAX_BYTES_ALLOWED)).await { +/// Decodes a response from the given `io` with a timeout. This is used to prevent hanging when a peer fails to respond in a timely manner. +async fn timed_decode(io: &mut IO, max_bytes: usize) -> io::Result +where + IO: AsyncRead + Unpin, + T: serde::de::DeserializeOwned, +{ + const DECODE_TIMEOUT: Duration = Duration::from_secs(30); + match tokio::time::timeout(DECODE_TIMEOUT, DagCborDecodingReader::new(io, max_bytes)).await { Ok(r) => r, Err(_) => { - let err = io::Error::other("read_and_decode timeout"); - tracing::warn!("{err}"); + let err = io::Error::from(io::ErrorKind::TimedOut); + tracing::debug!("{err}"); Err(err) } } From 55a0401433d8094419d0873ff22af9473f3bc905 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Tue, 28 Apr 2026 16:32:56 +0200 Subject: [PATCH 2/4] spellcheck --- src/libp2p/rpc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libp2p/rpc/mod.rs b/src/libp2p/rpc/mod.rs index 7852dc3d6c8e..3dbd546cc94e 100644 --- a/src/libp2p/rpc/mod.rs +++ b/src/libp2p/rpc/mod.rs @@ -15,7 +15,7 @@ use serde::{Serialize, de::DeserializeOwned}; /// duplication. /// /// Pick a tight `MAX_RESPONSE_BYTES` for fixed-shape protocols (Hello) and a -/// generous one for bulk transfers (ChainExchange). +/// generous one for bulk transfers (`ChainExchange`). #[derive(Clone)] pub struct CborRequestResponse { protocol: PhantomData

, From dc87e8ee323ad874ce0a8b183e8b8af7120a7fb9 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Wed, 29 Apr 2026 10:15:29 +0200 Subject: [PATCH 3/4] req bounds, refactor --- CHANGELOG.md | 2 +- src/libp2p/chain_exchange/mod.rs | 22 +++++++++-- src/libp2p/hello/codec.rs | 24 +++++++++--- src/libp2p/rpc/mod.rs | 63 +++++++++++++++++--------------- 4 files changed, 72 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a264b68b708f..ebb11b127c5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,7 +37,7 @@ - [#6972](https://github.com/ChainSafe/forest/pull/6972) `ChainExchange` hardening to limit the node memory usage. -- [#6976](https://github.com/ChainSafe/forest/pull/6976) `Hello` and `ChainExchange` response timeouts to prevent hanging connections. +- [#6976](https://github.com/ChainSafe/forest/pull/6976) `Hello` and `ChainExchange` response timeouts and additional bounds to prevent hanging connections. ## Forest v0.33.1 "Paradyzja" diff --git a/src/libp2p/chain_exchange/mod.rs b/src/libp2p/chain_exchange/mod.rs index ba8047e0410f..1a0c2756271e 100644 --- a/src/libp2p/chain_exchange/mod.rs +++ b/src/libp2p/chain_exchange/mod.rs @@ -4,22 +4,36 @@ mod behaviour; mod message; mod provider; +use std::time::Duration; + pub use behaviour::*; pub use self::{message::*, provider::*}; -use super::rpc::CborRequestResponse; +use super::rpc::{CborRequestResponse, CodecConfig}; /// Libp2p protocol name for `ChainExchange`. pub const CHAIN_EXCHANGE_PROTOCOL_NAME: &str = "/fil/chain/xchg/0.0.1"; -/// `ChainExchange` protocol codec to be used within the RPC service. +/// Codec limits for the `ChainExchange` protocol. /// -/// Cap matches Lotus's [`maxExchangeMessageSize`] (15 blocks × 8 MiB messages). +/// - Request: tipset CIDs + length + options bitfield — well under 1 KiB. 4 KiB cap. +/// - Response: cap matches Lotus's [`maxExchangeMessageSize`] (15 blocks × 8 MiB messages). +/// - Decode timeout: 60s — accommodates ~32 MiB realistic responses at +/// ~5 Mbps per stream (we run up to 3 outbound chain-exchange streams in +/// parallel, so per-stream bandwidth is a fraction of the peer's link). /// /// [`maxExchangeMessageSize`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30 +pub struct ChainExchangeCodecConfig; + +impl CodecConfig for ChainExchangeCodecConfig { + const MAX_REQUEST_BYTES: usize = 4096; + const MAX_RESPONSE_BYTES: usize = 120 * 1024 * 1024; + const DECODE_TIMEOUT: Duration = Duration::from_secs(60); +} + pub type ChainExchangeCodec = CborRequestResponse< &'static str, ChainExchangeRequest, ChainExchangeResponse, - { 120 * 1024 * 1024 }, + ChainExchangeCodecConfig, >; diff --git a/src/libp2p/hello/codec.rs b/src/libp2p/hello/codec.rs index dcc13325e2ae..75748732263b 100644 --- a/src/libp2p/hello/codec.rs +++ b/src/libp2p/hello/codec.rs @@ -1,11 +1,25 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::time::Duration; + use super::*; -use crate::libp2p::rpc::CborRequestResponse; +use crate::libp2p::rpc::{CborRequestResponse, CodecConfig}; -/// Hello protocol codec to be used within the RPC service. +/// Codec limits for the Hello protocol. /// -/// `HelloResponse` is `[u64, u64]` — at most **19 bytes** CBOR-encoded -/// (1-byte array header + two 9-byte `u64`s for `u64::MAX`). -pub type HelloCodec = CborRequestResponse<&'static str, HelloRequest, HelloResponse, 32>; +/// - Request: tipset CIDs + height + weight + genesis CID — comfortably under +/// 1 KiB even at the 15-blocks-per-tipset ceiling. 4 KiB cap. +/// - Response: `[u64, u64]`, at most **19 bytes** CBOR-encoded. 32 byte cap. +/// - Decode timeout: 10s — the response is tiny, anything stalling longer is +/// misbehaving. +pub struct HelloCodecConfig; + +impl CodecConfig for HelloCodecConfig { + const MAX_REQUEST_BYTES: usize = 4096; + const MAX_RESPONSE_BYTES: usize = 32; + const DECODE_TIMEOUT: Duration = Duration::from_secs(10); +} + +pub type HelloCodec = + CborRequestResponse<&'static str, HelloRequest, HelloResponse, HelloCodecConfig>; diff --git a/src/libp2p/rpc/mod.rs b/src/libp2p/rpc/mod.rs index 3dbd546cc94e..9595fcccac60 100644 --- a/src/libp2p/rpc/mod.rs +++ b/src/libp2p/rpc/mod.rs @@ -10,27 +10,41 @@ use futures::prelude::*; use libp2p::request_response::{self, OutboundFailure}; use serde::{Serialize, de::DeserializeOwned}; +/// Per-protocol codec limits. Implementors set tight values for fixed-shape +/// protocols (Hello) and generous ones for bulk transfers (`ChainExchange`). +pub trait CodecConfig { + const MAX_REQUEST_BYTES: usize; + const MAX_RESPONSE_BYTES: usize; + /// Aborts the read if the peer hasn't finished writing within this window. + const DECODE_TIMEOUT: Duration; +} + /// Generic `Cbor` `RequestResponse` type. This is just needed to satisfy /// [`request_response::Codec`] for Hello and `ChainExchange` protocols without /// duplication. -/// -/// Pick a tight `MAX_RESPONSE_BYTES` for fixed-shape protocols (Hello) and a -/// generous one for bulk transfers (`ChainExchange`). -#[derive(Clone)] -pub struct CborRequestResponse { +pub struct CborRequestResponse { protocol: PhantomData

, request: PhantomData, response: PhantomData, + config: PhantomData, } -impl Default - for CborRequestResponse -{ +// Manual impls so we don't pin `C: Copy + Clone` (auto-derive would). +// All fields are `PhantomData`, so the type is unconditionally `Copy`. +impl Copy for CborRequestResponse {} +impl Clone for CborRequestResponse { + fn clone(&self) -> Self { + *self + } +} + +impl Default for CborRequestResponse { fn default() -> Self { Self { - protocol: PhantomData::

, - request: PhantomData::, - response: PhantomData::, + protocol: PhantomData, + request: PhantomData, + response: PhantomData, + config: PhantomData, } } } @@ -79,12 +93,12 @@ impl From for RequestResponseError { } #[async_trait] -impl request_response::Codec - for CborRequestResponse +impl request_response::Codec for CborRequestResponse where P: AsRef + Send + Clone, RQ: Serialize + DeserializeOwned + Send + Sync, RS: Serialize + DeserializeOwned + Send + Sync, + C: CodecConfig + Send + Sync, { type Protocol = P; type Request = RQ; @@ -94,7 +108,7 @@ where where T: AsyncRead + Unpin + Send, { - read_request_and_decode(io).await + timed_decode(io, C::MAX_REQUEST_BYTES, C::DECODE_TIMEOUT).await } async fn read_response( @@ -105,7 +119,7 @@ where where T: AsyncRead + Unpin + Send, { - timed_decode(io, MAX_RESPONSE_BYTES).await + timed_decode(io, C::MAX_RESPONSE_BYTES, C::DECODE_TIMEOUT).await } async fn write_request( @@ -153,24 +167,15 @@ where // // `io` is essentially [yamux::Stream](https://docs.rs/yamux/0.11.0/yamux/struct.Stream.html) // -async fn read_request_and_decode(io: &mut IO) -> io::Result -where - IO: AsyncRead + Unpin, - T: serde::de::DeserializeOwned, -{ - // Requests over 2 MiB are likely malicious for both protocols we support. - const MAX_BYTES_ALLOWED: usize = 2 * 1024 * 1024; - timed_decode(io, MAX_BYTES_ALLOWED).await -} - -/// Decodes a response from the given `io` with a timeout. This is used to prevent hanging when a peer fails to respond in a timely manner. -async fn timed_decode(io: &mut IO, max_bytes: usize) -> io::Result +/// Decodes a CBOR value from `io` with a timeout. Used by both `read_request` +/// and `read_response` to prevent hanging on a peer that fails to send `FIN` +/// in a timely manner. +async fn timed_decode(io: &mut IO, max_bytes: usize, timeout: Duration) -> io::Result where IO: AsyncRead + Unpin, T: serde::de::DeserializeOwned, { - const DECODE_TIMEOUT: Duration = Duration::from_secs(30); - match tokio::time::timeout(DECODE_TIMEOUT, DagCborDecodingReader::new(io, max_bytes)).await { + match tokio::time::timeout(timeout, DagCborDecodingReader::new(io, max_bytes)).await { Ok(r) => r, Err(_) => { let err = io::Error::from(io::ErrorKind::TimedOut); From 9ef2dc1892c9c22b969ef19d555214fdbeac254f Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Wed, 29 Apr 2026 11:53:03 +0200 Subject: [PATCH 4/4] fix spellcheck --- src/libp2p/chain_exchange/mod.rs | 2 +- src/libp2p/hello/codec.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libp2p/chain_exchange/mod.rs b/src/libp2p/chain_exchange/mod.rs index 1a0c2756271e..0154207e28dd 100644 --- a/src/libp2p/chain_exchange/mod.rs +++ b/src/libp2p/chain_exchange/mod.rs @@ -18,7 +18,7 @@ pub const CHAIN_EXCHANGE_PROTOCOL_NAME: &str = "/fil/chain/xchg/0.0.1"; /// /// - Request: tipset CIDs + length + options bitfield — well under 1 KiB. 4 KiB cap. /// - Response: cap matches Lotus's [`maxExchangeMessageSize`] (15 blocks × 8 MiB messages). -/// - Decode timeout: 60s — accommodates ~32 MiB realistic responses at +/// - Decode timeout: 60 seconds — accommodates ~32 MiB realistic responses at /// ~5 Mbps per stream (we run up to 3 outbound chain-exchange streams in /// parallel, so per-stream bandwidth is a fraction of the peer's link). /// diff --git a/src/libp2p/hello/codec.rs b/src/libp2p/hello/codec.rs index 75748732263b..faf72f054e75 100644 --- a/src/libp2p/hello/codec.rs +++ b/src/libp2p/hello/codec.rs @@ -11,7 +11,7 @@ use crate::libp2p::rpc::{CborRequestResponse, CodecConfig}; /// - Request: tipset CIDs + height + weight + genesis CID — comfortably under /// 1 KiB even at the 15-blocks-per-tipset ceiling. 4 KiB cap. /// - Response: `[u64, u64]`, at most **19 bytes** CBOR-encoded. 32 byte cap. -/// - Decode timeout: 10s — the response is tiny, anything stalling longer is +/// - Decode timeout: 10 seconds — the response is tiny, anything stalling longer is /// misbehaving. pub struct HelloCodecConfig;