Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

- [#6972](https://github.com/ChainSafe/forest/pull/6972) `ChainExchange` hardening to limit the node memory usage.

- [#6976](https://github.com/ChainSafe/forest/pull/6976) `Hello` and `ChainExchange` response timeouts and additional bounds to prevent hanging connections.

## Forest v0.33.1 "Paradyzja"

Non-mandatory release for all node operators. It includes support for the NV28 _FireHorse_ network upgrade for devnets (not calibnet or mainnet yet), a number of significant performance improvements and bug fixes.
Expand Down
30 changes: 26 additions & 4 deletions src/libp2p/chain_exchange/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,36 @@
mod behaviour;
mod message;
mod provider;
use std::time::Duration;

pub use behaviour::*;

pub use self::{message::*, provider::*};
use super::rpc::CborRequestResponse;
use super::rpc::{CborRequestResponse, CodecConfig};

/// Libp2p protocol name for `ChainExchange`.
pub const CHAIN_EXCHANGE_PROTOCOL_NAME: &str = "/fil/chain/xchg/0.0.1";

/// `ChainExchange` protocol codec to be used within the RPC service.
pub type ChainExchangeCodec =
CborRequestResponse<&'static str, ChainExchangeRequest, ChainExchangeResponse>;
/// Size and timing limits applied to `ChainExchange` streams.
///
/// - Requests carry tipset CIDs, a length and an options bitfield — well under
///   1 KiB in practice — so they are capped at 4 KiB.
/// - Responses are capped at Lotus's [`maxExchangeMessageSize`]
///   (15 blocks × 8 MiB of messages, i.e. 120 MiB).
/// - Decoding is given 60 seconds, which accommodates a realistic ~32 MiB
///   response at ~5 Mbps per stream (up to 3 outbound chain-exchange streams
///   run in parallel, so each stream only gets a fraction of the peer's link).
///
/// [`maxExchangeMessageSize`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30
pub struct ChainExchangeCodecConfig;

impl CodecConfig for ChainExchangeCodecConfig {
    /// 4 KiB.
    const MAX_REQUEST_BYTES: usize = 4 * 1024;
    /// 15 blocks × 8 MiB = 120 MiB.
    const MAX_RESPONSE_BYTES: usize = 15 * 8 * 1024 * 1024;
    /// One minute.
    const DECODE_TIMEOUT: Duration = Duration::from_secs(60);
}

/// `ChainExchange` protocol codec to be used within the RPC service, with its
/// limits taken from [`ChainExchangeCodecConfig`].
pub type ChainExchangeCodec = CborRequestResponse<
&'static str,
ChainExchangeRequest,
ChainExchangeResponse,
ChainExchangeCodecConfig,
>;
23 changes: 20 additions & 3 deletions src/libp2p/hello/codec.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::time::Duration;

use super::*;
use crate::libp2p::rpc::CborRequestResponse;
use crate::libp2p::rpc::{CborRequestResponse, CodecConfig};

/// Size and timing limits applied to Hello streams.
///
/// - Requests carry tipset CIDs, height, weight and the genesis CID —
///   comfortably under 1 KiB even at the 15-blocks-per-tipset ceiling — so
///   they are capped at 4 KiB.
/// - Responses are a `[u64, u64]` pair, at most **19 bytes** once
///   CBOR-encoded, so they are capped at 32 bytes.
/// - Decoding is given 10 seconds: the response is tiny, so a peer stalling
///   longer than that is misbehaving.
pub struct HelloCodecConfig;

impl CodecConfig for HelloCodecConfig {
    /// 4 KiB.
    const MAX_REQUEST_BYTES: usize = 4 * 1024;
    /// Headroom over the 19-byte worst case.
    const MAX_RESPONSE_BYTES: usize = 32;
    /// Ten seconds.
    const DECODE_TIMEOUT: Duration = Duration::from_secs(10);
}

/// Hello protocol codec to be used within the RPC service.
pub type HelloCodec = CborRequestResponse<&'static str, HelloRequest, HelloResponse>;
pub type HelloCodec =
CborRequestResponse<&'static str, HelloRequest, HelloResponse, HelloCodecConfig>;
63 changes: 36 additions & 27 deletions src/libp2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,41 @@ use futures::prelude::*;
use libp2p::request_response::{self, OutboundFailure};
use serde::{Serialize, de::DeserializeOwned};

/// Per-protocol codec limits. Implementors set tight values for fixed-shape
/// protocols (Hello) and generous ones for bulk transfers (`ChainExchange`).
///
/// The values are associated constants, so implementors are typically
/// zero-sized marker types that are only ever named as a type parameter.
pub trait CodecConfig {
/// Byte limit handed to the decoder when reading an inbound request.
const MAX_REQUEST_BYTES: usize;
/// Byte limit handed to the decoder when reading a response.
const MAX_RESPONSE_BYTES: usize;
/// Aborts the read if the peer hasn't finished writing within this window.
/// The same window is used for both request and response reads.
const DECODE_TIMEOUT: Duration;
}

/// Generic `Cbor` `RequestResponse` type. This is just needed to satisfy
/// [`request_response::Codec`] for Hello and `ChainExchange` protocols without
/// duplication.
#[derive(Clone)]
pub struct CborRequestResponse<P, RQ, RS> {
pub struct CborRequestResponse<P, RQ, RS, C> {
protocol: PhantomData<P>,
request: PhantomData<RQ>,
response: PhantomData<RS>,
config: PhantomData<C>,
}

// Manual impls so we don't pin `C: Copy + Clone` (auto-derive would add the
// bound to every type parameter, including the marker-only config type).
// All fields are `PhantomData`, so the type is unconditionally `Copy`.
impl<P, RQ, RS, C> Copy for CborRequestResponse<P, RQ, RS, C> {}
impl<P, RQ, RS, C> Clone for CborRequestResponse<P, RQ, RS, C> {
// Canonical `Clone` for a `Copy` type: a plain bitwise copy of `self`.
fn clone(&self) -> Self {
*self
}
}

impl<P, RQ, RS> Default for CborRequestResponse<P, RQ, RS> {
impl<P, RQ, RS, C> Default for CborRequestResponse<P, RQ, RS, C> {
fn default() -> Self {
Self {
protocol: PhantomData::<P>,
request: PhantomData::<RQ>,
response: PhantomData::<RS>,
protocol: PhantomData,
request: PhantomData,
response: PhantomData,
config: PhantomData,
}
}
}
Expand Down Expand Up @@ -74,11 +93,12 @@ impl From<OutboundFailure> for RequestResponseError {
}

#[async_trait]
impl<P, RQ, RS> request_response::Codec for CborRequestResponse<P, RQ, RS>
impl<P, RQ, RS, C> request_response::Codec for CborRequestResponse<P, RQ, RS, C>
where
P: AsRef<str> + Send + Clone,
RQ: Serialize + DeserializeOwned + Send + Sync,
RS: Serialize + DeserializeOwned + Send + Sync,
C: CodecConfig + Send + Sync,
{
type Protocol = P;
type Request = RQ;
Expand All @@ -88,7 +108,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
read_request_and_decode(io).await
timed_decode(io, C::MAX_REQUEST_BYTES, C::DECODE_TIMEOUT).await
}

async fn read_response<T>(
Expand All @@ -99,14 +119,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
// Cap buffered bytes per response to bound memory exposure. Over-cap
// streams get cut off; the decode then fails and chain-sync retries
// another peer. Matches Lotus's `maxExchangeMessageSize`:
// https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30
const MAX_RESPONSE_BYTES: u64 = 120 * 1024 * 1024;
let mut bytes = Vec::with_capacity(64 * 1024);
io.take(MAX_RESPONSE_BYTES).read_to_end(&mut bytes).await?;
serde_ipld_dagcbor::de::from_reader(bytes.as_slice()).map_err(io::Error::other)
timed_decode(io, C::MAX_RESPONSE_BYTES, C::DECODE_TIMEOUT).await
}

async fn write_request<T>(
Expand Down Expand Up @@ -154,23 +167,19 @@ where
//
// `io` is essentially [yamux::Stream](https://docs.rs/yamux/0.11.0/yamux/struct.Stream.html)
//
async fn read_request_and_decode<IO, T>(io: &mut IO) -> io::Result<T>
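/// Decodes a single CBOR value from `io`, using `max_bytes` as the decoder's
/// size limit and giving up after `timeout`. Used by both `read_request` and
/// `read_response` to prevent hanging on a peer that fails to send `FIN` in a
/// timely manner; when the timeout elapses the returned error has kind
/// [`io::ErrorKind::TimedOut`].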
async fn timed_decode<IO, T>(io: &mut IO, max_bytes: usize, timeout: Duration) -> io::Result<T>
where
IO: AsyncRead + Unpin,
T: serde::de::DeserializeOwned,
{
const MAX_BYTES_ALLOWED: usize = 2 * 1024 * 1024; // messages over 2MB are likely malicious
const TIMEOUT: Duration = Duration::from_secs(30);

// Currently the protocol does not send length encoded message,
// and we use `decode-success-with-no-trailing-data` to detect end of frame
// just like what `FramedRead` does, so it's possible to cause deadlock at
// `io.poll_ready` Adding timeout here to mitigate the issue
match tokio::time::timeout(TIMEOUT, DagCborDecodingReader::new(io, MAX_BYTES_ALLOWED)).await {
match tokio::time::timeout(timeout, DagCborDecodingReader::new(io, max_bytes)).await {
Ok(r) => r,
Err(_) => {
let err = io::Error::other("read_and_decode timeout");
tracing::warn!("{err}");
let err = io::Error::from(io::ErrorKind::TimedOut);
tracing::debug!("{err}");
Err(err)
}
}
Expand Down
Loading