diff --git a/node/bft/examples/simple_node.rs b/node/bft/examples/simple_node.rs index 4641d292dd..36e2b96307 100644 --- a/node/bft/examples/simple_node.rs +++ b/node/bft/examples/simple_node.rs @@ -26,7 +26,6 @@ use snarkos_node_bft::{ }; use snarkos_node_bft_ledger_service::TranslucentLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; -use snarkos_node_sync::BlockSync; use snarkvm::{ console::{account::PrivateKey, algorithms::BHP256, types::Address}, ledger::{ @@ -143,9 +142,7 @@ pub async fn start_bft( // Initialize the consensus receiver handler. consensus_handler(consensus_receiver); // Initialize the BFT instance. - let block_sync = Arc::new(BlockSync::new(ledger.clone())); - let mut bft = - BFT::::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?; + let mut bft = BFT::::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?; // Run the BFT instance. bft.run(Some(consensus_sender), sender.clone(), receiver).await?; // Retrieve the BFT's primary. @@ -183,9 +180,7 @@ pub async fn start_primary( // Initialize the trusted validators. let trusted_validators = trusted_validators(node_id, num_nodes, peers); // Initialize the primary instance. - let block_sync = Arc::new(BlockSync::new(ledger.clone())); - let mut primary = - Primary::::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?; + let mut primary = Primary::::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?; // Run the primary instance. primary.run(None, sender.clone(), receiver).await?; // Handle OS signals. 
diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index fd0c81d2ae..ef87edcc8f 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -30,7 +30,6 @@ use crate::{ }; use snarkos_account::Account; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::BlockSync; use snarkvm::{ console::account::Address, ledger::{ @@ -70,9 +69,9 @@ use tokio::{ #[derive(Clone)] pub struct BFT { - /// The primary for this node. + /// The primary. primary: Primary, - /// The DAG of batches from which we build the blockchain. + /// The DAG. dag: Arc>>, /// The batch certificate of the leader from the current even round, if one was present. leader_certificate: Arc>>>, @@ -92,13 +91,12 @@ impl BFT { account: Account, storage: Storage, ledger: Arc>, - block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, ) -> Result { Ok(Self { - primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?, + primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?, dag: Default::default(), leader_certificate: Default::default(), leader_certificate_timer: Default::default(), @@ -933,7 +931,6 @@ mod tests { use snarkos_account::Account; use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; - use snarkos_node_sync::BlockSync; use snarkvm::{ console::account::{Address, PrivateKey}, ledger::{ @@ -973,18 +970,6 @@ mod tests { (committee, account, ledger, storage) } - // Helper function to set up BFT for testing. - fn initialize_bft( - account: Account, - storage: Storage, - ledger: Arc>, - ) -> anyhow::Result> { - // Create the block synchronization logic. - let block_sync = Arc::new(BlockSync::new(ledger.clone())); - // Initialize the BFT. 
- BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None)) - } - #[test] #[tracing_test::traced_test] fn test_is_leader_quorum_odd() -> Result<()> { @@ -1016,7 +1001,7 @@ mod tests { // Initialize the account. let account = Account::new(rng)?; // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; assert!(bft.is_timer_expired()); // Ensure this call succeeds on an odd round. let result = bft.is_leader_quorum_or_nonleaders_available(1); @@ -1050,8 +1035,8 @@ mod tests { assert_eq!(storage.current_round(), 1); assert_eq!(storage.max_gc_rounds(), 10); - // Set up the BFT logic. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + // Initialize the BFT. + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; assert!(bft.is_timer_expired()); // 0 + 5 < now() // Store is at round 1, and we are checking for round 2. @@ -1072,8 +1057,8 @@ mod tests { assert_eq!(storage.current_round(), 2); assert_eq!(storage.max_gc_rounds(), 10); - // Set up the BFT logic. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + // Initialize the BFT. + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; assert!(bft.is_timer_expired()); // 0 + 5 < now() // Ensure this call fails on an even round. @@ -1112,11 +1097,8 @@ mod tests { let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10); // Initialize the account. let account = Account::new(rng)?; - - // Set up the BFT logic. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; - assert!(bft.is_timer_expired()); // 0 + 5 < now() - + // Initialize the BFT. 
+ let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; // Set the leader certificate. let leader_certificate = sample_batch_certificate_for_round(2, rng); *bft.leader_certificate.write() = Some(leader_certificate); @@ -1128,7 +1110,8 @@ mod tests { assert!(result); // Initialize a new BFT. - let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft_timer = + BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; // If the leader certificate is not set and the timer has not expired, we are not ready for the next round. let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); if !bft_timer.is_timer_expired() { @@ -1159,8 +1142,7 @@ mod tests { assert_eq!(storage.max_gc_rounds(), 10); // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; - assert!(bft.is_timer_expired()); // 0 + 5 < now() + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; // Ensure this call fails on an odd round. let result = bft.update_leader_certificate_to_even_round(1); @@ -1178,7 +1160,7 @@ mod tests { assert_eq!(storage.max_gc_rounds(), 10); // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; // Ensure this call succeeds on an even round. let result = bft.update_leader_certificate_to_even_round(6); @@ -1230,7 +1212,7 @@ mod tests { // Initialize the BFT. let account = Account::new(rng)?; - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?; // Set the leader certificate. 
*bft.leader_certificate.write() = Some(leader_certificate); @@ -1268,7 +1250,7 @@ mod tests { // Initialize the storage. let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3); @@ -1298,7 +1280,7 @@ mod tests { // Initialize the storage. let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2); @@ -1356,7 +1338,7 @@ mod tests { /* Test missing previous certificate. */ // Initialize the BFT. - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; // The expected error message. let error_msg = format!( @@ -1417,8 +1399,8 @@ mod tests { // Initialize the BFT. let account = Account::new(rng)?; - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; - + let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?; + // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round); // Ensure that the `gc_round` has not been updated yet. @@ -1483,7 +1465,7 @@ mod tests { // Initialize the BFT. 
let account = Account::new(rng)?; - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round); @@ -1501,7 +1483,7 @@ mod tests { // Initialize a new instance of storage. let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); // Initialize a new instance of BFT. - let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?; + let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?; // Sync the BFT DAG at bootup. bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await; @@ -1655,7 +1637,7 @@ mod tests { // Initialize the BFT without bootup. let account = Account::new(rng)?; - let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; // Insert a mock DAG in the BFT without bootup. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); @@ -1680,7 +1662,8 @@ mod tests { let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); // Initialize a new instance of BFT with bootup. - let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?; + let bootup_bft = + BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; // Sync the BFT DAG at bootup. bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await; @@ -1858,8 +1841,8 @@ mod tests { } // Initialize the bootup BFT. 
let account = Account::new(rng)?; - let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; - + let bootup_bft = + BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; // Insert a mock DAG in the BFT without bootup. *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); // Sync the BFT DAG at bootup. diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 70d7f58c90..a9c220c2be 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -110,8 +110,6 @@ pub trait Transport: Send + Sync { fn broadcast(&self, event: Event); } -/// The gateway maintains connections to other validators. -/// For connections with clients and provers, the Router logic is used. #[derive(Clone)] pub struct Gateway { /// The account of the node. @@ -516,8 +514,9 @@ impl Gateway { self.update_metrics(); } - /// Inserts the given peer into the connected peers. This is only used in testing. + /// Inserts the given peer into the connected peers. #[cfg(test)] + // For unit tests, we need to make this public so we can inject peers. pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { // Adds a bidirectional map between the listener address and (ambiguous) peer address. self.resolver.insert_peer(peer_ip, peer_addr, address); diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 1370ee33de..aeb210cde4 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -60,7 +60,6 @@ pub fn init_consensus_channels() -> (ConsensusSender, ConsensusRe (sender, receiver) } -/// "Interface" that enables, for example, sending data from storage to the the BFT logic. 
#[derive(Clone, Debug)] pub struct BFTSender { pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender)>, @@ -101,7 +100,6 @@ impl BFTSender { } } -/// Receiving counterpart to `BFTSender` #[derive(Debug)] pub struct BFTReceiver { pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender)>, @@ -110,7 +108,7 @@ pub struct BFTReceiver { pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, } -/// Initializes the BFT channels, and returns the sending and receiving ends. +/// Initializes the BFT channels. pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); diff --git a/node/bft/src/helpers/dag.rs b/node/bft/src/helpers/dag.rs index bfde7d3893..aa5dd90594 100644 --- a/node/bft/src/helpers/dag.rs +++ b/node/bft/src/helpers/dag.rs @@ -22,12 +22,9 @@ use snarkvm::{ use indexmap::IndexSet; use std::collections::{BTreeMap, HashMap}; -/// Maintains an directed acyclic graph (DAG) of batches, from which we build a totally-ordered blockchain. -/// The DAG is updated in rounds, where each validator adds at most one new batch. #[derive(Debug)] pub struct DAG { /// The in-memory collection of certificates that comprise the DAG. - /// For each round, there is a mapping from node address to batch. graph: BTreeMap, BatchCertificate>>, /// The in-memory collection of recently committed certificate IDs (up to GC). recent_committed_ids: BTreeMap>>, diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index 90816c5501..b043d4ba5d 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -61,10 +61,10 @@ pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds /// The maximum number of workers that can be spawned. pub const MAX_WORKERS: u8 = 1; // worker(s) -/// The interval at which each primary broadcasts a ping to every other node. 
+/// The frequency at which each primary broadcasts a ping to every other node. /// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly. pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms -/// The interval at which each worker broadcasts a ping to every other node. +/// The frequency at which each worker broadcasts a ping to every other node. pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms /// A helper macro to spawn a blocking task. diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 2cbd607035..fa08f62949 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -44,7 +44,7 @@ use crate::{ use snarkos_account::Account; use snarkos_node_bft_events::PrimaryPing; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP}; +use snarkos_node_sync::DUMMY_SELF_IP; use snarkvm::{ console::{ prelude::*, @@ -84,13 +84,11 @@ use tokio::{sync::OnceCell, task::JoinHandle}; /// A helper type for an optional proposed batch. pub type ProposedBatch = RwLock>>; -/// The primary logic of a node. -/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2). #[derive(Clone)] pub struct Primary { - /// The sync module enables fetching data from other validators. + /// The sync module. sync: Sync, - /// The gateway allows talking to other nodes in the validator set. + /// The gateway. gateway: Gateway, /// The storage. storage: Storage, @@ -106,7 +104,7 @@ pub struct Primary { latest_proposed_batch_timestamp: Arc>, /// The recently-signed batch proposals. signed_proposals: Arc>>, - /// The handles for all background tasks spawned by this primary. + /// The spawned handles. handles: Arc>>>, /// The lock for propose_batch. 
propose_lock: Arc>, @@ -123,7 +121,6 @@ impl Primary { account: Account, storage: Storage, ledger: Arc>, - block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, @@ -132,7 +129,7 @@ impl Primary { let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?; // Initialize the sync module. - let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync); + let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone()); // Initialize the primary instance. Ok(Self { @@ -1538,10 +1535,10 @@ impl Primary { fn check_proposal_timestamp(&self, previous_round: u64, author: Address, timestamp: i64) -> Result<()> { // Retrieve the timestamp of the previous timestamp to check against. let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) { - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. Some(certificate) => certificate.timestamp(), None => match self.gateway.account().address() == author { - // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. + // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. true => *self.latest_proposed_batch_timestamp.read(), // If we do not see a previous certificate for the author, then proceed optimistically. false => return Ok(()), @@ -1552,7 +1549,7 @@ impl Primary { let elapsed = timestamp .checked_sub(previous_timestamp) .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?; - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. 
+ // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 { true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"), false => Ok(()), @@ -1899,7 +1896,6 @@ mod tests { use super::*; use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; - use snarkos_node_sync::BlockSync; use snarkvm::{ ledger::{ committee::{Committee, MIN_VALIDATOR_STAKE}, @@ -1943,9 +1939,7 @@ mod tests { // Initialize the primary. let account = accounts[account_index].1.clone(); - let block_sync = Arc::new(BlockSync::new(ledger.clone())); - let mut primary = - Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None)).unwrap(); + let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap(); // Construct a worker instance. primary.workers = Arc::from([Worker::new( @@ -2131,7 +2125,7 @@ mod tests { (certificate, transmissions) } - // Create a certificate chain up to, but not including, the specified round in the primary storage. + // Create a certificate chain up to round in primary storage. fn store_certificate_chain( primary: &Primary, accounts: &[(SocketAddr, Account)], @@ -2365,7 +2359,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.try_block_sync().await; + primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; // Try to process the batch proposal from the peer, should succeed. assert!( @@ -2438,7 +2432,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. 
- primary.sync.try_block_sync().await; + primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; // Try to process the batch proposal from the peer, should succeed. primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap(); @@ -2472,7 +2466,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.try_block_sync().await; + primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2517,7 +2511,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.try_block_sync().await; + primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2531,7 +2525,6 @@ mod tests { ); } - /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected. #[tokio::test] async fn test_batch_propose_from_peer_with_past_timestamp() { let round = 2; @@ -2544,23 +2537,13 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - - // Use a timestamp that is too early. 
- // Set it to something that is less than the minimum batch delay - // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp - let last_timestamp = primary - .storage - .get_certificate_for_round_with_author(round - 1, peer_account.1.address()) - .expect("No previous proposal exists") - .timestamp(); - let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1; - + let past_timestamp = now() - 100; // Use a timestamp that is in the past. let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), round, previous_certificates, - invalid_timestamp, + past_timestamp, 1, &mut rng, ); @@ -2573,7 +2556,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.try_block_sync().await; + primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2581,7 +2564,6 @@ mod tests { ); } - /// Check that proposals rejected that have timestamps older than the previous proposal. #[tokio::test] async fn test_batch_propose_from_peer_over_spend_limit() { let mut rng = TestRng::default(); @@ -2605,9 +2587,7 @@ mod tests { let round = 1; let peer_account = &accounts[2]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; - let proposal = create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng); @@ -2620,10 +2600,9 @@ mod tests { // The author must be known to resolver to pass propose checks. primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); - // The primary must be considered synced. 
- primary_v4.sync.try_block_sync().await; - primary_v5.sync.try_block_sync().await; + primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await; + primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await; // Check the spend limit is enforced from V5 onwards. assert!( @@ -2632,7 +2611,6 @@ mod tests { .await .is_ok() ); - assert!( primary_v5 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index c65acd108e..aa689ea662 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -18,13 +18,12 @@ use crate::{ MAX_FETCH_TIMEOUT_IN_MS, PRIMARY_PING_IN_MS, Transport, - events::DataBlocks, helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests}, spawn_blocking, }; use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, locators::BlockLocators}; +use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators}; use snarkos_node_tcp::P2P; use snarkvm::{ console::{network::Network, types::Field}, @@ -46,37 +45,25 @@ use tokio::{ task::JoinHandle, }; -/// Block synchronization logic for validators. -/// -/// Synchronization works differently for nodes that act as validators in AleoBFT; -/// In the common case, validators generate blocks after receiving an anchor block that has been accepted -/// by a supermajority of the committee instead of fetching entire blocks from other nodes. -/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes. -/// -/// This struct also manages fetching certificates from other validators during normal operation, -/// and blocks when falling behind. 
-/// -/// Finally, `Sync` handles synchronization of blocks with the validator's local storage: -/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them. #[derive(Clone)] pub struct Sync { - /// The gateway enables communication with other validators. + /// The gateway. gateway: Gateway, /// The storage. storage: Storage, /// The ledger service. ledger: Arc>, - /// The block synchronization logic. - block_sync: Arc>, + /// The block sync module. + block_sync: BlockSync, /// The pending certificates queue. pending: Arc, BatchCertificate>>, /// The BFT sender. bft_sender: Arc>>, - /// Handles to the spawned background tasks. + /// The spawned handles. handles: Arc>>>, /// The response lock. response_lock: Arc>, - /// The sync lock. Ensures that only one task syncs the ledger at a time. + /// The sync lock. sync_lock: Arc>, /// The latest block responses. /// @@ -87,12 +74,9 @@ pub struct Sync { impl Sync { /// Initializes a new sync instance. - pub fn new( - gateway: Gateway, - storage: Storage, - ledger: Arc>, - block_sync: Arc>, - ) -> Self { + pub fn new(gateway: Gateway, storage: Storage, ledger: Arc>) -> Self { + // Initialize the block sync module. + let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone()); // Return the sync instance. Self { gateway, @@ -121,49 +105,13 @@ impl Sync { self.sync_storage_with_ledger_at_bootup().await } - /// Performs one iteration of the block synchronization. - #[inline] - pub async fn try_block_sync(&self) { - // First see if any peers need removal - let peers_to_ban = self.block_sync.remove_timed_out_block_requests(); - for peer_ip in peers_to_ban { - trace!("Banning peer {peer_ip} for timing out on block requests"); - - let tcp = self.gateway.tcp().clone(); - tcp.banned_peers().update_ip_ban(peer_ip.ip()); - - tokio::spawn(async move { - tcp.disconnect(peer_ip).await; - }); - } - - // Prepare the block requests, if any. 
- // In the process, we update the state of `is_block_synced` for the sync module. - let (block_requests, sync_peers) = self.block_sync.prepare_block_requests(); - trace!("Prepared {} block requests", block_requests.len()); - - // Sends the block requests to the sync peers. - for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { - if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await { - // Stop if we fail to process a batch of requests. - break; - } - - // Sleep to avoid triggering spam detection. - tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; - } - } - /// Starts the sync module. - /// - /// When this function returns sucessfully, the sync module will have spawned background tasks - /// that fetch blocks from other validators. pub async fn run(&self, sync_receiver: SyncReceiver) -> Result<()> { info!("Starting the sync module..."); // Start the block sync loop. let self_ = self.clone(); - self.spawn(async move { + self.handles.lock().push(tokio::spawn(async move { // Sleep briefly to allow an initial primary ping to come in prior to entering the loop. // Ideally, a node does not consider itself synced when it has not received // any block locators from peers. However, in the initial bootup of validators, @@ -172,8 +120,10 @@ impl Sync { loop { // Sleep briefly to avoid triggering spam detection. tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await; - - self_.try_block_sync().await; + // Perform the sync routine. + let communication = &self_.gateway; + // let communication = &node.router; + self_.block_sync.try_block_sync(communication).await; // Sync the storage with the blocks. if let Err(e) = self_.sync_storage_with_blocks().await { @@ -185,7 +135,7 @@ impl Sync { self_.latest_block_responses.lock().await.clear(); } } - }); + })); // Start the pending queue expiration loop. 
let self_ = self.clone(); @@ -203,8 +153,6 @@ impl Sync { } }); - /* Set up callbacks for events from the Gateway */ - // Retrieve the sync receiver. let SyncReceiver { mut rx_block_sync_advance_with_sync_blocks, @@ -223,7 +171,22 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await { - callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok(); + // Process the block response. + if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) { + // Send the error to the callback. + callback.send(Err(e)).ok(); + continue; + } + + // Sync the storage with the blocks. + if let Err(e) = self_.sync_storage_with_blocks().await { + // Send the error to the callback. + callback.send(Err(e)).ok(); + continue; + } + + // Send the result to the callback. + callback.send(Ok(())).ok(); } }); @@ -231,7 +194,7 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await { - self_.remove_peer(peer_ip); + self_.block_sync.remove_peer(&peer_ip); } }); @@ -246,7 +209,10 @@ impl Sync { while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await { let self_clone = self_.clone(); tokio::spawn(async move { - callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok(); + // Update the peer locators. + let result = self_clone.block_sync.update_peer_locators(peer_ip, locators); + // Send the result to the callback. 
+ callback.send(result).ok(); }); } }); @@ -271,7 +237,7 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await { - self_.finish_certificate_request(peer_ip, certificate_response); + self_.finish_certificate_request(peer_ip, certificate_response) } }); @@ -279,38 +245,10 @@ impl Sync { } } -// Callbacks used when receiving messages from the Gateway -impl Sync { - /// We received a block response and can (possibly) advance synchronization. - async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { - // Verify that the response is valid. - self.block_sync.insert_block_responses(peer_ip, blocks)?; - - // Advance block synchronization - self.block_sync.try_advancing_block_synchronization(); - - // Sync the storage with the blocks. - self.sync_storage_with_blocks().await?; - Ok(()) - } - - /// We received new peer locators during a Ping. - fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators) -> Result<()> { - self.block_sync.update_peer_locators(peer_ip, locators) - } - - /// A peer disconnected. - fn remove_peer(&self, peer_ip: SocketAddr) { - self.block_sync.remove_peer(&peer_ip); - } -} - // Methods to manage storage. impl Sync { /// Syncs the storage with the ledger at bootup. - /// - /// This is called exactly once when starting the validator. - async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> { + pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> { // Retrieve the latest block in the ledger. let latest_block = self.ledger.latest_block(); @@ -396,9 +334,7 @@ impl Sync { } /// Syncs the storage with blocks already received from peers. - /// - /// This is called periodically at runtime. - async fn sync_storage_with_blocks(&self) -> Result<()> { + pub async fn sync_storage_with_blocks(&self) -> Result<()> { // Acquire the response lock. 
let _lock = self.response_lock.lock().await; @@ -466,8 +402,6 @@ impl Sync { } /// Syncs the ledger with the given block without updating the BFT. - /// - /// This is only used at startup when fetching blocks from storage. async fn sync_ledger_with_block_without_bft(&self, block: Block) -> Result<()> { // Acquire the sync lock. let _lock = self.sync_lock.lock().await; @@ -501,7 +435,7 @@ impl Sync { /// and its addition to the ledger is deferred until the check passes. /// Several blocks may be stored in `Sync::latest_block_responses` /// before they can be all checked and added to the ledger. - async fn sync_storage_with_block(&self, block: Block) -> Result<()> { + pub async fn sync_storage_with_block(&self, block: Block) -> Result<()> { // Acquire the sync lock. let _lock = self.sync_lock.lock().await; // Acquire the latest block responses lock. @@ -709,10 +643,22 @@ impl Sync { self.block_sync.num_blocks_behind() } + /// Returns `true` if the node is in gateway mode. + pub const fn is_gateway_mode(&self) -> bool { + self.block_sync.mode().is_gateway() + } + /// Returns the current block locators of the node. pub fn get_block_locators(&self) -> Result> { self.block_sync.get_block_locators() } + + /// Returns the block sync module. + #[cfg(test)] + #[doc(hidden)] + pub(super) fn block_sync(&self) -> &BlockSync { + &self.block_sync + } } // Methods to assist with fetching batch certificates from peers. @@ -804,15 +750,12 @@ impl Sync { self.handles.lock().iter().for_each(|handle| handle.abort()); } } - #[cfg(test)] mod tests { use super::*; use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService}; - use snarkos_account::Account; - use snarkos_node_sync::BlockSync; use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -1041,10 +984,8 @@ mod tests { )); // Initialize the gateway. 
let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?; - // Initialize the block synchronization logic. - let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone())); // Initialize the sync module. - let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync); + let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone()); // Try to sync block 1. sync.sync_storage_with_block(block_1).await?; assert_eq!(syncing_ledger.latest_block_height(), 1); diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index 17f362b80f..b1d510883a 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -27,7 +27,6 @@ use snarkos_node_bft::{ helpers::{PrimarySender, Storage, init_primary_channels}, }; use snarkos_node_bft_storage_service::BFTMemoryService; -use snarkos_node_sync::BlockSync; use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -165,14 +164,12 @@ impl TestNetwork { Arc::new(BFTMemoryService::new()), BatchHeader::::MAX_GC_ROUNDS as u64, ); - // Initialize the block synchronization logic. - let block_sync = Arc::new(BlockSync::new(ledger.clone())); + let (primary, bft) = if config.bft { let bft = BFT::::new( account, storage, ledger, - block_sync, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)), &[], StorageMode::new_test(None), @@ -184,7 +181,6 @@ impl TestNetwork { account, storage, ledger, - block_sync, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)), &[], StorageMode::new_test(None), diff --git a/node/consensus/README.md b/node/consensus/README.md index 800d1ea643..0ee51ea5c9 100644 --- a/node/consensus/README.md +++ b/node/consensus/README.md @@ -5,6 +5,3 @@ [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](./LICENSE.md) The `snarkos-node-consensus` crate provides the consensus layer for the snarkOS node. 
- -It builds on top of the `snarkos-node-bft` crate, which provides an abstract implementationv of the AleoBFT. -More concretely, this crate provides the communication logic that allows multiple AleoBFT nodes to interact with each other. diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 16edb0edd8..1413e2392f 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -35,7 +35,6 @@ use snarkos_node_bft::{ }; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_bft_storage_service::BFTPersistentStorage; -use snarkos_node_sync::BlockSync; use snarkvm::{ ledger::{ block::Transaction, @@ -118,7 +117,6 @@ impl Consensus { pub fn new( account: Account, ledger: Arc>, - block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, @@ -128,7 +126,7 @@ impl Consensus { // Initialize the Narwhal storage. let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::::MAX_GC_ROUNDS as u64); // Initialize the BFT. - let bft = BFT::new(account, storage, ledger.clone(), block_sync, ip, trusted_validators, storage_mode)?; + let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, storage_mode)?; // Return the consensus. 
Ok(Self { ledger, diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index db2919af23..d53491cc14 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -16,9 +16,8 @@ mod router; use crate::traits::NodeInterface; - use snarkos_account::Account; -use snarkos_node_bft::{events::DataBlocks, ledger_service::CoreLedgerService}; +use snarkos_node_bft::ledger_service::CoreLedgerService; use snarkos_node_rest::Rest; use snarkos_node_router::{ Heartbeat, @@ -28,7 +27,7 @@ use snarkos_node_router::{ Routing, messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction}, }; -use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync}; +use snarkos_node_sync::{BlockSync, BlockSyncMode}; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -102,7 +101,7 @@ pub struct Client> { router: Router, /// The REST server of the node. rest: Option>, - /// The block synchronization logic. + /// The sync module. sync: Arc>, /// The genesis block. genesis: Block, @@ -177,7 +176,7 @@ impl> Client { .await?; // Initialize the sync module. - let sync = BlockSync::new(ledger_service.clone()); + let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); // Initialize the node. let mut node = Self { @@ -234,64 +233,23 @@ impl> Client { /// Initializes the sync pool. fn initialize_sync(&self) { // Start the sync loop. - let _self = self.clone(); + let node = self.clone(); self.handles.lock().push(tokio::spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if _self.shutdown.load(std::sync::atomic::Ordering::Acquire) { + if node.shutdown.load(std::sync::atomic::Ordering::Acquire) { info!("Shutting down block production"); break; } // Sleep briefly to avoid triggering spam detection. tokio::time::sleep(std::time::Duration::from_secs(5)).await; - // Perform the sync routine. 
- _self.try_block_sync().await; + node.sync.try_block_sync(&node).await; } })); } - /// Client-side version of `snarkvm_node_bft::Sync::try_block_sync()`. - async fn try_block_sync(&self) { - // First see if any peers need removal. - let peers_to_ban = self.sync.remove_timed_out_block_requests(); - for peer_ip in peers_to_ban { - trace!("Banning peer {peer_ip} for timing out on block requests"); - - let tcp = self.router.tcp().clone(); - tcp.banned_peers().update_ip_ban(peer_ip.ip()); - - tokio::spawn(async move { - tcp.disconnect(peer_ip).await; - }); - } - - // Prepare the block requests, if any. - // In the process, we update the state of `is_block_synced` for the sync module. - let (block_requests, sync_peers) = self.sync.prepare_block_requests(); - trace!("Prepared {} block requests", block_requests.len()); - - // If there are no block requests, but there are pending block responses in the sync pool, - // then try to advance the ledger using these pending block responses. - if block_requests.is_empty() && self.sync.has_pending_responses() { - // Try to advance the ledger with the sync pool. - trace!("No block requests to send, but there are still pending block responses."); - self.sync.try_advancing_block_synchronization(); - } else { - // Issues the block requests in batches. - for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { - if !self.sync.send_block_requests(self, &sync_peers, requests).await { - // Stop if we fail to process a batch of requests. - break; - } - - // Sleep to avoid triggering spam detection. - tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; - } - } - } - /// Initializes solution verification. fn initialize_solution_verification(&self) { // Start the solution verification loop. 
diff --git a/node/src/client/router.rs b/node/src/client/router.rs index b651b19ddc..e3550dab9f 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -230,11 +230,9 @@ impl> Inbound for Client { /// Handles a `BlockResponse` message. fn block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> bool { - match self.sync.insert_block_responses(peer_ip, blocks) { - Ok(()) => { - self.sync.try_advancing_block_synchronization(); - true - } + // Tries to advance with blocks from the sync module. + match self.sync.advance_with_sync_blocks(peer_ip, blocks) { + Ok(()) => true, Err(error) => { warn!("{error}"); false @@ -244,12 +242,15 @@ impl> Inbound for Client { /// Processes the block locators and sends back a `Pong` message. fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { - // If block locators were provided, then update the peer in the sync pool. - if let Some(block_locators) = message.block_locators { - // Check the block locators are valid, and update the peer in the sync pool. - if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { - warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); - return false; + // Check if the sync module is in router mode. + if self.sync.mode().is_router() { + // If block locators were provided, then update the peer in the sync pool. + if let Some(block_locators) = message.block_locators { + // Check the block locators are valid, and update the peer in the sync pool. 
+ if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { + warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); + return false; + } } } diff --git a/node/src/prover/mod.rs b/node/src/prover/mod.rs index a08eed216b..e734f1b3fd 100644 --- a/node/src/prover/mod.rs +++ b/node/src/prover/mod.rs @@ -26,7 +26,7 @@ use snarkos_node_router::{ Routing, messages::{Message, NodeType, UnconfirmedSolution}, }; -use snarkos_node_sync::BlockSync; +use snarkos_node_sync::{BlockSync, BlockSyncMode}; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -123,7 +123,7 @@ impl> Prover { .await?; // Initialize the sync module. - let sync = BlockSync::new(ledger_service.clone()); + let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); // Compute the maximum number of puzzle instances. let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6); diff --git a/node/src/prover/router.rs b/node/src/prover/router.rs index a38dbe8013..211425afd8 100644 --- a/node/src/prover/router.rs +++ b/node/src/prover/router.rs @@ -172,12 +172,15 @@ impl> Inbound for Prover { /// Processes the block locators and sends back a `Pong` message. fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { - // If block locators were provided, then update the peer in the sync pool. - if let Some(block_locators) = message.block_locators { - // Check the block locators are valid, and update the peer in the sync pool. - if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { - warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); - return false; + // Check if the sync module is in router mode. + if self.sync.mode().is_router() { + // If block locators were provided, then update the peer in the sync pool. + if let Some(block_locators) = message.block_locators { + // Check the block locators are valid, and update the peer in the sync pool. 
+ if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { + warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); + return false; + } } } diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index 6371d994e7..ba0d36d269 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -28,7 +28,7 @@ use snarkos_node_router::{ Routing, messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction}, }; -use snarkos_node_sync::BlockSync; +use snarkos_node_sync::{BlockSync, BlockSyncMode}; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -66,8 +66,8 @@ pub struct Validator> { router: Router, /// The REST server of the node. rest: Option>, - /// The block synchronization logic (used in the Router impl). - sync: Arc>, + /// The sync module. + sync: BlockSync, /// The spawned handles. handles: Arc>>>, /// The shutdown signal. @@ -111,6 +111,13 @@ impl> Validator { // Initialize the ledger service. let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone())); + // Initialize the consensus. + let mut consensus = + Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone())?; + // Initialize the primary channels. + let (primary_sender, primary_receiver) = init_primary_channels::(); + // Start the consensus. + consensus.run(primary_sender, primary_receiver).await?; // Determine if the validator should rotate external peers. let rotate_external_peers = false; @@ -128,22 +135,9 @@ impl> Validator { ) .await?; - // Initialize the block synchronization logic. - let sync = Arc::new(BlockSync::new(ledger_service.clone())); + // Initialize the sync module. + let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone()); - // Initialize the consensus layer. 
- let mut consensus = Consensus::new( - account.clone(), - ledger_service.clone(), - sync.clone(), - bft_ip, - trusted_validators, - storage_mode.clone(), - )?; - // Initialize the primary channels. - let (primary_sender, primary_receiver) = init_primary_channels::(); - // Start the consensus. - consensus.run(primary_sender, primary_receiver).await?; // Initialize the node. let mut node = Self { ledger: ledger.clone(), diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index ac7b486f5c..c7d3e33e44 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -199,11 +199,9 @@ impl> Inbound for Validator { /// Handles a `BlockResponse` message. fn block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> bool { - match self.sync.insert_block_responses(peer_ip, blocks) { - Ok(()) => { - self.sync.try_advancing_block_synchronization(); - true - } + // Tries to advance with blocks from the sync module. + match self.sync.advance_with_sync_blocks(peer_ip, blocks) { + Ok(()) => true, Err(error) => { warn!("{error}"); false @@ -211,10 +209,19 @@ impl> Inbound for Validator { } } - /// Processes a ping message from a client (or prover) and sends back a `Pong` message. - fn ping(&self, peer_ip: SocketAddr, _message: Ping) -> bool { - // In gateway/validator mode, we do not need to process client block locators. - // Instead, locators are fetched from other validators in `Gateway` using `PrimaryPing` messages. + /// Processes the block locators and sends back a `Pong` message. + fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { + // Check if the sync module is in router mode. + if self.sync.mode().is_router() { + // If block locators were provided, then update the peer in the sync pool. + if let Some(block_locators) = message.block_locators { + // Check the block locators are valid, and update the peer in the sync pool. 
+ if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { + warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); + return false; + } + } + } // Send a `Pong` message to the peer. Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) })); diff --git a/node/sync/communication-service/src/lib.rs b/node/sync/communication-service/src/lib.rs index 03bbcded71..a0d3e745db 100644 --- a/node/sync/communication-service/src/lib.rs +++ b/node/sync/communication-service/src/lib.rs @@ -21,15 +21,12 @@ extern crate async_trait; use std::{io, net::SocketAddr}; use tokio::sync::oneshot; -/// Abstract communcation service. -/// -/// Implemented by `Gateway` and `Client`. #[async_trait] pub trait CommunicationService: Send + Sync { - /// The message type used by this communication service. + /// The message type. type Message: Clone; - /// Generates the service-specific message for a block request. + /// Prepares a block request to be sent. fn prepare_block_request(start: u32, end: u32) -> Self::Message; /// Sends the given message to specified peer. @@ -37,6 +34,5 @@ pub trait CommunicationService: Send + Sync { /// This function returns as soon as the message is queued to be sent, /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] /// which can be used to determine when and whether the message has been delivered. - /// If no peer with the given IP exists, this function returns None. 
async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option>>; } diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index 3323aded59..e52605ef26 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -21,6 +21,7 @@ use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; +use snarkos_node_tcp::Tcp; use snarkvm::prelude::{Network, block::Block}; use anyhow::{Result, bail, ensure}; @@ -30,7 +31,7 @@ use itertools::Itertools; use locktick::parking_lot::{Mutex, RwLock}; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; -use rand::seq::{IteratorRandom, SliceRandom}; +use rand::{CryptoRng, Rng, prelude::IteratorRandom}; use std::{ collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -38,21 +39,13 @@ use std::{ Arc, atomic::{AtomicBool, AtomicU32, Ordering}, }, - time::{Duration, Instant}, + time::Instant, }; -// The redudancy factor decreases the possiblity of a malicious peers sending us an invalid block locator -// by requiring multiple peers to advertise the same (prefix of) block locators. -// However, we do not use this in production yet. #[cfg(not(test))] pub const REDUNDANCY_FACTOR: usize = 1; #[cfg(test)] pub const REDUNDANCY_FACTOR: usize = 3; - -/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection. 
-// TODO (kaimast): Document why 10ms (not 1 or 100) -pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10); - const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3; const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5; @@ -66,18 +59,25 @@ pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks /// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections. pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); -/// A struct that keeps track of synchronizing blocks with other nodes. -/// -/// It generates requests to send to other peers and processes responses to those requests. -/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from. -/// -/// # Notes -/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators). -/// -/// - Validators only sync from other nodes using this struct if they fall behind, e.g., -/// because they experience a network partition. -/// In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved -/// by a supermajority of the committee. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub enum BlockSyncMode { + Router, + Gateway, +} + +impl BlockSyncMode { + /// Returns `true` if the node is in router mode. + pub const fn is_router(&self) -> bool { + matches!(self, Self::Router) + } + + /// Returns `true` if the node is in gateway mode. + pub const fn is_gateway(&self) -> bool { + matches!(self, Self::Gateway) + } +} + +/// A struct that keeps track of the current block sync state. /// /// # State /// - When a request is inserted, the `requests` map and `request_timestamps` map insert an entry for the request height. 
@@ -86,38 +86,44 @@ pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1 /// the `request_timestamps` map remains unchanged. /// - When a response is removed/completed, the `requests` map and `request_timestamps` map also remove the entry for the request height. /// - When a request is timed out, the `requests`, `request_timestamps`, and `responses` map remove the entry for the request height. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct BlockSync { + /// The block sync mode. + mode: BlockSyncMode, /// The ledger. ledger: Arc>, + /// The TCP stack. + tcp: Tcp, /// The map of peer IP to their block locators. /// The block locators are consistent with the ledger and every other peer's block locators. - locators: RwLock>>, + locators: Arc>>>, /// The map of peer-to-peer to their common ancestor. /// This map is used to determine which peers to request blocks from. - common_ancestors: RwLock>, + common_ancestors: Arc>>, /// The map of block height to the expected block hash and peer IPs. /// Each entry is removed when its corresponding entry in the responses map is removed. - requests: RwLock>>, + requests: Arc>>>, /// The map of block height to the received blocks. /// Removing an entry from this map must remove the corresponding entry from the requests map. - responses: RwLock>>, + responses: Arc>>>, /// The map of block height to the timestamp of the last time the block was requested. /// This map is used to determine which requests to remove if they have been pending for too long. - request_timestamps: RwLock>, + request_timestamps: Arc>>, /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance). - is_block_synced: AtomicBool, + is_block_synced: Arc, /// The number of blocks the peer is behind the greatest peer height. - num_blocks_behind: AtomicU32, + num_blocks_behind: Arc, /// The lock to guarantee advance_with_sync_blocks() is called only once at a time. 
- advance_with_sync_blocks_lock: Mutex<()>, + advance_with_sync_blocks_lock: Arc>, } impl BlockSync { /// Initializes a new block sync module. - pub fn new(ledger: Arc>) -> Self { + pub fn new(mode: BlockSyncMode, ledger: Arc>, tcp: Tcp) -> Self { Self { + mode, ledger, + tcp, locators: Default::default(), common_ancestors: Default::default(), requests: Default::default(), @@ -129,6 +135,12 @@ impl BlockSync { } } + /// Returns the block sync mode. + #[inline] + pub const fn mode(&self) -> BlockSyncMode { + self.mode + } + /// Returns `true` if the node is synced up to the latest block (within the given tolerance). #[inline] pub fn is_block_synced(&self) -> bool { @@ -142,14 +154,35 @@ impl BlockSync { } } -// Helper functions needed for testing -#[cfg(test)] +#[allow(dead_code)] impl BlockSync { /// Returns the latest block height of the given peer IP. fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option { self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height()) } + // /// Returns a map of peer height to peer IPs. + // /// e.g. `{{ 127 => \[peer1, peer2\], 128 => \[peer3\], 135 => \[peer4, peer5\] }}` + // fn get_peer_heights(&self) -> BTreeMap> { + // self.locators.read().iter().map(|(peer_ip, locators)| (locators.latest_locator_height(), *peer_ip)).fold( + // Default::default(), + // |mut map, (height, peer_ip)| { + // map.entry(height).or_default().push(peer_ip); + // map + // }, + // ) + // } + + // /// Returns the list of peers with their heights, sorted by height (descending). + // fn get_peers_by_height(&self) -> Vec<(SocketAddr, u32)> { + // self.locators + // .read() + // .iter() + // .map(|(peer_ip, locators)| (*peer_ip, locators.latest_locator_height())) + // .sorted_by(|(_, a), (_, b)| b.cmp(a)) + // .collect() + // } + /// Returns the common ancestor for the given peer pair, if it exists. 
fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option { self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied() @@ -192,69 +225,96 @@ impl BlockSync { BlockLocators::new(recents, checkpoints) } - /// Returns true if there are pending responses to block requests that need to be processed. - pub fn has_pending_responses(&self) -> bool { - !self.responses.read().is_empty() - } - - /// Send a batch of block requests. - pub async fn send_block_requests( - &self, - communication: &C, - sync_peers: &IndexMap>, - requests: &[(u32, PrepareSyncRequest)], - ) -> bool { - let (start_height, max_num_sync_ips) = match requests.first() { - Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips), - None => { - warn!("Block sync failed - no block requests"); - return false; - } - }; + /// Performs one iteration of the block sync. + #[inline] + pub async fn try_block_sync(&self, communication: &C) { + // Prepare the block requests, if any. + // In the process, we update the state of `is_block_synced` for the sync module. + let (block_requests, sync_peers) = self.prepare_block_requests(); + trace!("Prepared {} block requests", block_requests.len()); + + // If there are no block requests, but there are pending block responses in the sync pool, + // then try to advance the ledger using these pending block responses. + // Note: This condition is guarded by `mode.is_router()` because validators sync blocks + // using another code path that updates both `storage` and `ledger` when advancing blocks. + if block_requests.is_empty() && !self.responses.read().is_empty() && self.mode.is_router() { + // Retrieve the latest block height. + let current_height = self.ledger.latest_block_height(); + + // Acquire the lock to ensure try_advancing_with_block_responses is called only once at a time. + // If the lock is already acquired, return early. 
+ let Some(_lock) = self.advance_with_sync_blocks_lock.try_lock() else { + trace!( + "Skipping a call to try_block_sync() as a block advance is already in progress (at block {current_height})" + ); + return; + }; - // Use a randomly sampled subset of the sync IPs. - let sync_ips: IndexSet<_> = - sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect(); + // Try to advance the ledger with the sync pool. + trace!("No block requests to send - try advancing with block responses (at block {current_height})"); + self.try_advancing_with_block_responses(current_height); + // Return early. + return; + } - // Calculate the end height. - let end_height = start_height.saturating_add(requests.len() as u32); + // Process the block requests. + 'outer: for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { + // Retrieve the starting height and the sync IPs. + let (start_height, max_num_sync_ips) = match requests.first() { + Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips), + None => { + warn!("Block sync failed - no block requests"); + break 'outer; + } + }; - // Insert the chunk of block requests. - for (height, (hash, previous_hash, _)) in requests.iter() { - // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk. - if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) { - warn!("Block sync failed - {error}"); - return false; + // Use a randomly sampled subset of the sync IPs. + let sync_ips: IndexSet<_> = sync_peers + .keys() + .copied() + .choose_multiple(&mut rand::thread_rng(), max_num_sync_ips) + .into_iter() + .collect(); + + // Calculate the end height. + let end_height = start_height.saturating_add(requests.len() as u32); + + // Insert the chunk of block requests. 
+ for (height, (hash, previous_hash, _)) in requests.iter() { + // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk. + if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) { + warn!("Block sync failed - {error}"); + // Break out of the loop. + break 'outer; + } } - } - /* Send the block request to the peers */ - - // Construct the message. - let message = C::prepare_block_request(start_height, end_height); - // Send the message to the peers. - for sync_ip in sync_ips { - let sender = communication.send(sync_ip, message.clone()).await; - // If the send fails for any peer, remove the block request from the sync pool. - if sender.is_none() { - warn!("Failed to send block request to peer '{sync_ip}'"); - // Remove the entire block request from the sync pool. - for height in start_height..end_height { - self.remove_block_request(height); + /* Send the block request to the peers */ + + // Construct the message. + let message = C::prepare_block_request(start_height, end_height); + // Send the message to the peers. + for sync_ip in sync_ips { + let sender = communication.send(sync_ip, message.clone()).await; + // If the send fails for any peer, remove the block request from the sync pool. + if sender.is_none() { + warn!("Failed to send block request to peer '{sync_ip}'"); + // Remove the entire block request from the sync pool. + for height in start_height..end_height { + self.remove_block_request(height); + } + // Break out of the loop. + break 'outer; } - // Break out of the loop. - return false; } + // Sleep for 10 milliseconds to avoid triggering spam detection. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } - true } - /// Inserts a new block response from the given peer IP. - /// - /// Returns an error if the block was malformed, or we already received a different block for this height. - /// Note, that this only queues the response. 
After this, you most likely want to call `Self::try_advancing_block_synchronization`. + /// Processes the block response from the given peer IP. #[inline] - pub fn insert_block_responses(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + pub fn process_block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { // Insert the candidate blocks into the sync pool. for block in blocks { if let Err(error) = self.insert_block_response(peer_ip, block) { @@ -278,23 +338,35 @@ impl BlockSync { requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); // If the request is not complete, return early. - if is_request_complete { self.responses.read().get(&next_height).cloned() } else { None } + if !is_request_complete { + return None; + } + + self.responses.read().get(&next_height).cloned() } /// Attempts to advance with blocks from the sync pool. #[inline] - pub fn try_advancing_block_synchronization(&self) { + pub fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + // Process the block response from the given peer IP. + self.process_block_response(peer_ip, blocks)?; + // Acquire the lock to ensure this function is called only once at a time. // If the lock is already acquired, return early. let Some(_lock) = self.advance_with_sync_blocks_lock.try_lock() else { - trace!("Skipping attempt to advance block synchronziation as it is already in progress"); - return; + trace!("Skipping a call to advance_with_sync_blocks() as it is already in progress"); + return Ok(()); }; - // Start with the current height. - let mut current_height = self.ledger.latest_block_height(); - trace!("Try advancing with block responses (at block {current_height})"); + // Retrieve the latest block height. + let current_height = self.ledger.latest_block_height(); + // Try to advance the ledger with the sync pool. 
+ self.try_advancing_with_block_responses(current_height); + Ok(()) + } + /// Handles the block responses from the sync pool. + fn try_advancing_with_block_responses(&self, mut current_height: u32) { while let Some(block) = self.peek_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { @@ -416,14 +488,12 @@ impl BlockSync { } } -// Helper type for prepare_block_requests -type BlockRequestBatch = (Vec<(u32, PrepareSyncRequest)>, IndexMap>); - impl BlockSync { /// Returns a list of block requests and the sync peers, if the node needs to sync. - /// - /// You usually want to call `remove_timed_out_block_requests` before invoking this function. - pub fn prepare_block_requests(&self) -> BlockRequestBatch { + #[allow(clippy::type_complexity)] + fn prepare_block_requests(&self) -> (Vec<(u32, PrepareSyncRequest)>, IndexMap>) { + // Remove timed out block requests. + self.remove_timed_out_block_requests(); // Prepare the block requests. if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner() { // Retrieve the highest block height. @@ -593,9 +663,25 @@ impl BlockSync { self.responses.write().remove(&height); } + /// Removes the block request for the given peer IP, if it exists. + #[allow(dead_code)] + fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { + let mut can_revoke = self.peek_next_block(height).is_none(); + + // Remove the peer IP from the request entry. If the request entry is now empty, + // and the response entry for this height is also empty, then remove the request entry altogether. + if let Some((_, _, sync_ips)) = self.requests.write().get_mut(&height) { + sync_ips.swap_remove(peer_ip); + can_revoke &= sync_ips.is_empty(); + } + + if can_revoke { + self.requests.write().remove(&height); + self.request_timestamps.write().remove(&height); + } + } + /// Removes all block requests for the given peer IP. 
- /// - /// This is used when disconnecting from a peer or when a peer sends invalid block responses. fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) { trace!("Block sync is removing all block requests to peer {peer_ip}..."); // Acquire the write lock on the requests map. @@ -617,10 +703,9 @@ impl BlockSync { }); } - /// Removes block requests that have timed out, i.e, requests we sent that did not receive a response in time. - /// - /// This removes the corresponding block responses and returns the set of peers/addresses that timed out. - pub fn remove_timed_out_block_requests(&self) -> HashSet { + /// Removes block requests that have timed out. This also removes the corresponding block responses, + /// and adds the timed out sync IPs to a map for tracking. Returns the number of timed out block requests. + fn remove_timed_out_block_requests(&self) -> usize { // Acquire the write lock on the requests map. let mut requests = self.requests.write(); // Acquire the write lock on the responses map. @@ -677,7 +762,18 @@ impl BlockSync { !is_timeout && !is_obsolete }); - peers_to_ban + // After the retain loop, handle the banning of peers + for peer_ip in peers_to_ban { + trace!("Banning peer {peer_ip} for timing out on block requests"); + self.tcp.banned_peers().update_ip_ban(peer_ip.ip()); + + let tcp = self.tcp.clone(); + tokio::spawn(async move { + tcp.disconnect(peer_ip).await; + }); + } + + num_timed_out_block_requests } /// Returns the sync peers and their minimum common ancestor, if the node needs to sync. @@ -686,7 +782,6 @@ impl BlockSync { let latest_ledger_height = self.ledger.latest_block_height(); // Pick a set of peers above the latest ledger height, and include their locators. - // This will sort the peers by locator height in descending order. let candidate_locators: IndexMap<_, _> = self .locators .read() @@ -708,46 +803,55 @@ impl BlockSync { // a common ancestor above the block request range. Set the end height to their common ancestor. 
// Determine the threshold number of peers to sync from. - let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR); + let threshold_to_request = core::cmp::min(candidate_locators.len(), REDUNDANCY_FACTOR); + + let mut min_common_ancestor = 0; + let mut sync_peers = IndexMap::new(); // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height // and a cohort of peers who share a common ancestor above this node's latest ledger height. - for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() { - // The height of the common ancestor shared by all selected peers. - let mut min_common_ancestor = peer_locators.latest_locator_height(); + for (i, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() { + // As the previous iteration did not `break`, reset the sync peers. + sync_peers.clear(); - // The peers we will synchronize from. - // As the previous iteration did not succeed, restart with the next candidate peers. - let mut sync_peers = vec![(*peer_ip, peer_locators.clone())]; + // Set the minimum common ancestor. + min_common_ancestor = peer_locators.latest_locator_height(); + // Add the peer to the sync peers. + sync_peers.insert(*peer_ip, peer_locators.clone()); - // Try adding other peers consistent with this one to the sync peer set. - for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) { + for (other_ip, other_locators) in candidate_locators.iter().skip(i + 1) { // Check if these two peers have a common ancestor above the latest ledger height. if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) { - // If so, then check that their block locators are consistent. - if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) { - // If their common ancestor is less than the minimum common ancestor, then update it. 
- min_common_ancestor = min_common_ancestor.min(*common_ancestor);
-
- // Add the other peer to the list of sync peers.
- sync_peers.push((*other_ip, other_locators.clone()));
+ if *common_ancestor > latest_ledger_height {
+ // If so, then check that their block locators are consistent.
+ if peer_locators.is_consistent_with(other_locators) {
+ // If their common ancestor is less than the minimum common ancestor, then update it.
+ if *common_ancestor < min_common_ancestor {
+ min_common_ancestor = *common_ancestor;
+ }
+ // Add the other peer to the list of sync peers.
+ sync_peers.insert(*other_ip, other_locators.clone());
+ }
+ }
 }
 }

