Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 48 additions & 3 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
message::Message as RLPxMessage,
p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
snap::{Snap2BlockAccessLists, Snap2GetBlockAccessLists},
},
};
use ethrex_common::{
Expand All @@ -33,9 +34,9 @@ use tracing::{debug, error, trace, warn};

// Re-export constants from snap::constants for backward compatibility
pub use crate::snap::constants::{
HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_CHUNK, MAX_RESPONSE_BYTES,
PEER_REPLY_TIMEOUT, PEER_SELECT_RETRY_ATTEMPTS, RANGE_FILE_CHUNK_SIZE, REQUEST_RETRY_ATTEMPTS,
SNAP_LIMIT,
BAL_RESPONSE_SOFT_CAP_BYTES, HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_CHUNK,
MAX_RESPONSE_BYTES, PEER_REPLY_TIMEOUT, PEER_SELECT_RETRY_ATTEMPTS, RANGE_FILE_CHUNK_SIZE,
REQUEST_RETRY_ATTEMPTS, SNAP_LIMIT,
};

// Re-export snap client types for backward compatibility
Expand Down Expand Up @@ -607,6 +608,50 @@ impl PeerHandler {
}
}

/// Request block access lists via snap/2 (`GetBlockAccessLists`/`BlockAccessLists`).
///
/// Returns `None` when no snap/2 peer is available. On success returns
/// `(bals, peer_id)` where `peer_id` identifies the responding peer so
/// callers can call `record_failure` on the right peer.
///
/// B2: uses `Message::Snap2GetBlockAccessLists` to send and
/// `Message::Snap2BlockAccessLists` to receive — NOT the eth/71 variants.
pub async fn request_snap2_bals(
&mut self,
block_hashes: &[H256],
) -> Result<Option<(Vec<Option<BlockAccessList>>, H256)>, PeerHandlerError> {
let request_id: u64 = rand::random();
let request = RLPxMessage::Snap2GetBlockAccessLists(Snap2GetBlockAccessLists {
id: request_id,
block_hashes: block_hashes.to_vec(),
response_bytes: BAL_RESPONSE_SOFT_CAP_BYTES,
});
let Some((peer_id, mut connection, permit)) = self
.peer_table
.get_best_peer(vec![Capability::snap(2)])
.await?
else {
return Ok(None);
};
let response = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await;
drop(permit);
match response {
Ok(RLPxMessage::Snap2BlockAccessLists(Snap2BlockAccessLists { id, bals }))
if id == request_id =>
{
self.peer_table.record_success(peer_id)?;
Ok(Some((bals, peer_id)))
}
_ => {
warn!("[SYNCING] didn't receive snap/2 BALs from peer {peer_id}");
self.peer_table.record_failure(peer_id)?;
Ok(None)
}
}
}

/// Returns diagnostic snapshots for all connected peers (scores, requests, eligibility).
pub async fn read_peer_diagnostics(&self) -> Vec<PeerDiagnostics> {
self.peer_table
Expand Down
21 changes: 14 additions & 7 deletions crates/networking/p2p/rlpx/connection/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};

use crate::rlpx::{
error::PeerConnectionError,
message::{self as rlpx, EthCapVersion},
message::{self as rlpx, EthCapVersion, SnapCapVersion},
utils::ecdh_xchng,
};

Expand Down Expand Up @@ -34,6 +34,7 @@ pub struct RLPxCodec {
pub(crate) ingress_aes: Aes256Ctr64BE,
pub(crate) egress_aes: Aes256Ctr64BE,
pub(crate) eth_version: Arc<RwLock<EthCapVersion>>,
pub(crate) snap_version: Arc<RwLock<Option<SnapCapVersion>>>,
}

