Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 159 additions & 33 deletions crates/net/network/tests/it/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@
//! Tests for eth related requests

use alloy_consensus::Header;
use alloy_primitives::Bytes;
use alloy_primitives::{Bytes, B256};
use rand::Rng;
use reth_eth_wire::{BlockAccessLists, EthVersion, GetBlockAccessLists, HeadersDirection};
use reth_ethereum_primitives::Block;
use reth_network::{
test_utils::{NetworkEventStream, PeerConfig, Testnet},
eth_requests::SOFT_RESPONSE_LIMIT,
test_utils::{NetworkEventStream, PeerConfig, Testnet, TestnetHandle},
BlockDownloaderProvider, NetworkEventListenerProvider,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_p2p::{
bodies::client::BodiesClient,
error::RequestError,
headers::client::{HeadersClient, HeadersRequest},
BalRequirement, BlockAccessListsClient,
};
use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore};
use reth_transaction_pool::test_utils::{TestPool, TransactionGenerator};
use std::sync::Arc;
use tokio::sync::oneshot;

type BalTestnetHandle = TestnetHandle<Arc<MockEthProvider>, TestPool>;

#[tokio::test(flavor = "multi_thread")]
async fn test_get_body() {
reth_tracing::init_test_tracing();
Expand Down Expand Up @@ -531,53 +536,174 @@ async fn test_eth69_get_receipts() {
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists() {
reth_tracing::init_test_tracing();
let mut rng = rand::rng();
let mut mock_provider = MockEthProvider::default();
let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
mock_provider.bal_store = bal_store.clone();
let mock_provider = Arc::new(mock_provider);
let (net, bal_store) = spawn_eth71_bal_testnet().await;

let mut net: Testnet<Arc<MockEthProvider>, TestPool> = Testnet::default();
let hash0 = B256::random();
let hash1 = B256::random();
let hash2 = B256::random();
let bal0 = Bytes::from_static(&[0xc1, 0x01]);
let bal2 = Bytes::from_static(&[0xc1, 0x02]);

let p0 = PeerConfig::with_protocols(mock_provider.clone(), Some(EthVersion::Eth71.into()));
net.add_peer_with_config(p0).await.unwrap();
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash2, 3, bal2.clone()).unwrap();

let p1 = PeerConfig::with_protocols(mock_provider.clone(), Some(EthVersion::Eth71.into()));
net.add_peer_with_config(p1).await.unwrap();
let response = request_block_access_lists(&net, vec![hash0, hash1, hash2]).await;
assert_eq!(
response,
BlockAccessLists(vec![bal0, Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]), bal2,])
);
}

net.for_each_mut(|peer| peer.install_request_handler());
// Ensures BAL responses stop at the soft response limit while keeping the item that crosses it.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_respects_response_soft_limit() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;

let handle0 = net.peers()[0].handle();
let mut events0 = NetworkEventStream::new(handle0.event_listener());
let handle1 = net.peers()[1].handle();
let hash0 = B256::random();
let hash1 = B256::random();
let hash2 = B256::random();
let bal0 = raw_bal_with_len(2);
let bal1 = raw_bal_with_len(SOFT_RESPONSE_LIMIT);
let bal2 = raw_bal_with_len(2);
assert!(bal0.len() + bal1.len() > SOFT_RESPONSE_LIMIT);

let _handle = net.spawn();
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash1, 2, bal1.clone()).unwrap();
bal_store.insert(hash2, 3, bal2).unwrap();

handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let connected = events0.next_session_established().await.unwrap();
assert_eq!(connected, *handle1.peer_id());
let response = request_block_access_lists(&net, vec![hash0, hash1, hash2]).await;

assert_eq!(response, BlockAccessLists(vec![bal0, bal1]));
}

// Ensures a single BAL larger than the soft limit is still returned.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_returns_single_oversized_bal() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;

let hash0 = B256::random();
let hash1 = B256::random();
let bal0 = raw_bal_with_len(SOFT_RESPONSE_LIMIT + 1);
let bal1 = raw_bal_with_len(2);

bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash1, 2, bal1).unwrap();

let response = request_block_access_lists(&net, vec![hash0, hash1]).await;

