diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index ee9f9f568cd..805d27d9760 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -7,7 +7,7 @@ use crate::{ errors::{EthHandshakeError, EthStreamError}, handshake::EthereumEthHandshake, - message::{EthBroadcastMessage, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE}, + message::{EthBroadcastMessage, EthMessageID, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE}, p2pstream::HANDSHAKE_TIMEOUT, CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage, UnifiedStatus, @@ -108,6 +108,9 @@ pub struct EthStreamInner { version: EthVersion, /// Maximum allowed ETH message size. max_message_size: usize, + /// When true, `NewBlock` (0x07) and `NewBlockHashes` (0x01) messages are rejected before RLP + /// decoding to avoid any memory impact for non-PoW networks. + reject_block_announcements: bool, _pd: std::marker::PhantomData, } @@ -122,7 +125,12 @@ where /// Creates a new [`EthStreamInner`] with the given eth version and message size limit. pub const fn with_max_message_size(version: EthVersion, max_message_size: usize) -> Self { - Self { version, max_message_size, _pd: std::marker::PhantomData } + Self { + version, + max_message_size, + reject_block_announcements: false, + _pd: std::marker::PhantomData, + } } /// Returns the eth version @@ -131,12 +139,25 @@ where self.version } + /// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before + /// RLP decoding. + pub const fn set_reject_block_announcements(&mut self, reject: bool) { + self.reject_block_announcements = reject; + } + /// Decodes incoming bytes into an [`EthMessage`]. pub fn decode_message(&self, bytes: BytesMut) -> Result, EthStreamError> { if bytes.len() > self.max_message_size { return Err(EthStreamError::MessageTooBig(bytes.len())); } + if self.reject_block_announcements && + let Some(&id) = bytes.first() && + (id == EthMessageID::NewBlock.to_u8() || id == EthMessageID::NewBlockHashes.to_u8()) + { + return Err(EthStreamError::UnsupportedMessage { message_id: id }); + } + let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) { Ok(m) => m, Err(err) => { @@ -208,6 +229,12 @@ impl EthStream { self.eth.version() } + /// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before + /// RLP decoding. + pub const fn set_reject_block_announcements(&mut self, reject: bool) { + self.eth.set_reject_block_announcements(reject); + } + /// Returns the underlying stream. #[inline] pub const fn inner(&self) -> &S { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 7225c947882..b9428653ac6 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -318,6 +318,7 @@ impl NetworkManager { extra_protocols, handshake, eth_max_message_size, + network_mode.is_stake(), ); let state = NetworkState::new( diff --git a/crates/net/network/src/session/conn.rs b/crates/net/network/src/session/conn.rs index ea13cef4f01..85b3f17dbc3 100644 --- a/crates/net/network/src/session/conn.rs +++ b/crates/net/network/src/session/conn.rs @@ -93,6 +93,15 @@ impl EthRlpxConnection { Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg), } } + + /// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before + /// RLP decoding to avoid memory amplification from deserializing blocks that will be discarded. + pub fn set_reject_block_announcements(&mut self, reject: bool) { + match self { + Self::EthOnly(conn) => conn.set_reject_block_announcements(reject), + Self::Satellite(conn) => conn.primary_mut().set_reject_block_announcements(reject), + } + } } impl From> for EthRlpxConnection { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 943e7ad1a0b..2322d07567d 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -123,6 +123,9 @@ pub struct SessionManager { /// Shared local range information that gets propagated to active sessions. /// This represents the range of blocks that this node can serve to other peers. local_range_info: BlockRangeInfo, + /// When true, block announcement messages (`NewBlock`, `NewBlockHashes`) are rejected before + /// RLP decoding on new sessions to avoid memory amplification. + reject_block_announcements: bool, } // === impl SessionManager === @@ -140,6 +143,7 @@ impl SessionManager { extra_protocols: RlpxSubProtocols, handshake: Arc, eth_max_message_size: usize, + reject_block_announcements: bool, ) -> Self { let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer); let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); @@ -176,6 +180,7 @@ impl SessionManager { handshake, eth_max_message_size, local_range_info, + reject_block_announcements, } } @@ -496,7 +501,7 @@ impl SessionManager { local_addr, peer_id, capabilities, - conn, + mut conn, status, direction, client_id, @@ -563,6 +568,10 @@ impl SessionManager { BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash) }); + if self.reject_block_announcements { + conn.set_reject_block_announcements(true); + } + let session = ActiveSession { next_id: 0, remote_peer_id: peer_id,