impl RLPxCodec {
Expand All @@ -42,6 +43,7 @@ impl RLPxCodec {
remote_state: &RemoteState,
hashed_nonces: [u8; 32],
eth_version: Arc<RwLock<EthCapVersion>>,
snap_version: Arc<RwLock<Option<SnapCapVersion>>>,
) -> Result<Self, PeerConnectionError> {
let ephemeral_key_secret =
ecdh_xchng(&local_state.ephemeral_key, &remote_state.ephemeral_key).map_err(
Expand Down Expand Up @@ -78,6 +80,7 @@ impl RLPxCodec {
ingress_aes,
egress_aes,
eth_version,
snap_version,
})
}
}
Expand All @@ -92,6 +95,7 @@ impl std::fmt::Debug for RLPxCodec {
.field("ingress_aes", &"Aes256Ctr64BE")
.field("egress_aes", &"Aes256Ctr64BE")
.field("eth_version", &self.eth_version)
.field("snap_version", &self.snap_version)
.finish()
}
}
Expand Down Expand Up @@ -229,13 +233,16 @@ impl Decoder for RLPxCodec {
})?;

let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(frame_data)?;
let eth_ver = *self
.eth_version
.read()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
let snap_ver = *self
.snap_version
.read()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
Ok(Some(rlpx::Message::decode(
msg_id,
msg_data,
*self
.eth_version
.read()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?,
msg_id, msg_data, eth_ver, snap_ver,
)?))
}

