Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 42 additions & 11 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,14 +1319,44 @@ impl TestRig {
});
}

fn assert_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sync_manager
.assert_sampling_request_status(block_root, ongoing, no_peers)
fn assert_sampling_request_ongoing(&self, block_root: Hash256, indices: &[ColumnIndex]) {
for index in indices {
let status = self
.sync_manager
.get_sampling_request_status(block_root, index)
.unwrap_or_else(|| panic!("No request state for {index}"));
if !matches!(status, crate::sync::peer_sampling::Status::Sampling { .. }) {
panic!("expected {block_root} {index} request to be on going: {status:?}");
}
}
}

fn assert_sampling_request_nopeers(&self, block_root: Hash256, indices: &[ColumnIndex]) {
for index in indices {
let status = self
.sync_manager
.get_sampling_request_status(block_root, index)
.unwrap_or_else(|| panic!("No request state for {index}"));
if !matches!(status, crate::sync::peer_sampling::Status::NoPeers { .. }) {
panic!("expected {block_root} {index} request to be no peers: {status:?}");
}
}
}

fn log_sampling_requests(&self, block_root: Hash256, indices: &[ColumnIndex]) {
let statuses = indices
.iter()
.map(|index| {
let status = self
.sync_manager
.get_sampling_request_status(block_root, index)
.unwrap_or_else(|| panic!("No request state for {index}"));
(index, status)
})
.collect::<Vec<_>>();
self.log(&format!(
"Sampling request status for {block_root}: {statuses:?}"
));
}
}

Expand Down Expand Up @@ -2099,7 +2129,7 @@ fn sampling_batch_requests() {
.pop()
.unwrap();
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
r.assert_sampling_request_ongoing(block_root, &column_indexes);

// Resolve the request.
r.complete_valid_sampling_column_requests(
Expand Down Expand Up @@ -2127,7 +2157,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {
assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES);

// The request status should be set to Sampling.
r.assert_sampling_request_status(block_root, &column_indexes, &vec![]);
r.assert_sampling_request_ongoing(block_root, &column_indexes);

// Split the indexes to simulate the case where the supernode doesn't have the requested column.
let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
Expand All @@ -2145,7 +2175,8 @@ fn sampling_batch_requests_not_enough_responses_returned() {
);

// The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
r.assert_sampling_request_status(block_root, &vec![], &column_indexes);
r.log_sampling_requests(block_root, &column_indexes);
r.assert_sampling_request_nopeers(block_root, &column_indexes);

// The sampling request stalls.
r.expect_empty_network();
Expand Down
10 changes: 4 additions & 6 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}

#[cfg(test)]
pub(crate) fn assert_sampling_request_status(
pub(crate) fn get_sampling_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
self.sampling
.assert_sampling_request_status(block_root, ongoing, no_peers);
index: &ColumnIndex,
) -> Option<super::peer_sampling::Status> {
self.sampling.get_request_status(block_root, index)
}

fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
Expand Down
35 changes: 14 additions & 21 deletions beacon_node/network/src/sync/peer_sampling.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use self::request::ActiveColumnSampleRequest;
#[cfg(test)]
pub(crate) use self::request::Status;
use super::network_context::{
DataColumnsByRootSingleBlockRequest, RpcResponseError, SyncNetworkContext,
};
Expand Down Expand Up @@ -43,15 +45,15 @@ impl<T: BeaconChainTypes> Sampling<T> {
}

#[cfg(test)]
pub fn assert_sampling_request_status(
pub fn get_request_status(
&self,
block_root: Hash256,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
index: &ColumnIndex,
) -> Option<self::request::Status> {
let requester = SamplingRequester::ImportedBlock(block_root);
let active_sampling_request = self.requests.get(&requester).unwrap();
active_sampling_request.assert_sampling_request_status(ongoing, no_peers);
self.requests
.get(&requester)
.and_then(|req| req.get_request_status(index))
}

/// Create a new sampling request for a known block
Expand Down Expand Up @@ -233,18 +235,8 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
}

#[cfg(test)]
pub fn assert_sampling_request_status(
&self,
ongoing: &Vec<ColumnIndex>,
no_peers: &Vec<ColumnIndex>,
) {
for idx in ongoing {
assert!(self.column_requests.get(idx).unwrap().is_ongoing());
}

for idx in no_peers {
assert!(self.column_requests.get(idx).unwrap().is_no_peers());
}
pub fn get_request_status(&self, index: &ColumnIndex) -> Option<self::request::Status> {
self.column_requests.get(index).map(|req| req.status())
}

/// Insert a downloaded column into an active sampling request. Then make progress on the
Expand Down Expand Up @@ -584,8 +576,9 @@ mod request {
peers_dont_have: HashSet<PeerId>,
}

// Exposed only for testing assertions in lookup tests
#[derive(Debug, Clone)]
enum Status {
pub(crate) enum Status {
NoPeers,
NotStarted,
Sampling(PeerId),
Expand Down Expand Up @@ -630,8 +623,8 @@ mod request {
}

#[cfg(test)]
pub(crate) fn is_no_peers(&self) -> bool {
matches!(self.status, Status::NoPeers)
pub(crate) fn status(&self) -> Status {
self.status.clone()
}

pub(crate) fn choose_peer<T: BeaconChainTypes>(
Expand Down
4 changes: 2 additions & 2 deletions scripts/local_testnet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This setup can be useful for testing and development.

1. Install [Kurtosis](https://docs.kurtosis.com/install/). Verify that Kurtosis has been successfully installed by running `kurtosis version` which should display the version.

1. Install [yq](https://github.com/mikefarah/yq). If you are on Ubuntu, you can install `yq` by running `sudo apt install yq -y`.
1. Install [yq](https://github.com/mikefarah/yq). If you are on Ubuntu, you can install `yq` by running `snap install yq`.

## Starting the testnet

Expand Down Expand Up @@ -82,4 +82,4 @@ The script comes with some CLI options, which can be viewed with `./start_local_

```bash
./start_local_testnet.sh -b false
```
```
2 changes: 1 addition & 1 deletion slasher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mdbx = { package = "libmdbx", git = "https://github.com/sigp/libmdbx-rs", rev =
lmdb-rkv = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true }
lmdb-rkv-sys = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true }

redb = { version = "2.1", optional = true }
redb = { version = "2.1.4", optional = true }

[dev-dependencies]
maplit = { workspace = true }
Expand Down
12 changes: 2 additions & 10 deletions slasher/src/database/redb_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,9 @@ impl<'env> Cursor<'env> {
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
TableDefinition::new(&self.db.table_name);
let table = self.txn.open_table(table_definition)?;
let first = table
.iter()?
.next()
.map(|x| x.map(|(key, _)| key.value().to_vec()));
let first = table.first()?.map(|(key, _)| key.value().to_vec());

if let Some(owned_key) = first {
let owned_key = owned_key?;
self.current_key = Some(Cow::from(owned_key));
Ok(self.current_key.clone())
} else {
Expand All @@ -182,13 +178,9 @@ impl<'env> Cursor<'env> {
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
TableDefinition::new(&self.db.table_name);
let table = self.txn.open_table(table_definition)?;
let last = table
.iter()?
.next_back()
.map(|x| x.map(|(key, _)| key.value().to_vec()));
let last = table.last()?.map(|(key, _)| key.value().to_vec());

if let Some(owned_key) = last {
let owned_key = owned_key?;
self.current_key = Some(Cow::from(owned_key));
return Ok(self.current_key.clone());
}
Expand Down
Loading