diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index cacef745e8b..bd00dd9a49b 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -6,7 +6,7 @@ use futures::{future, future::Either}; use reth_eth_wire::{BlockAccessLists, EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ - block_access_lists::client::BlockAccessListsClient, + block_access_lists::client::{BalRequirement, BlockAccessListsClient}, bodies::client::{BodiesClient, BodiesFut}, download::DownloadClient, error::{PeerRequestResult, RequestError}, @@ -135,11 +135,29 @@ impl BlockAccessListsClient for FetchClient { &self, hashes: Vec, priority: Priority, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + priority, + BalRequirement::Mandatory, + ) + } + + fn get_block_access_lists_with_priority_and_requirement( + &self, + hashes: Vec, + priority: Priority, + requirement: BalRequirement, ) -> Self::Output { let (response, rx) = oneshot::channel(); if self .request_tx - .send(DownloadRequest::GetBlockAccessLists { request: hashes, response, priority }) + .send(DownloadRequest::GetBlockAccessLists { + request: hashes, + response, + priority, + requirement, + }) .is_ok() { Box::pin(FlattenedResponse::from(rx)) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index a4bde2d1553..e284ffda35d 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -13,6 +13,7 @@ use reth_eth_wire::{ }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::{ + block_access_lists::client::BalRequirement, error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult}, headers::client::HeadersRequest, priority::Priority, @@ -159,15 +160,10 @@ impl StateFetcher { /// full history available fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option { // filter out peers that aren't idle or don't meet the requirement - let mut idle = self.peers.iter().filter(|(_, peer)| { - peer.state.is_idle() && - match &requirement { - BestPeerRequirements::EthVersion(ver) => { - peer.capabilities.supports_eth_at_least(ver) - } - _ => true, - } - }); + let mut idle = self + .peers + .iter() + .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement)); let mut best_peer = idle.next()?; @@ -195,6 +191,14 @@ impl StateFetcher { Some(*best_peer.0) } + /// Returns whether any connected peer can serve BAL requests. + fn has_eth71_peer(&self) -> bool { + self.peers.values().any(|peer| { + !matches!(peer.state, PeerState::Closing) && + peer.capabilities.supports_eth_at_least(&EthVersion::Eth71) + }) + } + /// Returns the next action to return fn poll_action(&mut self) -> PollAction { // we only check and not pop here since we don't know yet whether a peer is available. @@ -208,9 +212,15 @@ impl StateFetcher { let request = self.queued_requests.pop_front().expect("not empty"); let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else { - // no peer matches this request's requirements; requeue at the back so other - // queued requests get a chance on the next poll instead of head-of-line blocking. - self.queued_requests.push_back(request); + // Optional BAL requests can lose their eth/71 peer while queued; complete them + // instead of waiting for future peer churn. + if request.is_optional_bal() && !self.has_eth71_peer() { + request.send_err_response(RequestError::UnsupportedCapability); + } else { + // no peer matches this request's requirements; requeue at the back so other + // queued requests get a chance on the next poll instead of head-of-line blocking. + self.queued_requests.push_back(request); + } return PollAction::NoPeersAvailable }; @@ -232,21 +242,30 @@ impl StateFetcher { loop { // poll incoming requests match self.download_requests_rx.poll_next_unpin(cx) { - Poll::Ready(Some(request)) => match request.get_priority() { - Priority::High => { - // find the first normal request and queue before, add this request to - // the back of the high-priority queue - let pos = self - .queued_requests - .iter() - .position(|req| req.is_normal_priority()) - .unwrap_or(0); - self.queued_requests.insert(pos, request); + Poll::Ready(Some(request)) => { + // Optional BAL requests should not wait for future peer churn if no + // connected peer can serve them right now. + if request.is_optional_bal() && !self.has_eth71_peer() { + request.send_err_response(RequestError::UnsupportedCapability); + continue } - Priority::Normal => { - self.queued_requests.push_back(request); + + match request.get_priority() { + Priority::High => { + // find first normal request and queue before it; add this request + // to the back of the high-priority queue + let pos = self + .queued_requests + .iter() + .position(|req| req.is_normal_priority()) + .unwrap_or(0); + self.queued_requests.insert(pos, request); + } + Priority::Normal => { + self.queued_requests.push_back(request); + } } - }, + } Poll::Ready(None) => { unreachable!("channel can't close") } @@ -476,6 +495,16 @@ impl Peer { self.range_info.as_ref().map(|info| info.range()) } + /// Returns whether this peer can serve requests with the given hard requirements. + fn satisfies(&self, requirement: &BestPeerRequirements) -> bool { + match requirement { + BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver), + BestPeerRequirements::None | + BestPeerRequirements::FullBlock | + BestPeerRequirements::FullBlockRange(_) => true, + } + } + /// Returns true if this peer has a better range than the other peer for serving the requested /// range. /// @@ -602,6 +631,7 @@ pub(crate) enum DownloadRequest { request: Vec, response: oneshot::Sender>, priority: Priority, + requirement: BalRequirement, }, /// Download receipts for the given block hashes and send response through channel GetReceipts { @@ -639,6 +669,21 @@ impl DownloadRequest { self.get_priority().is_normal() } + /// Returns `true` if this is an optional BAL request. + const fn is_optional_bal(&self) -> bool { + matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. }) + } + + /// Sends an error response to the waiting caller. + fn send_err_response(self, err: RequestError) { + let _ = match self { + Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), + Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), + Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(), + Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), + }; + } + /// Returns the best peer requirements for this request. fn best_peer_requirements(&self) -> BestPeerRequirements { match self { @@ -1541,6 +1586,7 @@ mod tests { request: vec![], response: tx, priority: Priority::Normal, + requirement: BalRequirement::Mandatory, }); let waker = noop_waker(); @@ -1583,4 +1629,138 @@ mod tests { assert_eq!(peer_id, peer_71); } } + + #[tokio::test] + async fn test_optional_bal_request_rejected_without_eth71_peer() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_old = B512::random(); + let caps_old = Arc::new(Capabilities::new(vec![])); + fetcher.new_active_peer( + peer_old, + B256::random(), + 100, + caps_old, + Arc::new(AtomicU64::new(10)), + None, + ); + + let (tx, rx) = oneshot::channel(); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert!(fetcher.queued_requests.is_empty()); + assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); + } + + #[tokio::test] + async fn test_optional_bal_request_waits_for_busy_eth71_peer() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, _rx) = oneshot::channel(); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert_eq!(fetcher.queued_requests.len(), 1); + } + + #[tokio::test] + async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() { + use futures::task::noop_waker; + use std::task::{Context, Poll}; + + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = + StateFetcher::::new(manager.handle(), Default::default()); + + let peer_old = B512::random(); + let caps_old = Arc::new(Capabilities::new(vec![])); + fetcher.new_active_peer( + peer_old, + B256::random(), + 100, + caps_old, + Arc::new(AtomicU64::new(10)), + None, + ); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (tx, rx) = oneshot::channel(); + fetcher + .download_requests_tx + .send(DownloadRequest::GetBlockAccessLists { + request: vec![], + response: tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }) + .unwrap(); + + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert_eq!(fetcher.queued_requests.len(), 1); + + fetcher.on_session_closed(&peer_71); + + assert!(matches!(fetcher.poll(&mut cx), Poll::Pending)); + assert!(fetcher.queued_requests.is_empty()); + assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability); + } } diff --git a/crates/net/p2p/src/block_access_lists/client.rs b/crates/net/p2p/src/block_access_lists/client.rs index 80cbff3909f..cc9301d91bc 100644 --- a/crates/net/p2p/src/block_access_lists/client.rs +++ b/crates/net/p2p/src/block_access_lists/client.rs @@ -4,6 +4,17 @@ use auto_impl::auto_impl; use futures::Future; use reth_eth_wire_types::BlockAccessLists; +/// Controls whether a BAL request must wait for a capable peer or may complete early when none are +/// available. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum BalRequirement { + /// Keep waiting until an eth/71-capable peer is available. + #[default] + Mandatory, + /// Return early if no connected peer can serve BALs. + Optional, +} + /// A client capable of downloading block access lists. #[auto_impl(&, Arc, Box)] pub trait BlockAccessListsClient: DownloadClient { @@ -12,7 +23,24 @@ pub trait BlockAccessListsClient: DownloadClient { /// Fetches the block access lists for given hashes. fn get_block_access_lists(&self, hashes: Vec) -> Self::Output { - self.get_block_access_lists_with_priority(hashes, Priority::Normal) + self.get_block_access_lists_with_priority_and_requirement( + hashes, + Priority::Normal, + BalRequirement::Mandatory, + ) + } + + /// Fetches the block access lists for given hashes with the requested BAL availability policy. + fn get_block_access_lists_with_requirement( + &self, + hashes: Vec, + requirement: BalRequirement, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + Priority::Normal, + requirement, + ) } /// Fetches the block access lists for given hashes with priority @@ -20,5 +48,19 @@ pub trait BlockAccessListsClient: DownloadClient { &self, hashes: Vec, priority: Priority, + ) -> Self::Output { + self.get_block_access_lists_with_priority_and_requirement( + hashes, + priority, + BalRequirement::Mandatory, + ) + } + + /// Fetches the block access lists for given hashes with priority and BAL availability policy. + fn get_block_access_lists_with_priority_and_requirement( + &self, + hashes: Vec, + priority: Priority, + requirement: BalRequirement, ) -> Self::Output; } diff --git a/crates/net/p2p/src/error.rs b/crates/net/p2p/src/error.rs index d650763b4eb..609121cad5e 100644 --- a/crates/net/p2p/src/error.rs +++ b/crates/net/p2p/src/error.rs @@ -57,8 +57,8 @@ impl EthResponseValidator for RequestResult> { /// [`RequestError::ConnectionDropped`] should be ignored here because this is already handled /// when the dropped connection is handled. /// - /// [`RequestError::UnsupportedCapability`] is not used yet because we only support active - /// session for eth protocol. + /// [`RequestError::UnsupportedCapability`] is also used for locally rejected optional requests, + /// which should not affect peer reputation. fn reputation_change_err(&self) -> Option { if let Err(err) = self { match err { diff --git a/crates/net/p2p/src/full_block.rs b/crates/net/p2p/src/full_block.rs index a52df8402e1..45bedcbd39c 100644 --- a/crates/net/p2p/src/full_block.rs +++ b/crates/net/p2p/src/full_block.rs @@ -1110,10 +1110,11 @@ mod tests { impl BlockAccessListsClient for FullBlockWithAccessListsClient { type Output = futures::future::Ready>; - fn get_block_access_lists_with_priority( + fn get_block_access_lists_with_priority_and_requirement( &self, hashes: Vec, _priority: Priority, + _requirement: crate::block_access_lists::client::BalRequirement, ) -> Self::Output { self.access_list_requests.fetch_add(1, Ordering::SeqCst); diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 4107f68faa0..4457fdd9f9d 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -54,7 +54,7 @@ pub mod snap; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; -pub use block_access_lists::client::BlockAccessListsClient; +pub use block_access_lists::client::{BalRequirement, BlockAccessListsClient}; pub use bodies::client::BodiesClient; pub use headers::client::HeadersClient; pub use receipts::client::ReceiptsClient;