- // If we have enough sync peers above the latest ledger height, finish and return them.
+ // If we have enough sync peers above the latest ledger height, then break the loop.
 if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
- // Shuffle the sync peers prior to returning. This ensures the rest of the stack
- // does not rely on the order of the sync peers, and that the sync peers are not biased.
- sync_peers.shuffle(&mut rand::thread_rng());
-
- // Collect into an IndexMap and return.
- return Some((sync_peers.into_iter().collect(), min_common_ancestor));
+ break;
 }
 }

- // If there is not enough peers with a minimum common ancestor above the latest ledger height, return None.
- None
+ // If there are not enough peers with a minimum common ancestor above the latest ledger height, then return early.
+ if min_common_ancestor <= latest_ledger_height || sync_peers.len() < threshold_to_request {
+ return None;
+ }
+
+ // Shuffle the sync peers prior to returning. This ensures the rest of the stack
+ // does not rely on the order of the sync peers, and that the sync peers are not biased.
+ let sync_peers = shuffle_indexmap(sync_peers, &mut rand::thread_rng());
+
+ Some((sync_peers, min_common_ancestor))
 }

 /// Given the sync peers and their minimum common ancestor, return a list of block requests.
@@ -886,6 +990,18 @@ fn construct_request( (hash, previous_hash, num_sync_ips, is_honest) } +/// Shuffles a given `IndexMap` using the given random number generator. +fn shuffle_indexmap(mut map: IndexMap, rng: &mut R) -> IndexMap +where + K: core::hash::Hash + Eq + Clone, + V: Clone, +{ + use rand::seq::SliceRandom; + let mut pairs: Vec<_> = map.drain(..).collect(); // Drain elements to a vector + pairs.shuffle(rng); // Shuffle the vector of tuples + pairs.into_iter().collect() // Collect back into an IndexMap +} + #[cfg(test)] mod tests { use super::*; @@ -894,15 +1010,12 @@ mod tests { NUM_RECENT_BLOCKS, test_helpers::{sample_block_locators, sample_block_locators_with_fork}, }; - use snarkos_node_bft_ledger_service::MockLedgerService; - use snarkvm::{ - ledger::committee::Committee, - prelude::{Field, TestRng}, - }; + use snarkvm::prelude::{Field, TestRng}; use indexmap::{IndexSet, indexset}; - use rand::Rng; + use snarkos_node_tcp::Config; + use snarkvm::ledger::committee::Committee; use std::net::{IpAddr, Ipv4Addr}; type CurrentNetwork = snarkvm::prelude::MainnetV0; @@ -926,24 +1039,34 @@ mod tests { /// Returns the sync pool, with the ledger initialized to the given height. fn sample_sync_at_height(height: u32) -> BlockSync { - BlockSync::::new(Arc::new(sample_ledger_service(height))) + BlockSync::::new(BlockSyncMode::Router, Arc::new(sample_ledger_service(height)), sample_tcp()) } - /// Returns a duplicate (deep copy) of the sync pool with a different ledger height. + /// Returns a duplicate sync pool with a different ledger height. 
fn duplicate_sync_at_new_height(sync: &BlockSync, height: u32) -> BlockSync { BlockSync:: { + mode: sync.mode, ledger: Arc::new(sample_ledger_service(height)), - locators: RwLock::new(sync.locators.read().clone()), - common_ancestors: RwLock::new(sync.common_ancestors.read().clone()), - requests: RwLock::new(sync.requests.read().clone()), - responses: RwLock::new(sync.responses.read().clone()), - request_timestamps: RwLock::new(sync.request_timestamps.read().clone()), - is_block_synced: AtomicBool::new(sync.is_block_synced.load(Ordering::SeqCst)), - num_blocks_behind: AtomicU32::new(sync.num_blocks_behind.load(Ordering::SeqCst)), + tcp: sync.tcp.clone(), + locators: sync.locators.clone(), + common_ancestors: sync.common_ancestors.clone(), + requests: sync.requests.clone(), + responses: sync.responses.clone(), + request_timestamps: sync.request_timestamps.clone(), + is_block_synced: sync.is_block_synced.clone(), + num_blocks_behind: sync.num_blocks_behind.clone(), advance_with_sync_blocks_lock: Default::default(), } } + fn sample_tcp() -> Tcp { + Tcp::new(Config { + listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + max_connections: 200, + ..Default::default() + }) + } + /// Checks that the sync pool (starting at genesis) returns the correct requests. fn check_prepare_block_requests( sync: BlockSync, @@ -997,23 +1120,30 @@ mod tests { } } - /// Tests that height and hash values are set correctly using many different maximum block heights. #[test] - fn test_get_block_height_and_hash() { + fn test_latest_block_height() { for height in 0..100_002u32 { let sync = sample_sync_at_height(height); - - // Check that the latest blokc height is the maximum height. 
assert_eq!(sync.ledger.latest_block_height(), height); + } + } - // Check the hash to height mapping + #[test] + fn test_get_block_height() { + for height in 0..100_002u32 { + let sync = sample_sync_at_height(height); assert_eq!(sync.ledger.get_block_height(&(Field::::from_u32(0)).into()).unwrap(), 0); assert_eq!( sync.ledger.get_block_height(&(Field::::from_u32(height)).into()).unwrap(), height ); + } + } - // Check the height to hash mapping + #[test] + fn test_get_block_hash() { + for height in 0..100_002u32 { + let sync = sample_sync_at_height(height); assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::::from_u32(0)).into()); assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::::from_u32(height)).into()); }