Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 115 additions & 3 deletions crates/net/network/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
peer.state = req.peer_state();
}

self.prepare_inflight_block_request(peer_id, req)
}

/// Tracks an inflight request and converts it into a peer request.
fn prepare_inflight_block_request(
&mut self,
peer_id: PeerId,
req: DownloadRequest<N>,
) -> BlockRequest {
match req {
DownloadRequest::GetBlockHeaders { request, response, .. } => {
let inflight = Request { request: request.clone(), response };
Expand Down Expand Up @@ -318,12 +327,23 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
}

/// Returns a new followup request for the peer.
/// Returns a queued followup request the peer can serve.
///
/// This is an immediate scheduling shortcut after a successful response. It skips queued
/// requests whose hard requirements do not match this peer, leaving them for the regular peer
/// selection path.
///
/// Caution: this expects that the peer is _not_ closed.
fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
let req = self.queued_requests.pop_front()?;
let req = self.prepare_block_request(peer_id, req);
let peer = self.peers.get_mut(&peer_id)?;
let req_idx = self.queued_requests.iter().position(|req| {
// Find the first queued request this peer can serve.
peer.satisfies(&req.best_peer_requirements())
})?;
let req = self.queued_requests.remove(req_idx).expect("valid request index");

peer.state = req.peer_state();
let req = self.prepare_inflight_block_request(peer_id, req);
Some(BlockResponseOutcome::Request(peer_id, req))
}

Expand Down Expand Up @@ -1449,6 +1469,98 @@ mod tests {
assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
}

#[tokio::test]
async fn test_followup_skips_request_peer_cannot_serve() {
let (mut fetcher, peer_id) = fetcher_with_peer();

let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;

let (followup_tx, _followup_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
request: vec![B256::random()],
response: followup_tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
});

let _rx = insert_inflight_receipts(&mut fetcher, peer_id);

let resp = ReceiptsResponse::new(vec![vec![]]);
assert!(fetcher.on_receipts_response(peer_id, Ok(resp)).is_none());
assert!(fetcher.peers[&peer_id].state.is_idle());
assert!(!fetcher.inflight_bals_requests.contains_key(&peer_id));
assert!(matches!(
fetcher.queued_requests.front(),
Some(DownloadRequest::GetBlockAccessLists {
requirement: BalRequirement::Optional,
..
})
));
}

#[tokio::test]
async fn test_followup_uses_first_satisfiable_request() {
let (mut fetcher, peer_id) = fetcher_with_peer();

let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;

let (bal_tx, _bal_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
request: vec![B256::random()],
response: bal_tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
});

let (bodies_tx, _bodies_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
request: vec![B256::random()],
response: bodies_tx,
priority: Priority::Normal,
range_hint: None,
});

let _rx = insert_inflight_receipts(&mut fetcher, peer_id);

let resp = ReceiptsResponse::new(vec![vec![]]);
let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));

assert!(matches!(
outcome,
Some(BlockResponseOutcome::Request(pid, BlockRequest::GetBlockBodies(_))) if pid == peer_id
));
assert!(fetcher.inflight_bodies_requests.contains_key(&peer_id));
assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetBlockBodies));
assert_eq!(fetcher.queued_requests.len(), 1);
assert!(matches!(
fetcher.queued_requests.front(),
Some(DownloadRequest::GetBlockAccessLists {
requirement: BalRequirement::Optional,
..
})
));
}

#[tokio::test]
async fn test_prepare_block_request_creates_inflight_receipts() {
let (mut fetcher, peer_id) = fetcher_with_peer();
Expand Down
Loading