From 3d6d2723abaa5347d72a3370d5c610e5f78f2cfb Mon Sep 17 00:00:00 2001 From: Karl Date: Wed, 22 Apr 2026 17:58:11 +0800 Subject: [PATCH 01/11] feat(net): add BAL requirement to block access list requests --- crates/net/network/src/fetch/client.rs | 22 ++- crates/net/network/src/fetch/mod.rs | 134 +++++++++++++++--- .../net/p2p/src/block_access_lists/client.rs | 44 +++++- crates/net/p2p/src/error.rs | 4 +- crates/net/p2p/src/full_block.rs | 3 +- crates/net/p2p/src/lib.rs | 2 +- 6 files changed, 186 insertions(+), 23 deletions(-) diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index cacef745e8b..bd00dd9a49b 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -6,7 +6,7 @@ use futures::{future, future::Either}; use reth_eth_wire::{BlockAccessLists, EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ - block_access_lists::client::BlockAccessListsClient, + block_access_lists::client::{BalRequirement, BlockAccessListsClient}, bodies::client::{BodiesClient, BodiesFut}, download::DownloadClient, error::{PeerRequestResult, RequestError}, @@ -135,11 +135,29 @@ impl BlockAccessListsClient for FetchClient { &self, hashes: Vec, priority: Priority, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + priority, + BalRequirement::Mandatory, + ) + } + + fn get_block_access_lists_with_priority_and_requirement( + &self, + hashes: Vec, + priority: Priority, + requirement: BalRequirement, ) -> Self::Output { let (response, rx) = oneshot::channel(); if self .request_tx - .send(DownloadRequest::GetBlockAccessLists { request: hashes, response, priority }) + .send(DownloadRequest::GetBlockAccessLists { + request: hashes, + response, + priority, + requirement, + }) .is_ok() { Box::pin(FlattenedResponse::from(rx)) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index a4bde2d1553..b1dcf321831 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -13,6 +13,7 @@ use reth_eth_wire::{ }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ + block_access_lists::client::BalRequirement, error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, headers::client::HeadersRequest, priority::Priority, @@ -195,28 +196,39 @@ impl StateFetcher { Some(*best_peer.0) } + /// Returns whether any connected peer can serve BAL requests. + fn has_bal_capable_peer(&self) -> bool { + self.peers.values().any(|peer| { + !matches!(peer.state, PeerState::Closing) && + peer.capabilities.supports_eth_at_least(&EthVersion::Eth71) + }) + } + /// Returns the next action to return fn poll_action(&mut self) -> PollAction { - // we only check and not pop here since we don't know yet whether a peer is available. - if self.queued_requests.is_empty() { - return PollAction::NoRequests - } + loop { + // we only check and not pop here since we don't know yet whether a peer is available. + if self.queued_requests.is_empty() { + return PollAction::NoRequests + } - if self.peers.is_empty() { - return PollAction::NoPeersAvailable - } + let request = self.queued_requests.pop_front().expect("not empty"); + let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { + if request.should_complete_without_capable_peer(self.has_bal_capable_peer()) { + request.send_err_response(RequestError::UnsupportedCapability); + continue + } - let request = self.queued_requests.pop_front().expect("not empty"); - let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { - // no peer matches this request's requirements; requeue at the back so other - // queued requests get a chance on the next poll instead of head-of-line blocking. - self.queued_requests.push_back(request); - return PollAction::NoPeersAvailable - }; + // no peer matches this request's requirements; requeue at the back so other + // queued requests get a chance on the next poll instead of head-of-line blocking. + self.queued_requests.push_back(request); + return PollAction::NoPeersAvailable + }; - let request = self.prepare_block_request(peer_id, request); + let request = self.prepare_block_request(peer_id, request); - PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) + return PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) + } } /// Advance the state the syncer @@ -602,6 +614,7 @@ pub(crate) enum DownloadRequest { request: Vec, response: oneshot::Sender>, priority: Priority, + requirement: BalRequirement, }, /// Download receipts for the given block hashes and send response through channel GetReceipts { @@ -639,6 +652,22 @@ impl DownloadRequest { self.get_priority().is_normal() } + /// Returns whether the request may complete locally when no capable peer exists. + const fn should_complete_without_capable_peer(&self, has_bal_capable_peer: bool) -> bool { + matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) && + !has_bal_capable_peer + } + + /// Sends an error response to the waiting caller. + fn send_err_response(self, err: RequestError) { + let _ = match self { + Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), + Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), + Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(), + Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), + }; + } + /// Returns the best peer requirements for this request. fn best_peer_requirements(&self) -> BestPeerRequirements { match self { @@ -1541,6 +1570,7 @@ mod tests { request: vec![], response: tx, priority: Priority::Normal, + requirement: BalRequirement::Mandatory, }); let waker = noop_waker(); @@ -1583,4 +1613,76 @@ mod tests { assert_eq!(peer_id, peer_71); } } + + #[tokio::test] + async fn test_optional_bal_request_completes_without_capable_peer() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_old = B512::random(); + let caps_old = Arc::new(Capabilities::new(vec![])); + fetcher.new_active_peer( + peer_old, + B256::random(), + 100, + caps_old, + Arc::new(AtomicU64::new(10)), + None, + ); + + let (tx, rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert!(fetcher.queued_requests.is_empty()); + assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); + } + + #[tokio::test] + async fn test_optional_bal_request_waits_for_busy_capable_peer() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, _rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert_eq!(fetcher.queued_requests.len(), 1); + } } diff --git a/crates/net/p2p/src/block_access_lists/client.rs b/crates/net/p2p/src/block_access_lists/client.rs index 80cbff3909f..cc9301d91bc 100644 --- a/crates/net/p2p/src/block_access_lists/client.rs +++ b/crates/net/p2p/src/block_access_lists/client.rs @@ -4,6 +4,17 @@ use auto_impl::auto_impl; use futures::Future; use reth_eth_wire_types::BlockAccessLists; +/// Controls whether a BAL request must wait for a capable peer or may complete early when none are +/// available. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum BalRequirement { + /// Keep waiting until an eth/71-capable peer is available. + #[default] + Mandatory, + /// Return early if no connected peer can serve BALs. + Optional, +} + /// A client capable of downloading block access lists. #[auto_impl(&, Arc, Box)] pub trait BlockAccessListsClient: DownloadClient { @@ -12,7 +23,24 @@ pub trait BlockAccessListsClient: DownloadClient { /// Fetches the block access lists for given hashes. fn get_block_access_lists(&self, hashes: Vec) -> Self::Output { - self.get_block_access_lists_with_priority(hashes, Priority::Normal) + self.get_block_access_lists_with_priority_and_requirement( + hashes, + Priority::Normal, + BalRequirement::Mandatory, + ) + } + + /// Fetches the block access lists for given hashes with the requested BAL availability policy. + fn get_block_access_lists_with_requirement( + &self, + hashes: Vec, + requirement: BalRequirement, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + Priority::Normal, + requirement, + ) } /// Fetches the block access lists for given hashes with priority @@ -20,5 +48,19 @@ pub trait BlockAccessListsClient: DownloadClient { &self, hashes: Vec, priority: Priority, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + priority, + BalRequirement::Mandatory, + ) + } + + /// Fetches the block access lists for given hashes with priority and BAL availability policy. + fn get_block_access_lists_with_priority_and_requirement( + &self, + hashes: Vec, + priority: Priority, + requirement: BalRequirement, ) -> Self::Output; } diff --git a/crates/net/p2p/src/error.rs b/crates/net/p2p/src/error.rs index d650763b4eb..609121cad5e 100644 --- a/crates/net/p2p/src/error.rs +++ b/crates/net/p2p/src/error.rs @@ -57,8 +57,8 @@ impl EthResponseValidator for RequestResult> { /// [`RequestError::ConnectionDropped`] should be ignored here because this is already handled /// when the dropped connection is handled. /// - /// [`RequestError::UnsupportedCapability`] is not used yet because we only support active - /// session for eth protocol. + /// [`RequestError::UnsupportedCapability`] is also used for locally rejected optional requests, + /// which should not affect peer reputation. fn reputation_change_err(&self) -> Option { if let Err(err) = self { match err { diff --git a/crates/net/p2p/src/full_block.rs b/crates/net/p2p/src/full_block.rs index a52df8402e1..45bedcbd39c 100644 --- a/crates/net/p2p/src/full_block.rs +++ b/crates/net/p2p/src/full_block.rs @@ -1110,10 +1110,11 @@ mod tests { impl BlockAccessListsClient for FullBlockWithAccessListsClient { type Output = futures::future::Ready>; - fn get_block_access_lists_with_priority( + fn get_block_access_lists_with_priority_and_requirement( &self, hashes: Vec, _priority: Priority, + _requirement: crate::block_access_lists::client::BalRequirement, ) -> Self::Output { self.access_list_requests.fetch_add(1, Ordering::SeqCst); diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 4107f68faa0..4457fdd9f9d 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -54,7 +54,7 @@ pub mod snap; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; -pub use block_access_lists::client::BlockAccessListsClient; +pub use block_access_lists::client::{BalRequirement, BlockAccessListsClient}; pub use bodies::client::BodiesClient; pub use headers::client::HeadersClient; pub use receipts::client::ReceiptsClient; From 695b99e0e7dab2cc224961848e47bf56ce7e2cae Mon Sep 17 00:00:00 2001 From: Karl Date: Fri, 24 Apr 2026 09:13:22 +0800 Subject: [PATCH 02/11] feat(net): reject optional BAL at ingress --- crates/net/network/src/fetch/mod.rs | 84 ++++++++++++++++------------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index b1dcf321831..8f3b8e77cdf 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -214,11 +214,6 @@ impl StateFetcher { let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { - if request.should_complete_without_capable_peer(self.has_bal_capable_peer()) { - request.send_err_response(RequestError::UnsupportedCapability); - continue - } - // no peer matches this request's requirements; requeue at the back so other // queued requests get a chance on the next poll instead of head-of-line blocking. self.queued_requests.push_back(request); @@ -244,21 +239,34 @@ impl StateFetcher { loop { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { - Poll::Ready(Some(request)) => match request.get_priority() { - Priority::High => { - // find the first normal request and queue before, add this request to - // the back of the high-priority queue - let pos = self - .queued_requests - .iter() - .position(|req| req.is_normal_priority()) - .unwrap_or(0); - self.queued_requests.insert(pos, request); + Poll::Ready(Some(request)) => { + if matches!( + request, + DownloadRequest::GetBlockAccessLists { + requirement: BalRequirement::Optional, + .. + } + ) && !self.has_bal_capable_peer() + { + request.send_err_response(RequestError::UnsupportedCapability); + continue } - Priority::Normal => { - self.queued_requests.push_back(request); + + match request.get_priority() { + Priority::High => { + // Queue before the first normal request. + let pos = self + .queued_requests + .iter() + .position(|req| req.is_normal_priority()) + .unwrap_or(0); + self.queued_requests.insert(pos, request); + } + Priority::Normal => { + self.queued_requests.push_back(request); + } } - }, + } Poll::Ready(None) => { unreachable!("channel can't close") } @@ -652,12 +660,6 @@ impl DownloadRequest { self.get_priority().is_normal() } - /// Returns whether the request may complete locally when no capable peer exists. - const fn should_complete_without_capable_peer(&self, has_bal_capable_peer: bool) -> bool { - matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) && - !has_bal_capable_peer - } - /// Sends an error response to the waiting caller. fn send_err_response(self, err: RequestError) { let _ = match self { @@ -1615,7 +1617,7 @@ mod tests { } #[tokio::test] - async fn test_optional_bal_request_completes_without_capable_peer() { + async fn test_optional_bal_request_rejected_without_eth71_peer() { use futures::task::noop_waker; use std::task::{Context, Poll}; @@ -1635,12 +1637,15 @@ mod tests { ); let (tx, rx) = oneshot::channel(); - fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { - request: vec![], - response: tx, - priority: Priority::Normal, - requirement: BalRequirement::Optional, - }); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); let waker = noop_waker(); let mut cx = Context::from_waker(&waker); @@ -1651,7 +1656,7 @@ mod tests { } #[tokio::test] - async fn test_optional_bal_request_waits_for_busy_capable_peer() { + async fn test_optional_bal_request_waits_for_busy_eth71_peer() { use futures::task::noop_waker; use std::task::{Context, Poll}; @@ -1672,12 +1677,15 @@ mod tests { fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; let (tx, _rx) = oneshot::channel(); - fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { - request: vec![], - response: tx, - priority: Priority::Normal, - requirement: BalRequirement::Optional, - }); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); let waker = noop_waker(); let mut cx = Context::from_waker(&waker); From a7f6ca403f716ba52b1c31cf79d83be861edf4aa Mon Sep 17 00:00:00 2001 From: Karl Date: Fri, 24 Apr 2026 10:14:24 +0800 Subject: [PATCH 03/11] fix(net): remove never_loop in fetch poll_action --- crates/net/network/src/fetch/mod.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 8f3b8e77cdf..d1e8ced2b8d 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -206,24 +206,22 @@ impl StateFetcher { /// Returns the next action to return fn poll_action(&mut self) -> PollAction { - loop { - // we only check and not pop here since we don't know yet whether a peer is available. - if self.queued_requests.is_empty() { - return PollAction::NoRequests - } + // We only pop once we know the queue is non-empty. + if self.queued_requests.is_empty() { + return PollAction::NoRequests + } - let request = self.queued_requests.pop_front().expect("not empty"); - let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { - // no peer matches this request's requirements; requeue at the back so other - // queued requests get a chance on the next poll instead of head-of-line blocking. - self.queued_requests.push_back(request); - return PollAction::NoPeersAvailable - }; + let request = self.queued_requests.pop_front().expect("not empty"); + let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { + // no peer matches this request's requirements; requeue at the back so other + // queued requests get a chance on the next poll instead of head-of-line blocking. + self.queued_requests.push_back(request); + return PollAction::NoPeersAvailable + }; - let request = self.prepare_block_request(peer_id, request); + let request = self.prepare_block_request(peer_id, request); - return PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) - } + PollAction::Ready(FetchAction::BlockRequest { peer_id, request }) } /// Advance the state the syncer From abbb7e1684916349eab804ed41b3a3414d5c9919 Mon Sep 17 00:00:00 2001 From: Karl Date: Fri, 24 Apr 2026 10:38:44 +0800 Subject: [PATCH 04/11] rename --- crates/net/network/src/fetch/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index d1e8ced2b8d..0fb960a7ca3 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -197,7 +197,7 @@ impl StateFetcher { } /// Returns whether any connected peer can serve BAL requests. - fn has_bal_capable_peer(&self) -> bool { + fn has_eth71_peer(&self) -> bool { self.peers.values().any(|peer| { !matches!(peer.state, PeerState::Closing) && peer.capabilities.supports_eth_at_least(&EthVersion::Eth71) @@ -244,7 +244,7 @@ impl StateFetcher { requirement: BalRequirement::Optional, .. } - ) && !self.has_bal_capable_peer() + ) && !self.has_eth71_peer() { request.send_err_response(RequestError::UnsupportedCapability); continue From 57601efefaee03d251daeb7ce896dd91c1c33ee0 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:03:19 +0200 Subject: [PATCH 05/11] fix(net): handle optional BAL queue edge cases --- crates/net/network/src/fetch/mod.rs | 176 +++++++++++++++++++++++++--- 1 file changed, 162 insertions(+), 14 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 0fb960a7ca3..2c818baa3ca 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -160,15 +160,10 @@ impl StateFetcher { /// full history available fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option { // filter out peers that aren't idle or don't meet the requirement - let mut idle = self.peers.iter().filter(|(_, peer)| { - peer.state.is_idle() && - match &requirement { - BestPeerRequirements::EthVersion(ver) => { - peer.capabilities.supports_eth_at_least(ver) - } - _ => true, - } - }); + let mut idle = self + .peers + .iter() + .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement)); let mut best_peer = idle.next()?; @@ -206,11 +201,21 @@ impl StateFetcher { /// Returns the next action to return fn poll_action(&mut self) -> PollAction { - // We only pop once we know the queue is non-empty. + // we only check and not pop here since we don't know yet whether a peer is available. + if self.queued_requests.is_empty() { + return PollAction::NoRequests + } + + self.reject_optional_bal_requests_without_eth71(); + if self.queued_requests.is_empty() { return PollAction::NoRequests } + if self.peers.is_empty() { + return PollAction::NoPeersAvailable + } + let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { // no peer matches this request's requirements; requeue at the back so other @@ -252,7 +257,8 @@ impl StateFetcher { match request.get_priority() { Priority::High => { - // Queue before the first normal request. + // find first normal request and queue before it; add this request + // to the back of the high-priority queue let pos = self .queued_requests .iter() @@ -278,6 +284,24 @@ impl StateFetcher { } } + /// Rejects queued optional BAL requests if no connected peer can serve them. + fn reject_optional_bal_requests_without_eth71(&mut self) { + if self.has_eth71_peer() { + return + } + + let mut idx = 0; + while idx < self.queued_requests.len() { + if self.queued_requests[idx].is_optional_bal() { + if let Some(request) = self.queued_requests.remove(idx) { + request.send_err_response(RequestError::UnsupportedCapability); + } + } else { + idx += 1; + } + } + } + /// Handles a new request to a peer. /// /// Caution: this assumes the peer exists and is idle @@ -321,9 +345,24 @@ impl StateFetcher { /// /// Caution: this expects that the peer is _not_ closed. fn followup_request(&mut self, peer_id: PeerId) -> Option { - let req = self.queued_requests.pop_front()?; - let req = self.prepare_block_request(peer_id, req); - Some(BlockResponseOutcome::Request(peer_id, req)) + self.reject_optional_bal_requests_without_eth71(); + + let queued_len = self.queued_requests.len(); + for _ in 0..queued_len { + let req = self.queued_requests.pop_front()?; + let can_request = self.peers.get(&peer_id).is_some_and(|peer| { + peer.state.is_idle() && peer.satisfies(&req.best_peer_requirements()) + }); + + if can_request { + let req = self.prepare_block_request(peer_id, req); + return Some(BlockResponseOutcome::Request(peer_id, req)) + } + + self.queued_requests.push_back(req); + } + + None } /// Called on a `GetBlockHeaders` response from a peer. @@ -494,6 +533,16 @@ impl Peer { self.range_info.as_ref().map(|info| info.range()) } + /// Returns whether this peer can serve requests with the given hard requirements. + fn satisfies(&self, requirement: &BestPeerRequirements) -> bool { + match requirement { + BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver), + BestPeerRequirements::None | + BestPeerRequirements::FullBlock | + BestPeerRequirements::FullBlockRange(_) => true, + } + } + /// Returns true if this peer has a better range than the other peer for serving the requested /// range. /// @@ -658,6 +707,11 @@ impl DownloadRequest { self.get_priority().is_normal() } + /// Returns `true` if this is an optional BAL request. + const fn is_optional_bal(&self) -> bool { + matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) + } + /// Sends an error response to the waiting caller. fn send_err_response(self, err: RequestError) { let _ = match self { @@ -1691,4 +1745,98 @@ mod tests { assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); assert_eq!(fetcher.queued_requests.len(), 1); } + + #[tokio::test] + async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, rx) = oneshot::channel(); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert_eq!(fetcher.queued_requests.len(), 1); + + fetcher.on_session_closed(&peer_71); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert!(fetcher.queued_requests.is_empty()); + assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); + } + + #[tokio::test] + async fn test_followup_skips_bal_for_peer_without_eth71() { + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_old = B512::random(); + let caps_old = Arc::new(Capabilities::new(vec![])); + fetcher.new_active_peer( + peer_old, + B256::random(), + 100, + caps_old, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_old).expect("peer exists").state = PeerState::GetBlockHeaders; + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, _rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }); + + assert!(fetcher.on_block_headers_response(peer_old, Ok(Vec::
::new())).is_none()); + assert_eq!(fetcher.queued_requests.len(), 1); + assert!(matches!( + fetcher.queued_requests.front(), + Some(DownloadRequest::GetBlockAccessLists { + requirement: BalRequirement::Optional, + .. + }) + )); + } } From 2dd8642778ade034f0d908f6efa12caa68e237cc Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:05:50 +0200 Subject: [PATCH 06/11] Revert "fix(net): handle optional BAL queue edge cases" This reverts commit 57601efefaee03d251daeb7ce896dd91c1c33ee0. --- crates/net/network/src/fetch/mod.rs | 176 +++------------------------- 1 file changed, 14 insertions(+), 162 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 2c818baa3ca..0fb960a7ca3 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -160,10 +160,15 @@ impl StateFetcher { /// full history available fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option { // filter out peers that aren't idle or don't meet the requirement - let mut idle = self - .peers - .iter() - .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement)); + let mut idle = self.peers.iter().filter(|(_, peer)| { + peer.state.is_idle() && + match &requirement { + BestPeerRequirements::EthVersion(ver) => { + peer.capabilities.supports_eth_at_least(ver) + } + _ => true, + } + }); let mut best_peer = idle.next()?; @@ -201,21 +206,11 @@ impl StateFetcher { /// Returns the next action to return fn poll_action(&mut self) -> PollAction { - // we only check and not pop here since we don't know yet whether a peer is available. - if self.queued_requests.is_empty() { - return PollAction::NoRequests - } - - self.reject_optional_bal_requests_without_eth71(); - + // We only pop once we know the queue is non-empty. if self.queued_requests.is_empty() { return PollAction::NoRequests } - if self.peers.is_empty() { - return PollAction::NoPeersAvailable - } - let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { // no peer matches this request's requirements; requeue at the back so other @@ -257,8 +252,7 @@ impl StateFetcher { match request.get_priority() { Priority::High => { - // find first normal request and queue before it; add this request - // to the back of the high-priority queue + // Queue before the first normal request. let pos = self .queued_requests .iter() @@ -284,24 +278,6 @@ impl StateFetcher { } } - /// Rejects queued optional BAL requests if no connected peer can serve them. - fn reject_optional_bal_requests_without_eth71(&mut self) { - if self.has_eth71_peer() { - return - } - - let mut idx = 0; - while idx < self.queued_requests.len() { - if self.queued_requests[idx].is_optional_bal() { - if let Some(request) = self.queued_requests.remove(idx) { - request.send_err_response(RequestError::UnsupportedCapability); - } - } else { - idx += 1; - } - } - } - /// Handles a new request to a peer. /// /// Caution: this assumes the peer exists and is idle @@ -345,24 +321,9 @@ impl StateFetcher { /// /// Caution: this expects that the peer is _not_ closed. fn followup_request(&mut self, peer_id: PeerId) -> Option { - self.reject_optional_bal_requests_without_eth71(); - - let queued_len = self.queued_requests.len(); - for _ in 0..queued_len { - let req = self.queued_requests.pop_front()?; - let can_request = self.peers.get(&peer_id).is_some_and(|peer| { - peer.state.is_idle() && peer.satisfies(&req.best_peer_requirements()) - }); - - if can_request { - let req = self.prepare_block_request(peer_id, req); - return Some(BlockResponseOutcome::Request(peer_id, req)) - } - - self.queued_requests.push_back(req); - } - - None + let req = self.queued_requests.pop_front()?; + let req = self.prepare_block_request(peer_id, req); + Some(BlockResponseOutcome::Request(peer_id, req)) } /// Called on a `GetBlockHeaders` response from a peer. @@ -533,16 +494,6 @@ impl Peer { self.range_info.as_ref().map(|info| info.range()) } - /// Returns whether this peer can serve requests with the given hard requirements. - fn satisfies(&self, requirement: &BestPeerRequirements) -> bool { - match requirement { - BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver), - BestPeerRequirements::None | - BestPeerRequirements::FullBlock | - BestPeerRequirements::FullBlockRange(_) => true, - } - } - /// Returns true if this peer has a better range than the other peer for serving the requested /// range. /// @@ -707,11 +658,6 @@ impl DownloadRequest { self.get_priority().is_normal() } - /// Returns `true` if this is an optional BAL request. - const fn is_optional_bal(&self) -> bool { - matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) - } - /// Sends an error response to the waiting caller. fn send_err_response(self, err: RequestError) { let _ = match self { @@ -1745,98 +1691,4 @@ mod tests { assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); assert_eq!(fetcher.queued_requests.len(), 1); } - - #[tokio::test] - async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() { - use futures::task::noop_waker; - use std::task::{Context, Poll}; - - let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = - StateFetcher::::new(manager.handle(), Default::default()); - - let peer_71 = B512::random(); - let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); - fetcher.new_active_peer( - peer_71, - B256::random(), - 100, - caps_71, - Arc::new(AtomicU64::new(10)), - None, - ); - fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; - - let (tx, rx) = oneshot::channel(); - fetcher - .download_requests_tx - .send(DownloadRequest::GetBlockAccessLists { - request: vec![], - response: tx, - priority: Priority::Normal, - requirement: BalRequirement::Optional, - }) - .unwrap(); - - let waker = noop_waker(); - let mut cx = Context::from_waker(&waker); - - assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); - assert_eq!(fetcher.queued_requests.len(), 1); - - fetcher.on_session_closed(&peer_71); - - assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); - assert!(fetcher.queued_requests.is_empty()); - assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); - } - - #[tokio::test] - async fn test_followup_skips_bal_for_peer_without_eth71() { - let manager = PeersManager::new(PeersConfig::default()); - let mut fetcher = - StateFetcher::::new(manager.handle(), Default::default()); - - let peer_old = B512::random(); - let caps_old = Arc::new(Capabilities::new(vec![])); - fetcher.new_active_peer( - peer_old, - B256::random(), - 100, - caps_old, - Arc::new(AtomicU64::new(10)), - None, - ); - fetcher.peers.get_mut(&peer_old).expect("peer exists").state = PeerState::GetBlockHeaders; - - let peer_71 = B512::random(); - let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); - fetcher.new_active_peer( - peer_71, - B256::random(), - 100, - caps_71, - Arc::new(AtomicU64::new(10)), - None, - ); - fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; - - let (tx, _rx) = oneshot::channel(); - fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { - request: vec![], - response: tx, - priority: Priority::Normal, - requirement: BalRequirement::Optional, - }); - - assert!(fetcher.on_block_headers_response(peer_old, Ok(Vec::
::new())).is_none()); - assert_eq!(fetcher.queued_requests.len(), 1); - assert!(matches!( - fetcher.queued_requests.front(), - Some(DownloadRequest::GetBlockAccessLists { - requirement: BalRequirement::Optional, - .. - }) - )); - } } From 22de11b3862cbc2af00e0845ea2e9d2743dff223 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:07:32 +0200 Subject: [PATCH 07/11] fix(net): restore fetch queue comments --- crates/net/network/src/fetch/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 0fb960a7ca3..f457ccfb4b3 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -206,11 +206,15 @@ impl StateFetcher { /// Returns the next action to return fn poll_action(&mut self) -> PollAction { - // We only pop once we know the queue is non-empty. + // we only check and not pop here since we don't know yet whether a peer is available. if self.queued_requests.is_empty() { return PollAction::NoRequests } + if self.peers.is_empty() { + return PollAction::NoPeersAvailable + } + let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { // no peer matches this request's requirements; requeue at the back so other @@ -252,7 +256,8 @@ impl StateFetcher { match request.get_priority() { Priority::High => { - // Queue before the first normal request. + // find first normal request and queue before it; add this request + // to the back of the high-priority queue let pos = self .queued_requests .iter() From f7fd967f4a8466a422609007218092a1aae7575d Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:08:45 +0200 Subject: [PATCH 08/11] refactor(net): add peer requirement helper --- crates/net/network/src/fetch/mod.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index f457ccfb4b3..b7bec6ff48f 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -160,15 +160,10 @@ impl StateFetcher { /// full history available fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option { // filter out peers that aren't idle or don't meet the requirement - let mut idle = self.peers.iter().filter(|(_, peer)| { - peer.state.is_idle() && - match &requirement { - BestPeerRequirements::EthVersion(ver) => { - peer.capabilities.supports_eth_at_least(ver) - } - _ => true, - } - }); + let mut idle = self + .peers + .iter() + .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement)); let mut best_peer = idle.next()?; @@ -499,6 +494,16 @@ impl Peer { self.range_info.as_ref().map(|info| info.range()) } + /// Returns whether this peer can serve requests with the given hard requirements. + fn satisfies(&self, requirement: &BestPeerRequirements) -> bool { + match requirement { + BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver), + BestPeerRequirements::None | + BestPeerRequirements::FullBlock | + BestPeerRequirements::FullBlockRange(_) => true, + } + } + /// Returns true if this peer has a better range than the other peer for serving the requested /// range. /// From 46e33d4741090ce78804b9fb2bb79f5d165fd31c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:09:29 +0200 Subject: [PATCH 09/11] refactor(net): add optional BAL request helper --- crates/net/network/src/fetch/mod.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index b7bec6ff48f..60d5363ffd0 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -237,14 +237,7 @@ impl StateFetcher { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { Poll::Ready(Some(request)) => { - if matches!( - request, - DownloadRequest::GetBlockAccessLists { - requirement: BalRequirement::Optional, - .. - } - ) && !self.has_eth71_peer() - { + if request.is_optional_bal() && !self.has_eth71_peer() { request.send_err_response(RequestError::UnsupportedCapability); continue } @@ -668,6 +661,11 @@ impl DownloadRequest { self.get_priority().is_normal() } + /// Returns `true` if this is an optional BAL request. + const fn is_optional_bal(&self) -> bool { + matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) + } + /// Sends an error response to the waiting caller. fn send_err_response(self, err: RequestError) { let _ = match self { From f827ae673c77123e946fd353d7f05d909c7ad300 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:10:40 +0200 Subject: [PATCH 10/11] docs(net): explain optional BAL peer check --- crates/net/network/src/fetch/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 60d5363ffd0..e0215fcefd5 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -237,6 +237,8 @@ impl StateFetcher { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { Poll::Ready(Some(request)) => { + // Optional BAL requests should not wait for future peer churn if no + // connected peer can serve them right now. if request.is_optional_bal() && !self.has_eth71_peer() { request.send_err_response(RequestError::UnsupportedCapability); continue From 05fc4883d1555368ce59830ddbd6de47f6ec037a Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Apr 2026 10:14:38 +0200 Subject: [PATCH 11/11] fix(net): reject queued optional BAL without eth71 --- crates/net/network/src/fetch/mod.rs | 68 +++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index e0215fcefd5..e284ffda35d 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -212,9 +212,15 @@ impl StateFetcher { let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { - // no peer matches this request's requirements; requeue at the back so other - // queued requests get a chance on the next poll instead of head-of-line blocking. - self.queued_requests.push_back(request); + // Optional BAL requests can lose their eth/71 peer while queued; complete them + // instead of waiting for future peer churn. + if request.is_optional_bal() && !self.has_eth71_peer() { + request.send_err_response(RequestError::UnsupportedCapability); + } else { + // no peer matches this request's requirements; requeue at the back so other + // queued requests get a chance on the next poll instead of head-of-line blocking. + self.queued_requests.push_back(request); + } return PollAction::NoPeersAvailable }; @@ -1701,4 +1707,60 @@ mod tests { assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); assert_eq!(fetcher.queued_requests.len(), 1); } + + #[tokio::test] + async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_old = B512::random(); + let caps_old = Arc::new(Capabilities::new(vec![])); + fetcher.new_active_peer( + peer_old, + B256::random(), + 100, + caps_old, + Arc::new(AtomicU64::new(10)), + None, + ); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, rx) = oneshot::channel(); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert_eq!(fetcher.queued_requests.len(), 1); + + fetcher.on_session_closed(&peer_71); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert!(fetcher.queued_requests.is_empty()); + assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); + } }