diff --git a/Cargo.lock b/Cargo.lock
index 4581fb9ce05..9dcd428123d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6043,6 +6043,7 @@ dependencies = [
  "hex",
  "igd-next 0.16.0",
  "itertools 0.10.5",
+ "k256 0.13.4",
  "kzg",
  "lighthouse_network",
  "logging",
@@ -6052,6 +6053,7 @@ dependencies = [
  "operation_pool",
  "parking_lot 0.12.3",
  "rand 0.8.5",
+ "rand_chacha 0.3.1",
  "serde_json",
  "slog",
  "slog-async",
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index 37cb5df6ea5..8e5d6121e04 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -689,8 +689,8 @@ impl<E: EthSpec> PeerDB<E> {
         &mut self,
         supernode: bool,
         spec: &ChainSpec,
+        enr_key: CombinedKey,
     ) -> PeerId {
-        let enr_key = CombinedKey::generate_secp256k1();
         let mut enr = Enr::builder().build(&enr_key).unwrap();
         let peer_id = enr.peer_id();
diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml
index 09179c4a516..5071e247a32 100644
--- a/beacon_node/network/Cargo.toml
+++ b/beacon_node/network/Cargo.toml
@@ -10,8 +10,10 @@ eth2 = { workspace = true }
 eth2_network_config = { workspace = true }
 genesis = { workspace = true }
 gossipsub = { workspace = true }
+k256 = "0.13.4"
 kzg = { workspace = true }
 matches = "0.1.8"
+rand_chacha = "0.3.1"
 serde_json = { workspace = true }
 slog-async = { workspace = true }
 slog-term = { workspace = true }
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index fc31e837277..14702d3536c 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -344,6 +344,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         self.range_sync.state()
     }
 
+    #[cfg(test)]
+    pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
+        self.range_sync.state()
+    }
+
+    #[cfg(test)]
+    pub(crate) fn __range_failed_chains(&mut self) -> Vec<Hash256> {
+        self.range_sync.__failed_chains()
+    }
+
     #[cfg(test)]
     pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
         self.block_lookups.get_failed_chains()
@@ -368,11 +378,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         self.sampling.get_request_status(block_root, index)
     }
 
-    #[cfg(test)]
-    pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
-        self.range_sync.state()
-    }
-
     #[cfg(test)]
     pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
         self.handle_new_execution_engine_state(state);
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index 15bdf85e203..40285309469 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -477,7 +477,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
             .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
         {
             Some((&id, chain)) => {
-                debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
+                debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, "id" => id);
                 debug_assert_eq!(chain.target_head_root, target_head_root);
                 debug_assert_eq!(chain.target_head_slot, target_head_slot);
                 if let Err(remove_reason) = chain.add_peer(network, peer) {
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index 78679403bb4..38b032136cb 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -94,6 +94,11 @@ where
         }
     }
 
+    #[cfg(test)]
+    pub(crate) fn __failed_chains(&mut self) -> Vec<Hash256> {
+        self.failed_chains.keys().copied().collect()
+    }
+
     pub fn state(&self) -> SyncChainStatus {
         self.chains.state()
     }
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 9ab581950c9..ea20141df63 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -27,6 +27,7 @@ use beacon_chain::{
     PayloadVerificationOutcome, PayloadVerificationStatus,
 };
 use beacon_processor::WorkEvent;
+use lighthouse_network::discovery::CombinedKey;
 use lighthouse_network::{
     rpc::{RPCError, RequestType, RpcErrorResponse},
     service::api_types::{
@@ -39,18 +40,16 @@ use lighthouse_network::{
 use slog::info;
 use slot_clock::{SlotClock, TestingSlotClock};
 use tokio::sync::mpsc;
-use types::ForkContext;
 use types::{
     data_column_sidecar::ColumnIndex,
     test_utils::{SeedableRng, TestRandom, XorShiftRng},
-    BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkName, Hash256,
-    MinimalEthSpec as E, SignedBeaconBlock, Slot,
+    BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
+    Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
 };
 
 const D: Duration = Duration::new(0, 0);
 const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
 const SAMPLING_REQUIRED_SUCCESSES: usize = 2;
-
 type DCByRootIds = Vec<DCByRootId>;
 type DCByRootId = (SyncRequestId, Vec<ColumnIndex>);
@@ -117,7 +116,9 @@ impl TestRig {
         let spec = chain.spec.clone();
 
-        let rng = XorShiftRng::from_seed([42; 16]);
+        // deterministic seed
+        let rng = ChaCha20Rng::from_seed([0u8; 32]);
+
         TestRig {
             beacon_processor_rx,
             beacon_processor_rx_queue: vec![],
@@ -154,7 +155,7 @@ impl TestRig {
         }
     }
 
-    fn test_setup_after_fulu() -> Option<Self> {
+    pub fn test_setup_after_fulu() -> Option<Self> {
         let r = Self::test_setup();
         if r.fork_name.fulu_enabled() {
             Some(r)
@@ -369,20 +370,26 @@ impl TestRig {
     }
 
     pub fn new_connected_peer(&mut self) -> PeerId {
+        let key = self.deterministic_key();
         self.network_globals
             .peers
             .write()
-            .__add_connected_peer_testing_only(false, &self.harness.spec)
+            .__add_connected_peer_testing_only(false, &self.harness.spec, key)
     }
 
     pub fn new_connected_supernode_peer(&mut self) -> PeerId {
+        let key = self.deterministic_key();
         self.network_globals
             .peers
             .write()
-            .__add_connected_peer_testing_only(true, &self.harness.spec)
+            .__add_connected_peer_testing_only(true, &self.harness.spec, key)
+    }
+
+    fn deterministic_key(&mut self) -> CombinedKey {
+        k256::ecdsa::SigningKey::random(&mut self.rng).into()
     }
 
-    fn new_connected_peers_for_peerdas(&mut self) {
+    pub fn new_connected_peers_for_peerdas(&mut self) {
         // Enough sampling peers with few columns
         for _ in 0..100 {
             self.new_connected_peer();
@@ -1113,7 +1120,7 @@ impl TestRig {
     }
 
     #[track_caller]
-    fn expect_empty_network(&mut self) {
+    pub fn expect_empty_network(&mut self) {
         self.drain_network_rx();
         if !self.network_rx_queue.is_empty() {
             let n = self.network_rx_queue.len();
diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs
index 6ed5c7f8fab..ef2bec80b80 100644
--- a/beacon_node/network/src/sync/tests/mod.rs
+++ b/beacon_node/network/src/sync/tests/mod.rs
@@ -7,12 +7,13 @@ use beacon_chain::eth1_chain::CachingEth1Backend;
 use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
 use beacon_processor::WorkEvent;
 use lighthouse_network::NetworkGlobals;
+use rand_chacha::ChaCha20Rng;
 use slog::Logger;
 use slot_clock::ManualSlotClock;
 use std::sync::Arc;
 use store::MemoryStore;
 use tokio::sync::mpsc;
-use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E};
+use types::{ChainSpec, ForkName, MinimalEthSpec as E};
 
 mod lookups;
 mod range;
@@ -61,7 +62,7 @@ struct TestRig {
     /// Beacon chain harness
     harness: BeaconChainHarness<EphemeralHarnessType<E>>,
     /// `rng` for generating test blocks and blobs.
-    rng: XorShiftRng,
+    rng: ChaCha20Rng,
     fork_name: ForkName,
     log: Logger,
     spec: Arc<ChainSpec>,
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index f78b44308d1..ddd4626cce6 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -1,11 +1,14 @@
 use super::*;
+use crate::network_beacon_processor::ChainSegmentProcessId;
 use crate::status::ToStatusMessage;
 use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
+use crate::sync::network_context::RangeRequestId;
 use crate::sync::range_sync::RangeSyncType;
 use crate::sync::SyncMessage;
 use beacon_chain::data_column_verification::CustodyDataColumn;
 use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
 use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer};
+use beacon_processor::WorkType;
 use lighthouse_network::rpc::methods::{
     BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest,
     OldBlocksByRangeRequestV2,
@@ -18,8 +21,8 @@ use lighthouse_network::service::api_types::{
 use lighthouse_network::{PeerId, SyncInfo};
 use std::time::Duration;
 use types::{
-    BlobSidecarList, BlockImportSource, EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
-    SignedBeaconBlockHash, Slot,
+    BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E,
+    SignedBeaconBlock, SignedBeaconBlockHash, Slot,
 };
 
 const D: Duration = Duration::new(0, 0);
@@ -43,7 +46,7 @@ enum ByRangeDataRequestIds {
 /// To make writing tests succinct, the machinery in this testing rig automatically identifies
 /// _which_ request to complete. Picking the right request is critical for tests to pass, so this
 /// filter allows better expressivity on the criteria to identify the right request.
-#[derive(Default)]
+#[derive(Default, Debug, Clone)]
 struct RequestFilter {
     peer: Option<PeerId>,
     epoch: Option<u64>,
 }
@@ -74,7 +77,7 @@ impl TestRig {
     /// Produce a head peer with an advanced head
     fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
         let local_info = self.local_info();
-        self.add_peer(SyncInfo {
+        self.add_random_peer(SyncInfo {
             head_root,
             head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
             ..local_info
@@ -90,7 +93,7 @@ impl TestRig {
     fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
         let local_info = self.local_info();
         let finalized_epoch = local_info.finalized_epoch + 2;
-        self.add_peer(SyncInfo {
+        self.add_random_peer(SyncInfo {
             finalized_epoch,
             finalized_root,
             head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
@@ -98,6 +101,17 @@ impl TestRig {
             ..local_info
         })
     }
+
+    fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo {
+        let local_info = self.local_info();
+        let finalized_epoch = local_info.finalized_epoch + advanced_epochs;
+        SyncInfo {
+            finalized_epoch,
+            finalized_root: Hash256::random(),
+            head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
+            head_root: Hash256::random(),
+        }
+    }
 
     fn local_info(&self) -> SyncInfo {
         let StatusMessage {
             fork_digest: _,
@@ -114,28 +128,59 @@ impl TestRig {
         }
     }
 
-    fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId {
+    fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId {
+        let peer_id = self.new_connected_peer();
+        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
+        peer_id
+    }
+
+    fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId {
         // Create valid peer known to network globals
         // TODO(fulu): Using supernode peers to ensure we have peer across all column
         // subnets for syncing. Should add tests connecting to full node peers.
         let peer_id = self.new_connected_supernode_peer();
         // Send peer to sync
-        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone()));
+        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
         peer_id
     }
 
+    fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) {
+        for _ in 0..count {
+            let peer = self.new_connected_peer();
+            self.add_peer(peer, remote_info.clone());
+        }
+    }
+
+    fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) {
+        self.send_sync_message(SyncMessage::AddPeer(peer, remote_info));
+    }
+
     fn assert_state(&self, state: RangeSyncType) {
         assert_eq!(
             self.sync_manager
                 .range_sync_state()
                 .expect("State is ok")
-                .expect("Range should be syncing")
+                .expect("Range should be syncing, there are no chains")
                 .0,
             state,
             "not expected range sync state"
         );
     }
 
+    fn assert_no_chains_exist(&self) {
+        if let Some(chain) = self.sync_manager.get_range_sync_chains().unwrap() {
+            panic!("There still exists a chain {chain:?}");
+        }
+    }
+
+    fn assert_no_failed_chains(&mut self) {
+        assert_eq!(
+            self.sync_manager.__range_failed_chains(),
+            Vec::<Hash256>::new(),
+            "Expected no failed chains"
+        )
+    }
+
     #[track_caller]
     fn expect_chain_segments(&mut self, count: usize) {
         for i in 0..count {
@@ -170,7 +215,7 @@ impl TestRig {
             true
         };
 
-        let block_req_id = self
+        let block_req = self
             .pop_received_network_event(|ev| match ev {
                 NetworkMessage::SendRequest {
                     peer_id,
@@ -182,7 +227,9 @@ impl TestRig {
                 } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
                 _ => None,
             })
-            .expect("Should have a blocks by range request");
+            .unwrap_or_else(|e| {
+                panic!("Should have a BlocksByRange request, filter {request_filter:?}: {e:?}")
+            });
 
         let by_range_data_requests = if self.after_fulu() {
             let mut data_columns_requests = vec![];
@@ -200,7 +247,7 @@ impl TestRig {
                 data_columns_requests.push(data_columns_request);
             }
             if data_columns_requests.is_empty() {
-                panic!("Found zero DataColumnsByRange requests");
+                panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}");
             }
             ByRangeDataRequestIds::PostPeerDAS(data_columns_requests)
         } else if self.after_deneb() {
@@ -213,16 +260,21 @@ impl TestRig {
                 } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
                 _ => None,
             })
-            .expect("Should have a blobs by range request");
+            .unwrap_or_else(|e| {
+                panic!("Should have a blobs by range request, filter {request_filter:?}: {e:?}")
+            });
             ByRangeDataRequestIds::PrePeerDAS(id, peer)
         } else {
             ByRangeDataRequestIds::PreDeneb
         };
 
-        (block_req_id, by_range_data_requests)
+        (block_req, by_range_data_requests)
     }
 
-    fn find_and_complete_blocks_by_range_request(&mut self, request_filter: RequestFilter) {
+    fn find_and_complete_blocks_by_range_request(
+        &mut self,
+        request_filter: RequestFilter,
+    ) -> RangeRequestId {
         let ((blocks_req_id, block_peer), by_range_data_request_ids) =
             self.find_blocks_by_range_request(request_filter);
@@ -266,6 +318,60 @@ impl TestRig {
                 }
             }
         }
+
+        blocks_req_id.parent_request_id.requester
+    }
+
+    fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) {
+        self.pop_received_processor_event(|ev| {
+            (ev.work_type() == WorkType::ChainSegment).then_some(())
+        })
+        .unwrap_or_else(|e| panic!("Expected chain segment work event: {e}"));
+
+        self.log(&format!(
+            "Completing ChainSegment processing work {id:?} with success"
+        ));
+        self.send_sync_message(SyncMessage::BatchProcessed {
+            sync_type: id,
+            result: crate::sync::BatchProcessResult::Success {
+                sent_blocks: 8,
+                imported_blocks: 8,
+            },
+        });
+    }
+
+    fn complete_and_process_range_sync_until(
+        &mut self,
+        last_epoch: u64,
+        request_filter: RequestFilter,
+    ) {
+        for epoch in 0..last_epoch {
+            // Note: In this test we can't predict the block peer
+            let id =
+                self.find_and_complete_blocks_by_range_request(request_filter.clone().epoch(epoch));
+            if let RangeRequestId::RangeSync { batch_id, .. } = id {
+                assert_eq!(batch_id.as_u64(), epoch, "Unexpected batch_id");
+            } else {
+                panic!("unexpected RangeRequestId {id:?}");
+            }
+
+            let id = match id {
+                RangeRequestId::RangeSync { chain_id, batch_id } => {
+                    ChainSegmentProcessId::RangeBatchId(chain_id, batch_id)
+                }
+                RangeRequestId::BackfillSync { batch_id } => {
+                    ChainSegmentProcessId::BackSyncBatchId(batch_id)
+                }
+            };
+
+            self.find_and_complete_processing_chain_segment(id);
+            if epoch < last_epoch - 1 {
+                self.assert_state(RangeSyncType::Finalized);
+            } else {
+                self.assert_no_chains_exist();
+                self.assert_no_failed_chains();
+            }
+        }
+    }
 
     async fn create_canonical_block(&mut self) -> (SignedBeaconBlock<E>, Option<BlobSidecarList<E>>) {
@@ -442,3 +548,65 @@ fn pause_and_resume_on_ee_offline() {
     // The head chain and finalized chain (2) should be in the processing queue
     rig.expect_chain_segments(2);
 }
+
+/// To attempt to finalize the peer's status finalized checkpoint, we sync to its finalized epoch +
+/// 2 epochs + 1 slot.
+const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
+
+#[test]
+fn finalized_sync_enough_global_custody_peers_few_chain_peers() {
+    // Run for all forks
+    let mut r = TestRig::test_setup();
+    // This test creates enough global custody peers to satisfy column queries, but only adds a few
+    // peers to the chain
+    r.new_connected_peers_for_peerdas();
+
+    let advanced_epochs: u64 = 2;
+    let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
+
+    // Current prioritization only sends batches to idle peers, so we need enough peers for each batch
+    // TODO: Test this with a single peer in the chain, it should still work
+    r.add_random_peers(
+        remote_info,
+        (advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize,
+    );
+    r.assert_state(RangeSyncType::Finalized);
+
+    let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
+    r.complete_and_process_range_sync_until(last_epoch, filter());
+}
+
+#[test]
+fn finalized_sync_not_enough_custody_peers_on_start() {
+    let mut r = TestRig::test_setup();
+    // Only run post-PeerDAS
+    if !r.fork_name.fulu_enabled() {
+        return;
+    }
+
+    let advanced_epochs: u64 = 2;
+    let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
+
+    // Unlikely that the single peer we added has enough columns for us. Tests are deterministic and
+    // this error should never be hit
+    r.add_random_peer_not_supernode(remote_info.clone());
+    r.assert_state(RangeSyncType::Finalized);
+
+    // Because we don't have enough peers on all columns we haven't sent any request.
+    // NOTE: There's a small chance that this single peer happens to custody exactly the set we
+    // expect; in that case the test will fail. Find a way to make the test deterministic.
+    r.expect_empty_network();
+
+    // Generate enough peers and supernodes to cover all custody columns
+    r.new_connected_peers_for_peerdas();
+    // Note: not necessary to add these peers to the chain, as we draw from the global pool.
+    // We still need to add enough peers to trigger batch downloads with idle peers. Same issue as
+    // the test above.
+    r.add_random_peers(
+        remote_info,
+        (advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize,
+    );
+
+    let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
+    r.complete_and_process_range_sync_until(last_epoch, filter());
+}
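
Worked example of the batch arithmetic above (editorial note, not part of the patch): with advanced_epochs = 2, last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS = 2 + (2 + 1) = 5, so complete_and_process_range_sync_until drives five one-epoch batches (epochs 0 through 4). The extra 2 epochs + 1 slot beyond the remote's finalized epoch are what allow the synced checkpoint to become finalized locally.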
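
The determinism mechanism the diff relies on, as a self-contained sketch (assumes rand 0.8, rand_chacha 0.3 and k256 0.13 as pinned in the diff; illustrative code, not part of the patch): a ChaCha20Rng built from a fixed seed always produces the same byte stream, so every secp256k1 key drawn from it, and hence every derived ENR and PeerId, is identical across runs, unlike the removed CombinedKey::generate_secp256k1(), which pulled fresh OS entropy on each call.

// Sketch only: a fixed seed fixes the generated keys across runs.
use k256::ecdsa::SigningKey;
use rand::SeedableRng; // re-exported from rand_core by rand 0.8
use rand_chacha::ChaCha20Rng;

fn main() {
    // Two RNGs built from the same seed produce identical streams...
    let mut rng_a = ChaCha20Rng::from_seed([0u8; 32]);
    let mut rng_b = ChaCha20Rng::from_seed([0u8; 32]);

    // ...so keys drawn from them in the same order are byte-identical.
    let key_a = SigningKey::random(&mut rng_a);
    let key_b = SigningKey::random(&mut rng_b);
    assert_eq!(key_a.to_bytes(), key_b.to_bytes());
}

This is the same property the test rig exploits: TestRig::deterministic_key() draws from the rig's seeded ChaCha20Rng, so peer identities are reproducible test to test and failures can be replayed exactly.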