From 7dcbac0144b70b798ef917b04423985902d436f8 Mon Sep 17 00:00:00 2001
From: dapplion <35266934+dapplion@users.noreply.github.com>
Date: Sun, 26 Jan 2025 16:02:55 +0400
Subject: [PATCH 1/3] Add more range sync tests

---
 beacon_node/network/src/sync/manager.rs       |  18 +-
 .../network/src/sync/network_context.rs       |   7 +
 .../network/src/sync/range_sync/chain.rs      |   7 +-
 .../src/sync/range_sync/chain_collection.rs   |   2 +-
 .../network/src/sync/range_sync/range.rs      |   5 +
 beacon_node/network/src/sync/tests/lookups.rs |  18 +-
 beacon_node/network/src/sync/tests/range.rs   | 167 ++++++++++++++++--
 7 files changed, 199 insertions(+), 25 deletions(-)

diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index fd91dc78b13..081bdb2b8ae 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -344,6 +344,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         self.range_sync.state()
     }
 
+    #[cfg(test)]
+    pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
+        self.range_sync.state()
+    }
+
+    #[cfg(test)]
+    pub(crate) fn __range_failed_chains(&mut self) -> Vec<Hash256> {
+        self.range_sync.__failed_chains()
+    }
+
     #[cfg(test)]
     pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
         self.block_lookups.get_failed_chains()
@@ -369,13 +379,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
     }
 
     #[cfg(test)]
-    pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus {
-        self.range_sync.state()
+    pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
+        self.handle_new_execution_engine_state(state);
     }
 
     #[cfg(test)]
-    pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) {
-        self.handle_new_execution_engine_state(state);
+    pub fn resolve_range_block_components_requests(&self, id: Id) -> Option<RangeRequestId> {
+        self.network.resolve_range_block_components_requests(id)
     }
 
     fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 4135f901b1c..6dfba475866 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -265,6 +265,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }
 
+    #[cfg(test)]
+    pub fn resolve_range_block_components_requests(&self, id: Id) -> Option<RangeRequestId> {
+        self.range_block_components_requests
+            .get(&id)
+            .map(|req| req.0)
+    }
+
     pub fn send_sync_message(&mut self, sync_message: SyncMessage<T::EthSpec>) {
         self.network_beacon_processor
             .send_sync_message(sync_message);
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 4eb73f54839..8695fdb56dd 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -412,15 +412,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             BatchState::AwaitingProcessing(..) => {
                 return self.process_batch(network, self.processing_target);
             }
-            BatchState::Downloading(..) => {
+            BatchState::Downloading(..) | BatchState::AwaitingDownload => {
                 // Batch is not ready, nothing to process
+                // A batch may remain in AwaitingDownload if it doesn't have enough peers yet
             }
             BatchState::Poisoned => unreachable!("Poisoned batch"),
-            BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
+            BatchState::Failed | BatchState::Processing(_) => {
                 // these are all inconsistent states:
                 // - Failed -> non recoverable batch. Chain should have been removed
-                // - AwaitingDownload -> A recoverable failed batch should have been
-                //   re-requested.
                 // - Processing -> `self.current_processing_batch` is None
                 return Err(RemoveChain::WrongChainState(format!(
                     "Robust target batch indicates inconsistent chain state: {:?}",
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index 15bdf85e203..40285309469 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -477,7 +477,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
             .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
         {
             Some((&id, chain)) => {
-                debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
+                debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, "id" => id);
                 debug_assert_eq!(chain.target_head_root, target_head_root);
                 debug_assert_eq!(chain.target_head_slot, target_head_slot);
                 if let Err(remove_reason) = chain.add_peer(network, peer) {
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index 78679403bb4..38b032136cb 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -94,6 +94,11 @@ where
         }
     }
 
+    #[cfg(test)]
+    pub(crate) fn __failed_chains(&mut self) -> Vec<Hash256> {
+        self.failed_chains.keys().copied().collect()
+    }
+
     pub fn state(&self) -> SyncChainStatus {
         self.chains.state()
     }
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 9ab581950c9..42ca3321b71 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -39,12 +39,11 @@ use lighthouse_network::{
 use slog::info;
 use slot_clock::{SlotClock, TestingSlotClock};
 use tokio::sync::mpsc;
-use types::ForkContext;
 use types::{
     data_column_sidecar::ColumnIndex,
     test_utils::{SeedableRng, TestRandom, XorShiftRng},
-    BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkName, Hash256,
-    MinimalEthSpec as E, SignedBeaconBlock, Slot,
+    BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
+    Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
 };
 
 const D: Duration = Duration::new(0, 0);
@@ -154,7 +153,7 @@ impl TestRig {
         }
     }
 
-    fn test_setup_after_fulu() -> Option<Self> {
+    pub fn test_setup_after_fulu() -> Option<Self> {
         let r = Self::test_setup();
         if r.fork_name.fulu_enabled() {
             Some(r)
@@ -382,7 +381,7 @@ impl TestRig {
             .__add_connected_peer_testing_only(true, &self.harness.spec)
     }
 
-    fn new_connected_peers_for_peerdas(&mut self) {
+    pub fn new_connected_peers_for_peerdas(&mut self) {
         // Enough sampling peers with few columns
         for _ in 0..100 {
             self.new_connected_peer();
@@ -391,6 +390,15 @@ impl TestRig {
         self.new_connected_supernode_peer();
     }
 
+    pub fn connected_peers(&self) -> Vec<PeerId> {
+        self.network_globals
+            .peers
+            .read()
+            .connected_peers()
+            .map(|(peer, _)| *peer)
+            .collect()
+    }
+
     fn parent_chain_processed_success(
         &mut self,
         chain_hash: Hash256,
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index cfd89f7b44e..465513ef906 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -1,11 +1,14 @@
 use super::*;
+use crate::network_beacon_processor::ChainSegmentProcessId;
 use crate::status::ToStatusMessage;
 use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
+use crate::sync::network_context::RangeRequestId;
+use crate::sync::range_sync::RangeSyncType;
 use crate::sync::SyncMessage;
 use beacon_chain::data_column_verification::CustodyDataColumn;
 use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
 use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer};
+use beacon_processor::WorkType;
 use lighthouse_network::rpc::methods::{
     BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest,
     OldBlocksByRangeRequestV2,
@@ -15,8 +18,8 @@ use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId};
 use lighthouse_network::{PeerId, SyncInfo};
 use std::time::Duration;
 use types::{
-    BlobSidecarList, BlockImportSource, EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock,
-    SignedBeaconBlockHash, Slot,
+    BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E,
+    SignedBeaconBlock, SignedBeaconBlockHash, Slot,
 };
 
 const D: Duration = Duration::new(0, 0);
@@ -40,7 +43,7 @@ enum ByRangeDataRequestIds {
 /// To make writing tests succinct, the machinery in this testing rig automatically identifies
 /// _which_ request to complete. Picking the right request is critical for tests to pass, so this
 /// filter allows better expressivity on the criteria to identify the right request.
-#[derive(Default)]
+#[derive(Default, Debug, Clone)]
 struct RequestFilter {
     peer: Option<PeerId>,
     epoch: Option<u64>,
@@ -95,6 +98,21 @@ impl TestRig {
         })
     }
 
+    fn add_finalized_peer_advanced_by(&mut self, advanced_epochs: Epoch) -> PeerId {
+        self.add_peer(self.finalized_remote_info_advanced_by(advanced_epochs))
+    }
+
+    fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo {
+        let local_info = self.local_info();
+        let finalized_epoch = local_info.finalized_epoch + advanced_epochs;
+        SyncInfo {
+            finalized_epoch,
+            finalized_root: Hash256::random(),
+            head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
+            head_root: Hash256::random(),
+        }
+    }
+
     fn local_info(&self) -> SyncInfo {
         let StatusMessage {
             fork_digest: _,
@@ -117,7 +135,7 @@ impl TestRig {
         // subnets for syncing. Should add tests connecting to full node peers.
         let peer_id = self.new_connected_supernode_peer();
         // Send peer to sync
-        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone()));
+        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
         peer_id
     }
 
@@ -126,13 +144,27 @@ impl TestRig {
             self.sync_manager
                 .range_sync_state()
                 .expect("State is ok")
-                .expect("Range should be syncing")
+                .expect("Range should be syncing, there are no chains")
                 .0,
             state,
             "not expected range sync state"
         );
     }
 
+    fn assert_no_chains_exist(&self) {
+        if let Some(chain) = self.sync_manager.get_range_sync_chains().unwrap() {
+            panic!("There still exists a chain {chain:?}");
+        }
+    }
+
+    fn assert_no_failed_chains(&mut self) {
+        assert_eq!(
+            self.sync_manager.__range_failed_chains(),
+            Vec::<Hash256>::new(),
+            "Expected no failed chains"
+        )
+    }
+
     #[track_caller]
     fn expect_chain_segments(&mut self, count: usize) {
         for i in 0..count {
@@ -167,7 +199,7 @@ impl TestRig {
             true
         };
 
-        let block_req_id = self
+        let block_req = self
             .pop_received_network_event(|ev| match ev {
                 NetworkMessage::SendRequest {
                     peer_id,
@@ -179,7 +211,9 @@ impl TestRig {
                 } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
                 _ => None,
             })
-            .expect("Should have a blocks by range request");
+            .unwrap_or_else(|e| {
+                panic!("Should have a BlocksByRange request, filter {request_filter:?}: {e:?}")
+            });
 
         let by_range_data_requests = if self.after_fulu() {
             let mut data_columns_requests = vec![];
@@ -197,7 +231,7 @@ impl TestRig {
                 data_columns_requests.push(data_columns_request);
             }
             if data_columns_requests.is_empty() {
-                panic!("Found zero DataColumnsByRange requests");
+                panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}");
             }
             ByRangeDataRequestIds::PostPeerDAS(data_columns_requests)
         } else if self.after_deneb() {
@@ -210,19 +244,30 @@ impl TestRig {
                 } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
                 _ => None,
             })
-            .expect("Should have a blobs by range request");
+            .unwrap_or_else(|e| {
+                panic!("Should have a blobs by range request, filter {request_filter:?}: {e:?}")
+            });
             ByRangeDataRequestIds::PrePeerDAS(id, peer)
         } else {
             ByRangeDataRequestIds::PreDeneb
         };
 
-        (block_req_id, by_range_data_requests)
+        (block_req, by_range_data_requests)
     }
 
-    fn find_and_complete_blocks_by_range_request(&mut self, request_filter: RequestFilter) {
+    fn find_and_complete_blocks_by_range_request(
+        &mut self,
+        request_filter: RequestFilter,
+    ) -> RangeRequestId {
         let ((blocks_req_id, block_peer), by_range_data_request_ids) =
             self.find_blocks_by_range_request(request_filter);
 
+        // Retrieve the RangeRequestId before completing the request
+        let range_request_id = self
+            .sync_manager
+            .resolve_range_block_components_requests(blocks_req_id)
+            .expect("no range_block_components_requests for block id");
+
         // Complete the request with a single stream termination
         self.log(&format!(
             "Completing BlocksByRange request {blocks_req_id} with empty stream"
@@ -263,6 +308,60 @@ impl TestRig {
                 }
             }
         }
+
+        range_request_id
+    }
+
+    fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) {
+        self.pop_received_processor_event(|ev| {
+            (ev.work_type() == WorkType::ChainSegment).then_some(())
+        })
+        .unwrap_or_else(|e| panic!("Expected chain segment work event: {e}"));
+
+        self.log(&format!(
+            "Completing ChainSegment processing work {id:?} with success"
+        ));
+        self.send_sync_message(SyncMessage::BatchProcessed {
+            sync_type: id,
+            result: crate::sync::BatchProcessResult::Success {
+                sent_blocks: 8,
+                imported_blocks: 8,
+            },
+        });
+    }
+
+    fn complete_and_process_range_sync_until(
+        &mut self,
+        last_epoch: u64,
+        request_filter: RequestFilter,
+    ) {
+        for epoch in 0..last_epoch {
+            // Note: In this test we can't predict the block peer
+            let id =
+                self.find_and_complete_blocks_by_range_request(request_filter.clone().epoch(epoch));
+            if let RangeRequestId::RangeSync { batch_id, .. } = id {
+                assert_eq!(batch_id.as_u64(), epoch, "Unexpected batch_id");
+            } else {
+                panic!("unexpected RangeRequestId {id:?}");
+            }
+
+            let id = match id {
+                RangeRequestId::RangeSync { chain_id, batch_id } => {
+                    ChainSegmentProcessId::RangeBatchId(chain_id, batch_id)
+                }
+                RangeRequestId::BackfillSync { batch_id } => {
+                    ChainSegmentProcessId::BackSyncBatchId(batch_id)
+                }
+            };
+
+            self.find_and_complete_processing_chain_segment(id);
+            if epoch < last_epoch - 1 {
+                self.assert_state(RangeSyncType::Finalized);
+            } else {
+                self.assert_no_chains_exist();
+                self.assert_no_failed_chains();
+            }
+        }
+    }
 
     async fn create_canonical_block(&mut self) -> (SignedBeaconBlock<E>, Option<BlobSidecarList<E>>) {
@@ -439,3 +538,49 @@ fn pause_and_resume_on_ee_offline() {
     // The head chain and finalized chain (2) should be in the processing queue
     rig.expect_chain_segments(2);
 }
+
+/// To attempt to finalize the peer's status finalized checkpoint we sync to its finalized epoch +
+/// 2 epochs + 1 slot.
+const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
+
+#[test]
+fn finalized_sync_single_peer_happy_case() {
+    // Run for all forks
+    let mut r = TestRig::test_setup();
+    r.new_connected_peers_for_peerdas();
+
+    let advanced_epochs: u64 = 2;
+    let block_peer = r.add_finalized_peer_advanced_by(advanced_epochs.into());
+    r.assert_state(RangeSyncType::Finalized);
+
+    let last_epoch = 2 + EXTRA_SYNCED_EPOCHS;
+    r.complete_and_process_range_sync_until(last_epoch, filter().peer(block_peer));
+}
+
+#[test]
+fn finalized_sync_initially_no_peers() {
+    let Some(mut r) = TestRig::test_setup_after_fulu() else {
+        return;
+    };
+    r.new_connected_peers_for_peerdas();
+
+    let advanced_epochs: u64 = 2;
+    let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
+
+    // Unlikely that the single peer we added has enough columns for us. Find a way to make this test
+    // deterministic.
+    let block_peer = r.new_connected_peer();
+    r.send_sync_message(SyncMessage::AddPeer(block_peer, remote_info.clone()));
+    r.assert_state(RangeSyncType::Finalized);
+    // Here all batches should be queued but stuck in AwaitingDownload state after not being able to
+    // find custody peer on some specific column
+
+    // Now add the rest of connected peers to the chain, which includes a supernode, and so all
+    // columns should be requested.
+    for peer in r.connected_peers() {
+        r.send_sync_message(SyncMessage::AddPeer(peer, remote_info.clone()));
+    }
+
+    let last_epoch = 2 + EXTRA_SYNCED_EPOCHS;
+    r.complete_and_process_range_sync_until(last_epoch, filter());
+}

From 7c39d1769fc82d674a561889c231c51d30f00b98 Mon Sep 17 00:00:00 2001
From: dapplion <35266934+dapplion@users.noreply.github.com>
Date: Fri, 31 Jan 2025 18:51:42 -0300
Subject: [PATCH 2/3] Fix rebase

---
 .../network/src/sync/range_sync/chain.rs      |  7 +-
 beacon_node/network/src/sync/tests/lookups.rs | 12 +--
 beacon_node/network/src/sync/tests/range.rs   | 79 +++++++++++++------
 3 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 8695fdb56dd..4eb73f54839 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -412,14 +412,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
             BatchState::AwaitingProcessing(..) => {
                 return self.process_batch(network, self.processing_target);
             }
-            BatchState::Downloading(..) | BatchState::AwaitingDownload => {
+            BatchState::Downloading(..) => {
                 // Batch is not ready, nothing to process
-                // A batch may remain in AwaitingDownload if it doesn't have enough peers yet
             }
             BatchState::Poisoned => unreachable!("Poisoned batch"),
-            BatchState::Failed | BatchState::Processing(_) => {
+            BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
                 // these are all inconsistent states:
                 // - Failed -> non recoverable batch. Chain should have been removed
+                // - AwaitingDownload -> A recoverable failed batch should have been
+                //   re-requested.
                 // - Processing -> `self.current_processing_batch` is None
                 return Err(RemoveChain::WrongChainState(format!(
                     "Robust target batch indicates inconsistent chain state: {:?}",
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 42ca3321b71..71fd9c7fa8d 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -49,7 +49,6 @@ use types::{
 const D: Duration = Duration::new(0, 0);
 const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
 const SAMPLING_REQUIRED_SUCCESSES: usize = 2;
-
 type DCByRootIds = Vec<DCByRootId>;
 type DCByRootId = (SyncRequestId, Vec<ColumnIndex>);
 
@@ -390,15 +389,6 @@ impl TestRig {
         self.new_connected_supernode_peer();
     }
 
-    pub fn connected_peers(&self) -> Vec<PeerId> {
-        self.network_globals
-            .peers
-            .read()
-            .connected_peers()
-            .map(|(peer, _)| *peer)
-            .collect()
-    }
-
     fn parent_chain_processed_success(
         &mut self,
         chain_hash: Hash256,
@@ -1121,7 +1111,7 @@ impl TestRig {
     }
 
     #[track_caller]
-    fn expect_empty_network(&mut self) {
+    pub fn expect_empty_network(&mut self) {
         self.drain_network_rx();
         if !self.network_rx_queue.is_empty() {
             let n = self.network_rx_queue.len();
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index 465513ef906..e410ed44a88 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -74,7 +74,7 @@ impl TestRig {
     /// Produce a head peer with an advanced head
     fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
         let local_info = self.local_info();
-        self.add_peer(SyncInfo {
+        self.add_random_peer(SyncInfo {
             head_root,
             head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
             ..local_info
@@ -90,7 +90,7 @@ impl TestRig {
     fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
         let local_info = self.local_info();
         let finalized_epoch = local_info.finalized_epoch + 2;
-        self.add_peer(SyncInfo {
+        self.add_random_peer(SyncInfo {
             finalized_epoch,
             finalized_root,
             head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
             ..local_info
         })
     }
 
-    fn add_finalized_peer_advanced_by(&mut self, advanced_epochs: Epoch) -> PeerId {
-        self.add_peer(self.finalized_remote_info_advanced_by(advanced_epochs))
-    }
-
     fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo {
         let local_info = self.local_info();
         let finalized_epoch = local_info.finalized_epoch + advanced_epochs;
@@ -129,7 +125,13 @@ impl TestRig {
         }
     }
 
-    fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId {
+    fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId {
+        let peer_id = self.new_connected_peer();
+        self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
+        peer_id
+    }
+
+    fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId {
         // Create valid peer known to network globals
         // TODO(fulu): Using supernode peers to ensure we have peer across all column
         // subnets for syncing. Should add tests connecting to full node peers.
@@ -139,6 +141,17 @@ impl TestRig {
         peer_id
     }
 
+    fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) {
+        for _ in 0..count {
+            let peer = self.new_connected_peer();
+            self.add_peer(peer, remote_info.clone());
+        }
+    }
+
+    fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) {
+        self.send_sync_message(SyncMessage::AddPeer(peer, remote_info));
+    }
+
     fn assert_state(&self, state: RangeSyncType) {
         assert_eq!(
             self.sync_manager
@@ -544,43 +557,59 @@ fn pause_and_resume_on_ee_offline() {
 const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
 
 #[test]
-fn finalized_sync_single_peer_happy_case() {
+fn finalized_sync_enough_global_custody_peers_few_chain_peers() {
     // Run for all forks
     let mut r = TestRig::test_setup();
+    // This test creates enough global custody peers to satisfy column queries but only adds a few
+    // peers to the chain
    r.new_connected_peers_for_peerdas();
 
     let advanced_epochs: u64 = 2;
-    let block_peer = r.add_finalized_peer_advanced_by(advanced_epochs.into());
+    let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
+
+    // The current prioritization only sends batches to idle peers, so we need enough peers for each batch
+    // TODO: Test this with a single peer in the chain, it should still work
+    r.add_random_peers(
+        remote_info,
+        (advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize,
+    );
     r.assert_state(RangeSyncType::Finalized);
 
-    let last_epoch = 2 + EXTRA_SYNCED_EPOCHS;
-    r.complete_and_process_range_sync_until(last_epoch, filter().peer(block_peer));
+    let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
+    r.complete_and_process_range_sync_until(last_epoch, filter());
 }
 
 #[test]
-fn finalized_sync_initially_no_peers() {
-    let Some(mut r) = TestRig::test_setup_after_fulu() else {
+fn finalized_sync_not_enough_custody_peers_on_start() {
+    let mut r = TestRig::test_setup();
+    // Only run post-PeerDAS
+    if !r.fork_name.fulu_enabled() {
         return;
-    };
-    r.new_connected_peers_for_peerdas();
+    }
 
     let advanced_epochs: u64 = 2;
     let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
 
     // Unlikely that the single peer we added has enough columns for us. Find a way to make this test
     // deterministic.
-    let block_peer = r.new_connected_peer();
-    r.send_sync_message(SyncMessage::AddPeer(block_peer, remote_info.clone()));
- let block_peer = r.new_connected_peer(); - r.send_sync_message(SyncMessage::AddPeer(block_peer, remote_info.clone())); + r.add_random_peer_not_supernode(remote_info.clone()); r.assert_state(RangeSyncType::Finalized); - // Here all batches should be queued but stuck in AwaitingDownload state after not being able to - // find custody peer on some specific column - // Now add the rest of connected peers to the chain, which includes a supernode, and so all - // columns should be requested. - for peer in r.connected_peers() { - r.send_sync_message(SyncMessage::AddPeer(peer, remote_info.clone())); - } + // Because we don't have enough peers on all columns we haven't sent any request. + // NOTE: There's a small chance that this single peer happens to custody exactly the set we + // expect, in that case the test will fail. Find a way to make the test deterministic. + r.expect_empty_network(); - let last_epoch = 2 + EXTRA_SYNCED_EPOCHS; + // Generate enough peers and supernodes to cover all custody columns + r.new_connected_peers_for_peerdas(); + // Note: not necessary to add this peers to the chain, as we draw from the global pool + // We still need to add enough peers to trigger batch downloads with idle peers. Same issue as + // the test above. + r.add_random_peers( + remote_info, + (advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize, + ); + + let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; r.complete_and_process_range_sync_until(last_epoch, filter()); } From 2021abcd515bc4ac71c63c6f40e5c026ce18592e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 7 Feb 2025 19:08:18 -0300 Subject: [PATCH 3/3] Make tests deterministic --- Cargo.lock | 4 +++- .../lighthouse_network/src/peer_manager/peerdb.rs | 2 +- beacon_node/network/Cargo.toml | 2 ++ beacon_node/network/src/sync/tests/lookups.rs | 15 ++++++++++++--- beacon_node/network/src/sync/tests/mod.rs | 5 +++-- beacon_node/network/src/sync/tests/range.rs | 4 ++-- 6 files changed, 23 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20d2548d09b..61b97a14202 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. 
-version = 3
+version = 4
 
 [[package]]
 name = "Inflector"
@@ -5999,6 +5999,7 @@ dependencies = [
  "hex",
  "igd-next 0.16.0",
  "itertools 0.10.5",
+ "k256 0.13.4",
 "kzg",
 "lighthouse_network",
 "logging",
@@ -6008,6 +6009,7 @@ dependencies = [
 "operation_pool",
 "parking_lot 0.12.3",
 "rand",
+ "rand_chacha",
 "serde_json",
 "slog",
 "slog-async",
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index 37cb5df6ea5..8e5d6121e04 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -689,8 +689,8 @@ impl<E: EthSpec> PeerDB<E> {
         &mut self,
         supernode: bool,
         spec: &ChainSpec,
+        enr_key: CombinedKey,
     ) -> PeerId {
-        let enr_key = CombinedKey::generate_secp256k1();
         let mut enr = Enr::builder().build(&enr_key).unwrap();
         let peer_id = enr.peer_id();
diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml
index 09179c4a516..5071e247a32 100644
--- a/beacon_node/network/Cargo.toml
+++ b/beacon_node/network/Cargo.toml
@@ -10,8 +10,10 @@ eth2 = { workspace = true }
 eth2_network_config = { workspace = true }
 genesis = { workspace = true }
 gossipsub = { workspace = true }
+k256 = "0.13.4"
 kzg = { workspace = true }
 matches = "0.1.8"
+rand_chacha = "0.3.1"
 serde_json = { workspace = true }
 slog-async = { workspace = true }
 slog-term = { workspace = true }
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 71fd9c7fa8d..ea20141df63 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -27,6 +27,7 @@ use beacon_chain::{
     PayloadVerificationOutcome, PayloadVerificationStatus,
 };
 use beacon_processor::WorkEvent;
+use lighthouse_network::discovery::CombinedKey;
 use lighthouse_network::{
     rpc::{RPCError, RequestType, RpcErrorResponse},
     service::api_types::{
@@ -115,7 +116,9 @@ impl TestRig {
 
         let spec = chain.spec.clone();
 
-        let rng = XorShiftRng::from_seed([42; 16]);
+        // deterministic seed
+        let rng = ChaCha20Rng::from_seed([0u8; 32]);
+
         TestRig {
             beacon_processor_rx,
             beacon_processor_rx_queue: vec![],
@@ -367,17 +370,23 @@ impl TestRig {
     }
 
     pub fn new_connected_peer(&mut self) -> PeerId {
+        let key = self.deterministic_key();
         self.network_globals
             .peers
             .write()
-            .__add_connected_peer_testing_only(false, &self.harness.spec)
+            .__add_connected_peer_testing_only(false, &self.harness.spec, key)
     }
 
     pub fn new_connected_supernode_peer(&mut self) -> PeerId {
+        let key = self.deterministic_key();
         self.network_globals
             .peers
             .write()
-            .__add_connected_peer_testing_only(true, &self.harness.spec)
+            .__add_connected_peer_testing_only(true, &self.harness.spec, key)
     }
 
+    fn deterministic_key(&mut self) -> CombinedKey {
+        k256::ecdsa::SigningKey::random(&mut self.rng).into()
+    }
+
     pub fn new_connected_peers_for_peerdas(&mut self) {
diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs
index 6ed5c7f8fab..ef2bec80b80 100644
--- a/beacon_node/network/src/sync/tests/mod.rs
+++ b/beacon_node/network/src/sync/tests/mod.rs
@@ -7,12 +7,13 @@ use beacon_chain::eth1_chain::CachingEth1Backend;
 use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
 use beacon_processor::WorkEvent;
 use lighthouse_network::NetworkGlobals;
+use rand_chacha::ChaCha20Rng;
 use slog::Logger;
 use slot_clock::ManualSlotClock;
 use std::sync::Arc;
 use store::MemoryStore;
 use tokio::sync::mpsc;
-use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E};
+use types::{ChainSpec, ForkName, MinimalEthSpec as E};
 
 mod lookups;
 mod range;
@@ -61,7 +62,7 @@ struct TestRig {
     /// Beacon chain harness
     harness: BeaconChainHarness<EphemeralHarnessType<E>>,
     /// `rng` for generating test blocks and blobs.
-    rng: XorShiftRng,
+    rng: ChaCha20Rng,
     fork_name: ForkName,
     log: Logger,
     spec: Arc<ChainSpec>,
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index e410ed44a88..a6b6aace5c5 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -590,8 +590,8 @@ fn finalized_sync_not_enough_custody_peers_on_start() {
     let advanced_epochs: u64 = 2;
     let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
 
-    // Unlikely that the single peer we added has enough columns for us. Find a way to make this test
-    // deterministic.
+    // Unlikely that the single peer we added has enough columns for us. Tests are deterministic and
+    // this error should never be hit.
     r.add_random_peer_not_supernode(remote_info.clone());
     r.assert_state(RangeSyncType::Finalized);
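
Note (not part of the patch series): PATCH 3 makes peer identities reproducible by replacing the ad-hoc key generation in peerdb.rs with keys drawn from a seeded ChaCha20 stream. The following is a minimal standalone sketch of that technique, assuming only the rand_chacha 0.3 and k256 0.13 crates pinned above; the CombinedKey/ENR conversion used by the real code is omitted here.

use k256::ecdsa::SigningKey;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;

fn main() {
    // Fixed seed, as in TestRig::test_setup: every run yields the same key stream.
    let mut rng = ChaCha20Rng::from_seed([0u8; 32]);

    // Successive draws produce distinct but reproducible secp256k1 keys.
    let key_a = SigningKey::random(&mut rng);
    let key_b = SigningKey::random(&mut rng);
    assert_ne!(key_a.to_bytes(), key_b.to_bytes());

    // Re-seeding replays the exact same sequence, which is what keeps the
    // test peer IDs (derived from these keys) stable across runs.
    let mut replay = ChaCha20Rng::from_seed([0u8; 32]);
    assert_eq!(SigningKey::random(&mut replay).to_bytes(), key_a.to_bytes());
}

A constant-seeded XorShiftRng (the rig's previous RNG) would be equally reproducible, but it does not implement CryptoRng, so it cannot satisfy the bound on SigningKey::random; ChaCha20 provides both determinism and the CryptoRng bound, which is presumably why the patch introduces it.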