Expand Down
19 changes: 16 additions & 3 deletions crates/networking/p2p/rlpx/connection/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
rlpx::{
connection::server::{ConnectionState, Established},
error::PeerConnectionError,
message::EthCapVersion,
message::{EthCapVersion, SnapCapVersion},
utils::{compress_pubkey, decompress_pubkey, ecdh_xchng, kdf, sha256, sha256_hmac},
},
types::Node,
Expand Down Expand Up @@ -61,6 +61,7 @@ pub(crate) struct LocalState {
pub(crate) async fn perform(
state: ConnectionState,
eth_version: Arc<RwLock<EthCapVersion>>,
snap_version: Arc<RwLock<Option<SnapCapVersion>>>,
) -> Result<(Established, SplitStream<Framed<TcpStream, RLPxCodec>>), PeerConnectionError> {
let (context, node, framed) = match state {
ConnectionState::Initiator(Initiator { context, node }) => {
Expand All @@ -79,7 +80,13 @@ pub(crate) async fn perform(
// keccak256(nonce || initiator-nonce)
let hashed_nonces: [u8; 32] =
keccak_hash([remote_state.nonce.0, local_state.nonce.0].concat());
let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces, eth_version)?;
let codec = RLPxCodec::new(
&local_state,
&remote_state,
hashed_nonces,
eth_version,
snap_version,
)?;
trace!(peer=%node, "Completed handshake as initiator");
(context, node, Framed::new(stream, codec))
}
Expand All @@ -99,7 +106,13 @@ pub(crate) async fn perform(
// keccak256(nonce || initiator-nonce)
let hashed_nonces: [u8; 32] =
keccak_hash([local_state.nonce.0, remote_state.nonce.0].concat());
let codec = RLPxCodec::new(&local_state, &remote_state, hashed_nonces, eth_version)?;
let codec = RLPxCodec::new(
&local_state,
&remote_state,
hashed_nonces,
eth_version,
snap_version,
)?;
let node = Node::new(
peer_addr.ip(),
peer_addr.port(),
Expand Down
125 changes: 117 additions & 8 deletions crates/networking/p2p/rlpx/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ use crate::{
transactions::{GetPooledTransactions, NewPooledTransactionHashes},
update::BlockRangeUpdate,
},
message::EthCapVersion,
message::{EthCapVersion, SnapCapVersion},
p2p::{
self, Capability, DisconnectMessage, DisconnectReason, PingMessage, PongMessage,
SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES,
},
snap::TrieNodes,
snap::{Snap2BlockAccessLists, Snap2GetBlockAccessLists, TrieNodes},
},
snap::constants::{BAL_MAX_REQUEST_HASHES, BAL_RESPONSE_SOFT_CAP_BYTES},
snap::{
process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
process_trie_nodes_request,
Expand Down Expand Up @@ -318,16 +319,24 @@ pub struct PeerConnectionServer {
impl PeerConnectionServer {
#[started]
async fn started(&mut self, ctx: &Context<Self>) {
// Set a default eth version that we can update after we negotiate peer capabilities
// Set a default eth version that we can update after we negotiate peer capabilities.
// This eth version will only be used to encode & decode the initial `Hello` messages.
let eth_version = Arc::new(RwLock::new(EthCapVersion::default()));
// snap_version starts as None; set after hello-exchange to the negotiated snap version.
let snap_version: Arc<RwLock<Option<SnapCapVersion>>> = Arc::new(RwLock::new(None));
// Take ownership of the state, replacing with HandshakeFailed as placeholder
let state = std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed);
match handshake::perform(state, eth_version.clone()).await {
match handshake::perform(state, eth_version.clone(), snap_version.clone()).await {
Ok((mut established_state, stream)) => {
trace!(peer=%established_state.node, "Starting RLPx connection");
if let Err(reason) =
initialize_connection(ctx, &mut established_state, stream, eth_version).await
if let Err(reason) = initialize_connection(
ctx,
&mut established_state,
stream,
eth_version,
snap_version,
)
.await
{
match &reason {
PeerConnectionError::NoMatchingCapabilities
Expand Down Expand Up @@ -710,6 +719,7 @@ async fn initialize_connection<S>(
state: &mut Established,
mut stream: S,
eth_version: Arc<RwLock<EthCapVersion>>,
snap_version: Arc<RwLock<Option<SnapCapVersion>>>,
) -> Result<(), PeerConnectionError>
where
S: Unpin + Send + Stream<Item = Result<Message, PeerConnectionError>> + 'static,
Expand All @@ -720,7 +730,7 @@ where
}
exchange_hello_messages(state, &mut stream).await?;

// Update eth capability version to the negotiated version for further message decoding
// Update eth capability version to the negotiated version for further message decoding.
let version = match &state.negotiated_eth_capability {
Some(cap) if cap == &Capability::eth(68) => EthCapVersion::V68,
Some(cap) if cap == &Capability::eth(69) => EthCapVersion::V69,
Expand All @@ -732,6 +742,16 @@ where
.write()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))? = version;

// Update snap capability version to the negotiated version.
let snap_ver = match &state.negotiated_snap_capability {
Some(cap) if cap == &Capability::snap(1) => Some(SnapCapVersion::V1),
Some(cap) if cap == &Capability::snap(2) => Some(SnapCapVersion::V2),
_ => None,
};
*snap_version
.write()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))? = snap_ver;

init_capabilities(state, &mut stream).await?;

let mut connection = PeerConnection {
Expand Down Expand Up @@ -1167,6 +1187,7 @@ async fn handle_incoming_message(
| Message::GetStorageRanges(_)
| Message::GetByteCodes(_)
| Message::GetTrieNodes(_)
| Message::Snap2GetBlockAccessLists(_)
);
if is_data_request && !check_serve_request_rate(state) {
warn!(
Expand Down Expand Up @@ -1541,6 +1562,27 @@ async fn handle_incoming_message(
Err(_) => send(state, Message::TrieNodes(TrieNodes { id, nodes: vec![] })).await?,
}
}
Message::Snap2GetBlockAccessLists(req) => {
// Defense-in-depth: only serve if the peer negotiated snap/2.
if state.negotiated_snap_capability != Some(Capability::snap(2)) {
warn!(
peer = %state.node,
"Received Snap2GetBlockAccessLists from peer that did not negotiate snap/2; disconnecting"
);
send_disconnect_message(state, Some(DisconnectReason::ProtocolError)).await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::ProtocolError,
));
}
// Offload synchronous storage/RLP work off the connection task
// so other peers/messages keep flowing on this tokio worker.
let storage = state.storage.clone();
let response =
tokio::task::spawn_blocking(move || build_snap2_bal_response(req, &storage))
.await
.map_err(|e| PeerConnectionError::InternalError(e.to_string()))??;
send(state, Message::Snap2BlockAccessLists(response)).await?
}
#[cfg(feature = "l2")]
Message::L2(req) if peer_supports_l2 => {
handle_based_capability_message(state, req).await?;
Expand All @@ -1555,7 +1597,8 @@ async fn handle_incoming_message(
| message @ Message::Receipts68(_)
| message @ Message::Receipts69(_)
| message @ Message::Receipts70(_)
| message @ Message::BlockAccessLists(_) => {
| message @ Message::BlockAccessLists(_)
| message @ Message::Snap2BlockAccessLists(_) => {
if let Some((_, tx)) = message
.request_id()
.and_then(|id| state.current_requests.remove(&id))
Expand All @@ -1572,6 +1615,72 @@ async fn handle_incoming_message(
Ok(())
}

/// Build a `Snap2BlockAccessLists` response for a `Snap2GetBlockAccessLists` request.
///
/// Per EIP-8189:
/// - §50: always respond; push `None` for unknown/pruned/pre-Amsterdam blocks.
/// - §51: truncate from tail (preserve order) once the byte budget is exceeded.
/// - §52: orphaned blocks are served the same way as canonical blocks (keyed by hash).
/// - §60: enforce `min(response_bytes, 2 MiB)`; `0` means 2 MiB.
/// - §100: push `None` for blocks whose header has `block_access_list_hash == None`.
pub fn build_snap2_bal_response(
req: Snap2GetBlockAccessLists,
storage: &ethrex_storage::Store,
) -> Result<Snap2BlockAccessLists, PeerConnectionError> {
let cap = if req.response_bytes == 0 {
BAL_RESPONSE_SOFT_CAP_BYTES
} else {
req.response_bytes.min(BAL_RESPONSE_SOFT_CAP_BYTES)
};

// Defend against tiny-BAL flood DoS: truncate hash list before doing any
// storage work. Matches go-ethereum's `maxAccessListLookups`.
let hashes: &[H256] = if req.block_hashes.len() > BAL_MAX_REQUEST_HASHES {
&req.block_hashes[..BAL_MAX_REQUEST_HASHES]
} else {
&req.block_hashes
};

// Batched BAL fetch (`Store::iter_block_access_lists_by_hashes`). Header
// lookup remains per-hash because §100 requires inspecting each header's
// `block_access_list_hash` to decide whether to serve the BAL.
let raw_bals = storage
.iter_block_access_lists_by_hashes(hashes)
.map_err(|e| PeerConnectionError::InternalError(e.to_string()))?;

let mut bals: Vec<Option<ethrex_common::types::block_access_list::BlockAccessList>> =
Vec::with_capacity(hashes.len());
let mut bytes_used: u64 = 0;

for (hash, raw_bal) in hashes.iter().zip(raw_bals.into_iter()) {
// Keep at least one entry then stop once cap is exceeded.
if !bals.is_empty() && bytes_used >= cap {
break;
}

let header = storage
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-hash header lookup runs even when we already have a BAL. When raw_bals[i] is Some(bal), we already know the block is post-Amsterdam (we wouldn't have stored a BAL for a pre-Amsterdam block). The only branch that needs header.block_access_list_hash is the §100 case — i.e. when raw_bal is None and we need to distinguish "pre-Amsterdam" from "unknown/pruned".

Cheap rewrite:

let slot = match raw_bal {
    Some(bal) => Some(bal),
    None => match storage.get_block_header_by_hash(*hash)? {
        Some(h) if h.block_access_list_hash.is_none() => None, // §100
        _ => None, // unknown/pruned
    },
};

Saves N storage reads on the happy path where the peer has all the BALs. On a 1024-hash request that's ~1024 fewer disk hits per response, which is the kind of work this defense-against-DoS limit was designed to bound in the first place.

Non-blocking perf.

.get_block_header_by_hash(*hash)
.map_err(|e| PeerConnectionError::InternalError(e.to_string()))?;

let slot = match header {
// §100: pre-Amsterdam blocks have no block_access_list_hash — return None.
Some(h) if h.block_access_list_hash.is_none() => None,
// Known post-Amsterdam header — serve whatever the store holds (which may itself be None).
Some(_) => raw_bal,
// Unknown block hash.
None => None,
};

bytes_used += match &slot {
Some(bal) => bal.length() as u64,
None => 1, // RLP empty string (0x80) = 1 byte
};
bals.push(slot);
}

Ok(Snap2BlockAccessLists { id: req.id, bals })
}

async fn handle_outgoing_message(
state: &mut Established,
message: Message,
Expand Down
Loading
Loading