diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index e284ffda35d..c160dc76e6d 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -288,6 +288,15 @@ impl StateFetcher { peer.state = req.peer_state(); } + self.prepare_inflight_block_request(peer_id, req) + } + + /// Tracks an inflight request and converts it into a peer request. + fn prepare_inflight_block_request( + &mut self, + peer_id: PeerId, + req: DownloadRequest, + ) -> BlockRequest { match req { DownloadRequest::GetBlockHeaders { request, response, .. } => { let inflight = Request { request: request.clone(), response }; @@ -318,12 +327,23 @@ impl StateFetcher { } } - /// Returns a new followup request for the peer. + /// Returns a queued followup request the peer can serve. + /// + /// This is an immediate scheduling shortcut after a successful response. It skips queued + /// requests whose hard requirements do not match this peer, leaving them for the regular peer + /// selection path. /// /// Caution: this expects that the peer is _not_ closed. fn followup_request(&mut self, peer_id: PeerId) -> Option { - let req = self.queued_requests.pop_front()?; - let req = self.prepare_block_request(peer_id, req); + let peer = self.peers.get_mut(&peer_id)?; + let req_idx = self.queued_requests.iter().position(|req| { + // Find the first queued request this peer can serve. + peer.satisfies(&req.best_peer_requirements()) + })?; + let req = self.queued_requests.remove(req_idx).expect("valid request index"); + + peer.state = req.peer_state(); + let req = self.prepare_inflight_block_request(peer_id, req); Some(BlockResponseOutcome::Request(peer_id, req)) } @@ -1449,6 +1469,98 @@ mod tests { assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id)); } + #[tokio::test] + async fn test_followup_skips_request_peer_cannot_serve() { + let (mut fetcher, peer_id) = fetcher_with_peer(); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (followup_tx, _followup_rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { + request: vec![B256::random()], + response: followup_tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }); + + let _rx = insert_inflight_receipts(&mut fetcher, peer_id); + + let resp = ReceiptsResponse::new(vec![vec![]]); + assert!(fetcher.on_receipts_response(peer_id, Ok(resp)).is_none()); + assert!(fetcher.peers[&peer_id].state.is_idle()); + assert!(!fetcher.inflight_bals_requests.contains_key(&peer_id)); + assert!(matches!( + fetcher.queued_requests.front(), + Some(DownloadRequest::GetBlockAccessLists { + requirement: BalRequirement::Optional, + .. + }) + )); + } + + #[tokio::test] + async fn test_followup_uses_first_satisfiable_request() { + let (mut fetcher, peer_id) = fetcher_with_peer(); + + let peer_71 = B512::random(); + let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)])); + fetcher.new_active_peer( + peer_71, + B256::random(), + 100, + caps_71, + Arc::new(AtomicU64::new(10)), + None, + ); + fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders; + + let (bal_tx, _bal_rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists { + request: vec![B256::random()], + response: bal_tx, + priority: Priority::Normal, + requirement: BalRequirement::Optional, + }); + + let (bodies_tx, _bodies_rx) = oneshot::channel(); + fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies { + request: vec![B256::random()], + response: bodies_tx, + priority: Priority::Normal, + range_hint: None, + }); + + let _rx = insert_inflight_receipts(&mut fetcher, peer_id); + + let resp = ReceiptsResponse::new(vec![vec![]]); + let outcome = fetcher.on_receipts_response(peer_id, Ok(resp)); + + assert!(matches!( + outcome, + Some(BlockResponseOutcome::Request(pid, BlockRequest::GetBlockBodies(_))) if pid == peer_id + )); + assert!(fetcher.inflight_bodies_requests.contains_key(&peer_id)); + assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetBlockBodies)); + assert_eq!(fetcher.queued_requests.len(), 1); + assert!(matches!( + fetcher.queued_requests.front(), + Some(DownloadRequest::GetBlockAccessLists { + requirement: BalRequirement::Optional, + .. + }) + )); + } + #[tokio::test] async fn test_prepare_block_request_creates_inflight_receipts() { let (mut fetcher, peer_id) = fetcher_with_peer();