Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,14 +1319,28 @@ impl TestRig {
});
}

fn assert_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sync_manager
.assert_sampling_request_status(block_root, ongoing, no_peers)
fn assert_sampling_request_ongoing(&self, block_root: Hash256, indices: &Vec<ColumnIndex>) {
for index in indices {
let status = self
.sync_manager
.get_sampling_request_status(block_root, index)
.unwrap_or_else(|| panic!("No request state for {index}"));
if !matches!(status, crate::sync::peer_sampling::Status::Sampling { .. }) {
panic!("expected {block_root} {index} request to be on going: {status:?}");
}
}
}

fn assert_sampling_request_nopeers(&self, block_root: Hash256, indices: &Vec<ColumnIndex>) {
for index in indices {
let status = self
.sync_manager
.get_sampling_request_status(block_root, index)
.unwrap_or_else(|| panic!("No request state for {index}"));
if !matches!(status, crate::sync::peer_sampling::Status::NoPeers { .. }) {
panic!("expected {block_root} {index} request to be no peers: {status:?}");
}
}
}
}

Expand Down Expand Up @@ -2099,7 +2113,7 @@ fn sampling_batch_requests() {
.pop()
.unwrap();
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
r.assert_sampling_request_ongoing(block_root, &column_indexes);

// Resolve the request.
r.complete_valid_sampling_column_requests(
Expand Down Expand Up @@ -2127,7 +2141,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);

// The request status should be set to Sampling.
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
r.assert_sampling_request_ongoing(block_root, &column_indexes);

// Split the indexes to simulate the case where the supernode doesn't have the requested column.
let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
Expand All @@ -2145,7 +2159,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {
);

// The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
r.assert_sampling_request_status(block_root, &vec![], &column_indexes);
r.assert_sampling_request_nopeers(block_root, &column_indexes);

// The sampling request stalls.
r.expect_empty_network();
Expand Down
10 changes: 4 additions & 6 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}

#[cfg(test)]
pub(crate) fn assert_sampling_request_status(
pub(crate) fn get_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sampling
.assert_sampling_request_status(block_root, ongoing, no_peers);
index: &ColumnIndex,
) -> Option<super::peer_sampling::Status> {
self.sampling.get_request_status(block_root, index)
}

fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
Expand Down
35 changes: 14 additions & 21 deletions beacon_node/network/src/sync/peer_sampling.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use self::request::ActiveColumnSampleRequest;
#[cfg(test)]
pub(crate) use self::request::Status;
use super::network_context::{
DataColumnsByRootSingleBlockRequest, RpcResponseError, SyncNetworkContext,
};
Expand Down Expand Up @@ -43,15 +45,15 @@ impl<T: BeaconChainTypes> Sampling<T> {
}

#[cfg(test)]
pub fn assert_sampling_request_status(
pub fn get_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
index: &ColumnIndex,
) -> Option<self::request::Status> {
let requester = SamplingRequester::ImportedBlock(block_root);
let active_sampling_request = self.requests.get(&requester).unwrap();
active_sampling_request.assert_sampling_request_status(ongoing, no_peers);
self.requests
.get(&requester)
.and_then(|req| req.get_request_status(index))
}

/// Create a new sampling request for a known block
Expand Down Expand Up @@ -233,18 +235,8 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
}

#[cfg(test)]
pub fn assert_sampling_request_status(
&self,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
for idx in ongoing {
assert!(self.column_requests.get(idx).unwrap().is_ongoing());
}

for idx in no_peers {
assert!(self.column_requests.get(idx).unwrap().is_no_peers());
}
pub fn get_request_status(&self, index: &ColumnIndex) -> Option<self::request::Status> {
self.column_requests.get(index).map(|req| req.status())
}

/// Insert a downloaded column into an active sampling request. Then make progress on the
Expand Down Expand Up @@ -584,8 +576,9 @@ mod request {
peers_dont_have: HashSet<PeerId>,
}

// Exposed only for testing assertions in lookup tests
#[derive(Debug, Clone)]
enum Status {
pub(crate) enum Status {
NoPeers,
NotStarted,
Sampling(PeerId),
Expand Down Expand Up @@ -630,8 +623,8 @@ mod request {
}

#[cfg(test)]
pub(crate) fn is_no_peers(&self) -> bool {
matches!(self.status, Status::NoPeers)
pub(crate) fn status(&self) -> Status {
self.status.clone()
}

pub(crate) fn choose_peer<T: BeaconChainTypes>(
Expand Down
Loading