Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use crate::{
errors::{EthHandshakeError, EthStreamError},
handshake::EthereumEthHandshake,
message::{EthBroadcastMessage, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE},
message::{EthBroadcastMessage, EthMessageID, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
UnifiedStatus,
Expand Down Expand Up @@ -108,6 +108,9 @@ pub struct EthStreamInner<N> {
version: EthVersion,
/// Maximum allowed ETH message size.
max_message_size: usize,
/// When true, `NewBlock` (0x07) and `NewBlockHashes` (0x01) messages are rejected before RLP
/// decoding to avoid any memory impact for non-PoW networks.
reject_block_announcements: bool,
_pd: std::marker::PhantomData<N>,
}

Expand All @@ -122,7 +125,12 @@ where

/// Creates a new [`EthStreamInner`] with the given eth version and message size limit.
pub const fn with_max_message_size(version: EthVersion, max_message_size: usize) -> Self {
Self { version, max_message_size, _pd: std::marker::PhantomData }
Self {
version,
max_message_size,
reject_block_announcements: false,
_pd: std::marker::PhantomData,
}
}

/// Returns the eth version
Expand All @@ -131,12 +139,25 @@ where
self.version
}

/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding.
pub const fn set_reject_block_announcements(&mut self, reject: bool) {
self.reject_block_announcements = reject;
}

/// Decodes incoming bytes into an [`EthMessage`].
pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
if bytes.len() > self.max_message_size {
return Err(EthStreamError::MessageTooBig(bytes.len()));
}

if self.reject_block_announcements &&
let Some(&id) = bytes.first() &&
(id == EthMessageID::NewBlock.to_u8() || id == EthMessageID::NewBlockHashes.to_u8())
{
return Err(EthStreamError::UnsupportedMessage { message_id: id });
}

let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
Expand Down Expand Up @@ -208,6 +229,12 @@ impl<S, N: NetworkPrimitives> EthStream<S, N> {
self.eth.version()
}

/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding.
pub const fn set_reject_block_announcements(&mut self, reject: bool) {
self.eth.set_reject_block_announcements(reject);
}

/// Returns the underlying stream.
#[inline]
pub const fn inner(&self) -> &S {
Expand Down
1 change: 1 addition & 0 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
extra_protocols,
handshake,
eth_max_message_size,
network_mode.is_stake(),
);

let state = NetworkState::new(
Expand Down
9 changes: 9 additions & 0 deletions crates/net/network/src/session/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ impl<N: NetworkPrimitives> EthRlpxConnection<N> {
Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg),
}
}

/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding to avoid memory amplification from deserializing blocks that will be discarded.
pub fn set_reject_block_announcements(&mut self, reject: bool) {
match self {
Self::EthOnly(conn) => conn.set_reject_block_announcements(reject),
Self::Satellite(conn) => conn.primary_mut().set_reject_block_announcements(reject),
}
}
}

impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {
Expand Down
11 changes: 10 additions & 1 deletion crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub struct SessionManager<N: NetworkPrimitives> {
/// Shared local range information that gets propagated to active sessions.
/// This represents the range of blocks that this node can serve to other peers.
local_range_info: BlockRangeInfo,
/// When true, block announcement messages (`NewBlock`, `NewBlockHashes`) are rejected before
/// RLP decoding on new sessions to avoid memory amplification.
reject_block_announcements: bool,
}

// === impl SessionManager ===
Expand All @@ -140,6 +143,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
extra_protocols: RlpxSubProtocols,
handshake: Arc<dyn EthRlpxHandshake>,
eth_max_message_size: usize,
reject_block_announcements: bool,
) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
Expand Down Expand Up @@ -176,6 +180,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
handshake,
eth_max_message_size,
local_range_info,
reject_block_announcements,
}
}

Expand Down Expand Up @@ -496,7 +501,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
local_addr,
peer_id,
capabilities,
conn,
mut conn,
status,
direction,
client_id,
Expand Down Expand Up @@ -563,6 +568,10 @@ impl<N: NetworkPrimitives> SessionManager<N> {
BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
});

if self.reject_block_announcements {
conn.set_reject_block_announcements(true);
}

let session = ActiveSession {
next_id: 0,
remote_peer_id: peer_id,
Expand Down
Loading