assert_eq!(response, BlockAccessLists(vec![bal0]));
}

// Ensures an empty BAL request roundtrips to an empty response.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_empty_request() {
reth_tracing::init_test_tracing();
let (net, _) = spawn_eth71_bal_testnet().await;

let response = request_block_access_lists(&net, Vec::new()).await;

assert_eq!(response, BlockAccessLists(Vec::new()));
}

// Ensures the fetch client can request BALs through an eth/71 peer.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_fetch_client_get_block_access_lists() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;

let hash0 = rng.random();
let hash1 = rng.random();
let hash2 = rng.random();
let hash0 = B256::random();
let hash1 = B256::random();
let bal0 = Bytes::from_static(&[0xc1, 0x01]);
let bal2 = Bytes::from_static(&[0xc1, 0x02]);

bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash2, 3, bal2.clone()).unwrap();

let fetch = net.peers()[0].network().fetch_client().await.unwrap();
let response = fetch.get_block_access_lists(vec![hash0, hash1]).await.unwrap().into_data();

assert_eq!(
response,
BlockAccessLists(vec![bal0, Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE])])
);
}

// Ensures fetch client BAL requests are rejected when no eth/71 peer is available.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth70_fetch_client_rejects_optional_block_access_lists_request() {
reth_tracing::init_test_tracing();
let (net, _) = spawn_bal_testnet([EthVersion::Eth70, EthVersion::Eth70]).await;

let fetch = net.peers()[0].network().fetch_client().await.unwrap();
let err = fetch
.get_block_access_lists_with_requirement(vec![B256::random()], BalRequirement::Optional)
.await
.unwrap_err();

assert_eq!(err, RequestError::UnsupportedCapability);
}

async fn spawn_eth71_bal_testnet() -> (BalTestnetHandle, BalStoreHandle) {
spawn_bal_testnet([EthVersion::Eth71, EthVersion::Eth71]).await
}

// Spawns a BAL testnet with one peer per requested eth protocol version.
async fn spawn_bal_testnet(
versions: impl IntoIterator<Item = EthVersion>,
) -> (BalTestnetHandle, BalStoreHandle) {
let mut mock_provider = MockEthProvider::default();
let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
mock_provider.bal_store = bal_store.clone();
let mock_provider = Arc::new(mock_provider);

let mut net: Testnet<Arc<MockEthProvider>, TestPool> = Testnet::default();

for version in versions {
let peer = PeerConfig::with_protocols(mock_provider.clone(), Some(version.into()));
net.add_peer_with_config(peer).await.unwrap();
}

net.for_each_mut(|peer| peer.install_request_handler());

let net = net.spawn();
net.connect_peers().await;

(net, bal_store)
}

// Sends a GetBlockAccessLists request from peer 0 to peer 1.
async fn request_block_access_lists(net: &BalTestnetHandle, hashes: Vec<B256>) -> BlockAccessLists {
let requester = &net.peers()[0];
let responder = &net.peers()[1];
let (tx, rx) = oneshot::channel();
handle0.send_request(
*handle1.peer_id(),

requester.network().send_request(
*responder.peer_id(),
reth_network::PeerRequest::GetBlockAccessLists {
request: GetBlockAccessLists(vec![hash0, hash1, hash2]),
request: GetBlockAccessLists(hashes),
response: tx,
},
);

let response = rx.await.unwrap().unwrap();
assert_eq!(
response,
BlockAccessLists(vec![bal0, Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]), bal2,])
);
rx.await.unwrap().unwrap()
}

// Builds a complete raw RLP list item with the requested encoded byte length.
fn raw_bal_with_len(len: usize) -> Bytes {
assert!(len > 0);

let mut payload_length = len - 1;
loop {
let header_length = alloy_rlp::Header { list: true, payload_length }.length();
let next_payload_length = len.checked_sub(header_length).unwrap();
if next_payload_length == payload_length {
break
}
payload_length = next_payload_length;
}

let mut out = Vec::with_capacity(len);
alloy_rlp::Header { list: true, payload_length }.encode(&mut out);
out.resize(len, alloy_rlp::EMPTY_LIST_CODE);
Bytes::from(out)
}
Loading