diff --git a/Cargo.lock b/Cargo.lock index b0ce5e9495..6c8cd18b0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3546,6 +3546,7 @@ dependencies = [ "rayon", "sha2", "snarkos-account", + "snarkos-node-bft", "snarkos-node-bft-events", "snarkos-node-bft-ledger-service", "snarkos-node-bft-storage-service", @@ -3649,6 +3650,7 @@ dependencies = [ "snarkos-node-bft-ledger-service", "snarkos-node-bft-storage-service", "snarkos-node-metrics", + "snarkos-node-sync", "snarkvm", "tokio", "tracing", @@ -3759,9 +3761,7 @@ dependencies = [ "snarkos-node-router", "snarkos-node-sync-communication-service", "snarkos-node-sync-locators", - "snarkos-node-tcp", "snarkvm", - "tokio", "tracing", ] diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index 47f6a29f4b..01046a4099 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -38,6 +38,12 @@ cuda = [ "snarkos-node-bft-ledger-service/cuda", "snarkos-node-sync/cuda" ] +test = [ + # "snarkvm/test" this breaks some of the tests + "snarkvm/test-helpers", + "snarkos-node-bft-ledger-service/test", + "snarkos-node-bft-storage-service/test" +] [dependencies.aleo-std] workspace = true @@ -176,19 +182,10 @@ version = "0.4" [dev-dependencies.rayon] version = "1" -[dev-dependencies.snarkos-node-bft-ledger-service] -path = "./ledger-service" -default-features = false -features = [ "test" ] - -[dev-dependencies.snarkos-node-bft-storage-service] -path = "./storage-service" +[dev-dependencies.snarkos-node-bft] +path = "." features = [ "test" ] -[dev-dependencies.snarkvm] -workspace = true -features = [ "test-helpers" ] - [dev-dependencies.test-strategy] version = "0.3.1" diff --git a/node/bft/examples/simple_node.rs b/node/bft/examples/simple_node.rs index 36e2b96307..4641d292dd 100644 --- a/node/bft/examples/simple_node.rs +++ b/node/bft/examples/simple_node.rs @@ -26,6 +26,7 @@ use snarkos_node_bft::{ }; use snarkos_node_bft_ledger_service::TranslucentLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; +use snarkos_node_sync::BlockSync; use snarkvm::{ console::{account::PrivateKey, algorithms::BHP256, types::Address}, ledger::{ @@ -142,7 +143,9 @@ pub async fn start_bft( // Initialize the consensus receiver handler. consensus_handler(consensus_receiver); // Initialize the BFT instance. - let mut bft = BFT::::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?; + let block_sync = Arc::new(BlockSync::new(ledger.clone())); + let mut bft = + BFT::::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?; // Run the BFT instance. bft.run(Some(consensus_sender), sender.clone(), receiver).await?; // Retrieve the BFT's primary. @@ -180,7 +183,9 @@ pub async fn start_primary( // Initialize the trusted validators. let trusted_validators = trusted_validators(node_id, num_nodes, peers); // Initialize the primary instance. - let mut primary = Primary::::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?; + let block_sync = Arc::new(BlockSync::new(ledger.clone())); + let mut primary = + Primary::::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?; // Run the primary instance. primary.run(None, sender.clone(), receiver).await?; // Handle OS signals. diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 1b09bd92ca..9b24ba176d 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -30,6 +30,7 @@ use crate::{ }; use snarkos_account::Account; use snarkos_node_bft_ledger_service::LedgerService; +use snarkos_node_sync::BlockSync; use snarkvm::{ console::account::Address, ledger::{ @@ -69,9 +70,9 @@ use tokio::{ #[derive(Clone)] pub struct BFT { - /// The primary. + /// The primary for this node. primary: Primary, - /// The DAG. + /// The DAG of batches from which we build the blockchain. dag: Arc>>, /// The batch certificate of the leader from the current even round, if one was present. leader_certificate: Arc>>>, @@ -91,12 +92,13 @@ impl BFT { account: Account, storage: Storage, ledger: Arc>, + block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, ) -> Result { Ok(Self { - primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?, + primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?, dag: Default::default(), leader_certificate: Default::default(), leader_certificate_timer: Default::default(), @@ -256,7 +258,7 @@ impl BFT { } else { match is_ready { true => info!("\n\nRound {current_round} reached quorum without a leader\n"), - false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()), + false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()), } } } @@ -370,6 +372,8 @@ impl BFT { } /// Returns `true` if the timer for the leader certificate has expired. + /// + /// This is always true for a new BFT instance. fn is_timer_expired(&self) -> bool { self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now() } @@ -467,14 +471,17 @@ impl BFT { // Acquire the BFT lock. let _lock = self.lock.lock().await; - // Retrieve the certificate round. + // Retrieve the round of the new certificate to add to the DAG. let certificate_round = certificate.round(); + // Insert the certificate into the DAG. self.dag.write().insert(certificate); - // Construct the commit round. + // Get the previous round number. let commit_round = certificate_round.saturating_sub(1); - // If the commit round is odd, return early. + + // Leaders are elected in even rounds. + // If the previous round is odd, the current round cannot commit any leader certs. if commit_round % 2 != 0 || commit_round < 2 { return Ok(()); } @@ -882,7 +889,7 @@ impl BFT { } }); - // Process the request to sync the BFT. + // Handler for new certificates that were fetched by the sync module. let self_ = self.clone(); self.spawn(async move { while let Some((certificate, callback)) = rx_sync_bft.recv().await { @@ -934,6 +941,7 @@ mod tests { use snarkos_account::Account; use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; + use snarkos_node_sync::BlockSync; use snarkvm::{ console::account::{Address, PrivateKey}, ledger::{ @@ -973,6 +981,18 @@ mod tests { (committee, account, ledger, storage) } + // Helper function to set up BFT for testing. + fn initialize_bft( + account: Account, + storage: Storage, + ledger: Arc>, + ) -> anyhow::Result> { + // Create the block synchronization logic. + let block_sync = Arc::new(BlockSync::new(ledger.clone())); + // Initialize the BFT. + BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None)) + } + #[test] #[tracing_test::traced_test] fn test_is_leader_quorum_odd() -> Result<()> { @@ -1004,7 +1024,7 @@ mod tests { // Initialize the account. let account = Account::new(rng)?; // Initialize the BFT. - let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; assert!(bft.is_timer_expired()); // Ensure this call succeeds on an odd round. let result = bft.is_leader_quorum_or_nonleaders_available(1); @@ -1038,9 +1058,9 @@ mod tests { assert_eq!(storage.current_round(), 1); assert_eq!(storage.max_gc_rounds(), 10); - // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; - assert!(bft.is_timer_expired()); // 0 + 5 < now() + // Set up the BFT logic. + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + assert!(bft.is_timer_expired()); // Store is at round 1, and we are checking for round 2. // Ensure this call fails on an even round. @@ -1060,9 +1080,9 @@ mod tests { assert_eq!(storage.current_round(), 2); assert_eq!(storage.max_gc_rounds(), 10); - // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; - assert!(bft.is_timer_expired()); // 0 + 5 < now() + // Set up the BFT logic. + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + assert!(bft.is_timer_expired()); // Ensure this call fails on an even round. let result = bft.is_leader_quorum_or_nonleaders_available(2); @@ -1100,8 +1120,11 @@ mod tests { let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10); // Initialize the account. let account = Account::new(rng)?; - // Initialize the BFT. - let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; + + // Set up the BFT logic. + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + assert!(bft.is_timer_expired()); + // Set the leader certificate. let leader_certificate = sample_batch_certificate_for_round(2, rng); *bft.leader_certificate.write() = Some(leader_certificate); @@ -1113,8 +1136,7 @@ mod tests { assert!(result); // Initialize a new BFT. - let bft_timer = - BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // If the leader certificate is not set and the timer has not expired, we are not ready for the next round. let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); if !bft_timer.is_timer_expired() { @@ -1145,7 +1167,8 @@ mod tests { assert_eq!(storage.max_gc_rounds(), 10); // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + assert!(bft.is_timer_expired()); // Ensure this call fails on an odd round. let result = bft.update_leader_certificate_to_even_round(1); @@ -1163,7 +1186,7 @@ mod tests { assert_eq!(storage.max_gc_rounds(), 10); // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Ensure this call succeeds on an even round. let result = bft.update_leader_certificate_to_even_round(6); @@ -1215,7 +1238,7 @@ mod tests { // Initialize the BFT. let account = Account::new(rng)?; - let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Set the leader certificate. *bft.leader_certificate.write() = Some(leader_certificate); @@ -1253,7 +1276,7 @@ mod tests { // Initialize the storage. let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); // Initialize the BFT. - let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3); @@ -1283,7 +1306,7 @@ mod tests { // Initialize the storage. let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2); @@ -1341,7 +1364,7 @@ mod tests { /* Test missing previous certificate. */ // Initialize the BFT. - let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // The expected error message. let error_msg = format!( @@ -1402,8 +1425,8 @@ mod tests { // Initialize the BFT. let account = Account::new(rng)?; - let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?; - // Insert a mock DAG in the BFT. + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round); // Ensure that the `gc_round` has not been updated yet. @@ -1468,7 +1491,7 @@ mod tests { // Initialize the BFT. let account = Account::new(rng)?; - let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Insert a mock DAG in the BFT. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round); @@ -1486,7 +1509,7 @@ mod tests { // Initialize a new instance of storage. let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); // Initialize a new instance of BFT. - let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?; + let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?; // Sync the BFT DAG at bootup. bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await; @@ -1640,7 +1663,7 @@ mod tests { // Initialize the BFT without bootup. let account = Account::new(rng)?; - let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; // Insert a mock DAG in the BFT without bootup. *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); @@ -1665,8 +1688,7 @@ mod tests { let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); // Initialize a new instance of BFT with bootup. - let bootup_bft = - BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?; // Sync the BFT DAG at bootup. bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await; @@ -1844,8 +1866,8 @@ mod tests { } // Initialize the bootup BFT. let account = Account::new(rng)?; - let bootup_bft = - BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?; + let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?; + // Insert a mock DAG in the BFT without bootup. *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0); // Sync the BFT DAG at bootup. diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 840b429851..d80299a17f 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -110,6 +110,8 @@ pub trait Transport: Send + Sync { fn broadcast(&self, event: Event); } +/// The gateway maintains connections to other validators. +/// For connections with clients and provers, the Router logic is used. #[derive(Clone)] pub struct Gateway { /// The account of the node. @@ -514,9 +516,8 @@ impl Gateway { self.update_metrics(); } - /// Inserts the given peer into the connected peers. + /// Inserts the given peer into the connected peers. This is only used in testing. #[cfg(test)] - // For unit tests, we need to make this public so we can inject peers. pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { // Adds a bidirectional map between the listener address and (ambiguous) peer address. self.resolver.insert_peer(peer_ip, peer_addr, address); diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index aeb210cde4..1370ee33de 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -60,6 +60,7 @@ pub fn init_consensus_channels() -> (ConsensusSender, ConsensusRe (sender, receiver) } +/// "Interface" that enables, for example, sending data from storage to the the BFT logic. #[derive(Clone, Debug)] pub struct BFTSender { pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender)>, @@ -100,6 +101,7 @@ impl BFTSender { } } +/// Receiving counterpart to `BFTSender` #[derive(Debug)] pub struct BFTReceiver { pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender)>, @@ -108,7 +110,7 @@ pub struct BFTReceiver { pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, } -/// Initializes the BFT channels. +/// Initializes the BFT channels, and returns the sending and receiving ends. pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); diff --git a/node/bft/src/helpers/dag.rs b/node/bft/src/helpers/dag.rs index aa5dd90594..dcdfa11dd9 100644 --- a/node/bft/src/helpers/dag.rs +++ b/node/bft/src/helpers/dag.rs @@ -22,12 +22,24 @@ use snarkvm::{ use indexmap::IndexSet; use std::collections::{BTreeMap, HashMap}; +/// Maintains a directed acyclic graph (DAG) of certificates, from which we build a totally-ordered blockchain. +/// +/// The DAG is updated in rounds, where each validator adds at most one new batch. +/// Certificates older than GC are removed, as they are not longer needed. +/// +/// Invariants: +/// - For each entry in `graph` there is a corresponding certificate ID in `recent_committed_ids`. +/// - `last_committed_round`, if >0, is equal to the largest key in `recent_commit_ids`. #[derive(Debug)] pub struct DAG { /// The in-memory collection of certificates that comprise the DAG. + /// For each round, there is a mapping from node address to batch. graph: BTreeMap, BatchCertificate>>, - /// The in-memory collection of recently committed certificate IDs (up to GC). + + /// The in-memory collection of recently committed certificate IDs (up to GC), + /// where each entry has a round number as its key and a set of certificate IDs as its value. recent_committed_ids: BTreeMap>>, + /// The last round that was committed. last_committed_round: u64, } @@ -95,10 +107,12 @@ impl DAG { // If the certificate was not recently committed, insert it into the DAG. if !self.is_recently_committed(round, certificate.id()) { // Insert the certificate into the DAG. - let _previous = self.graph.entry(round).or_default().insert(author, certificate); + let previous = self.graph.entry(round).or_default().insert(author, certificate); // If a previous certificate existed for the author, log it. - #[cfg(debug_assertions)] - if _previous.is_some() { + if previous.is_none() { + trace!("Added new certificate for round {round} by author {author} to the DAG"); + } else { + #[cfg(debug_assertions)] error!("A certificate for round {round} by author {author} already existed in the DAG"); } } @@ -113,19 +127,29 @@ impl DAG { /* Updates */ // Update the recently committed IDs. - self.recent_committed_ids.entry(certificate_round).or_default().insert(certificate_id); + let is_new = self.recent_committed_ids.entry(certificate_round).or_default().insert(certificate_id); + if !is_new { + //TODO (kaimast): return early here? + trace!("Certificate {certificate_id} was already committed for round {certificate_round}"); + } // Update the last committed round. - self.last_committed_round = self.last_committed_round.max(certificate_round); + if self.last_committed_round < certificate_round { + self.last_committed_round = certificate_round; + } else { + // TODO(kaimast); only remove old certificates for specific author here? + } - /* GC */ + /* Garbage collect old commit ids */ // Remove committed IDs that are below the GC round. self.recent_committed_ids.retain(|round, _| round + max_gc_rounds > self.last_committed_round); // Remove certificates that are below the GC round. self.graph.retain(|round, _| round + max_gc_rounds > self.last_committed_round); + // Remove any certificates for this author that are at or below the certificate round. + // TODO (kaimast): is this extra retain needed? It might be less expensive to keep the certificate and skip this additional iteration. self.graph.retain(|round, map| match *round > certificate_round { true => true, false => { diff --git a/node/bft/src/helpers/storage.rs b/node/bft/src/helpers/storage.rs index bd6e7d2f3f..caf1709364 100644 --- a/node/bft/src/helpers/storage.rs +++ b/node/bft/src/helpers/storage.rs @@ -775,11 +775,15 @@ impl Storage { unconfirmed_transactions: &HashMap>, ) { // Skip if the certificate round is below the GC round. - if certificate.round() <= self.gc_round() { + let gc_round = self.gc_round(); + if certificate.round() <= gc_round { + trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round()); return; } + // If the certificate ID already exists in storage, skip it. if self.contains_certificate(certificate.id()) { + trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round()); return; } // Retrieve the transmissions for the certificate. diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index b043d4ba5d..90816c5501 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -61,10 +61,10 @@ pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds /// The maximum number of workers that can be spawned. pub const MAX_WORKERS: u8 = 1; // worker(s) -/// The frequency at which each primary broadcasts a ping to every other node. +/// The interval at which each primary broadcasts a ping to every other node. /// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly. pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms -/// The frequency at which each worker broadcasts a ping to every other node. +/// The interval at which each worker broadcasts a ping to every other node. pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms /// A helper macro to spawn a blocking task. diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 88e9bd9f30..9e01273bb0 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -44,7 +44,7 @@ use crate::{ use snarkos_account::Account; use snarkos_node_bft_events::PrimaryPing; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::DUMMY_SELF_IP; +use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP}; use snarkvm::{ console::{ prelude::*, @@ -84,11 +84,13 @@ use tokio::{sync::OnceCell, task::JoinHandle}; /// A helper type for an optional proposed batch. pub type ProposedBatch = RwLock>>; +/// The primary logic of a node. +/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2). #[derive(Clone)] pub struct Primary { - /// The sync module. + /// The sync module enables fetching data from other validators. sync: Sync, - /// The gateway. + /// The gateway allows talking to other nodes in the validator set. gateway: Gateway, /// The storage. storage: Storage, @@ -104,7 +106,7 @@ pub struct Primary { latest_proposed_batch_timestamp: Arc>, /// The recently-signed batch proposals. signed_proposals: Arc>>, - /// The spawned handles. + /// The handles for all background tasks spawned by this primary. handles: Arc>>>, /// The lock for propose_batch. propose_lock: Arc>, @@ -121,6 +123,7 @@ impl Primary { account: Account, storage: Storage, ledger: Arc>, + block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, @@ -129,7 +132,7 @@ impl Primary { let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?; // Initialize the sync module. - let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone()); + let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync); // Initialize the primary instance. Ok(Self { @@ -1210,7 +1213,9 @@ impl Primary { self.spawn(async move { while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await { // If the primary is not synced, then do not process the primary ping. - if !self_.sync.is_synced() { + if self_.sync.is_synced() { + trace!("Processing new primary ping from '{peer_ip}'"); + } else { trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed()); continue; } @@ -1548,10 +1553,10 @@ impl Primary { fn check_proposal_timestamp(&self, previous_round: u64, author: Address, timestamp: i64) -> Result<()> { // Retrieve the timestamp of the previous timestamp to check against. let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) { - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. Some(certificate) => certificate.timestamp(), None => match self.gateway.account().address() == author { - // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. true => *self.latest_proposed_batch_timestamp.read(), // If we do not see a previous certificate for the author, then proceed optimistically. false => return Ok(()), @@ -1562,7 +1567,7 @@ impl Primary { let elapsed = timestamp .checked_sub(previous_timestamp) .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?; - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 { true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"), false => Ok(()), @@ -1909,6 +1914,7 @@ mod tests { use super::*; use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; + use snarkos_node_sync::BlockSync; use snarkvm::{ ledger::{ committee::{Committee, MIN_VALIDATOR_STAKE}, @@ -1952,7 +1958,9 @@ mod tests { // Initialize the primary. let account = accounts[account_index].1.clone(); - let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap(); + let block_sync = Arc::new(BlockSync::new(ledger.clone())); + let mut primary = + Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None)).unwrap(); // Construct a worker instance. primary.workers = Arc::from([Worker::new( @@ -2138,7 +2146,7 @@ mod tests { (certificate, transmissions) } - // Create a certificate chain up to round in primary storage. + // Create a certificate chain up to, but not including, the specified round in the primary storage. fn store_certificate_chain( primary: &Primary, accounts: &[(SocketAddr, Account)], @@ -2372,7 +2380,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; + primary.sync.try_block_sync().await; // Try to process the batch proposal from the peer, should succeed. assert!( @@ -2445,7 +2453,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; + primary.sync.try_block_sync().await; // Try to process the batch proposal from the peer, should succeed. primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap(); @@ -2479,7 +2487,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; + primary.sync.try_block_sync().await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2524,7 +2532,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; + primary.sync.try_block_sync().await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2538,6 +2546,7 @@ mod tests { ); } + /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected. #[tokio::test] async fn test_batch_propose_from_peer_with_past_timestamp() { let round = 2; @@ -2550,13 +2559,23 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let past_timestamp = now() - 100; // Use a timestamp that is in the past. + + // Use a timestamp that is too early. + // Set it to something that is less than the minimum batch delay + // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp + let last_timestamp = primary + .storage + .get_certificate_for_round_with_author(round - 1, peer_account.1.address()) + .expect("No previous proposal exists") + .timestamp(); + let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1; + let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), round, previous_certificates, - past_timestamp, + invalid_timestamp, 1, &mut rng, ); @@ -2569,7 +2588,7 @@ mod tests { // The author must be known to resolver to pass propose checks. primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. - primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await; + primary.sync.try_block_sync().await; // Try to process the batch proposal from the peer, should error. assert!( @@ -2577,6 +2596,7 @@ mod tests { ); } + /// Check that proposals rejected that have timestamps older than the previous proposal. #[tokio::test] async fn test_batch_propose_from_peer_over_spend_limit() { let mut rng = TestRng::default(); @@ -2600,7 +2620,9 @@ mod tests { let round = 1; let peer_account = &accounts[2]; let peer_ip = peer_account.0; + let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let proposal = create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng); @@ -2613,9 +2635,10 @@ mod tests { // The author must be known to resolver to pass propose checks. primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + // The primary must be considered synced. - primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await; - primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await; + primary_v4.sync.try_block_sync().await; + primary_v5.sync.try_block_sync().await; // Check the spend limit is enforced from V5 onwards. assert!( @@ -2624,6 +2647,7 @@ mod tests { .await .is_ok() ); + assert!( primary_v5 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index aa689ea662..536457045f 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -18,12 +18,13 @@ use crate::{ MAX_FETCH_TIMEOUT_IN_MS, PRIMARY_PING_IN_MS, Transport, + events::DataBlocks, helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests}, spawn_blocking, }; use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators}; +use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, locators::BlockLocators}; use snarkos_node_tcp::P2P; use snarkvm::{ console::{network::Network, types::Field}, @@ -37,7 +38,13 @@ use locktick::{parking_lot::Mutex, tokio::Mutex as TMutex}; #[cfg(not(feature = "locktick"))] use parking_lot::Mutex; use rayon::prelude::*; -use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, HashMap}, + future::Future, + net::SocketAddr, + sync::Arc, + time::Duration, +}; #[cfg(not(feature = "locktick"))] use tokio::sync::Mutex as TMutex; use tokio::{ @@ -45,38 +52,54 @@ use tokio::{ task::JoinHandle, }; +/// Block synchronization logic for validators. +/// +/// Synchronization works differently for nodes that act as validators in AleoBFT; +/// In the common case, validators generate blocks after receiving an anchor block that has been accepted +/// by a supermajority of the committee instead of fetching entire blocks from other nodes. +/// However, if a validator does not have an up-to-date DAG, it might still fetch entire blocks from other nodes. +/// +/// This struct also manages fetching certificates from other validators during normal operation, +/// and blocks when falling behind. +/// +/// Finally, `Sync` handles synchronization of blocks with the validator's local storage: +/// it loads blocks from the storage on startup and writes new blocks to the storage after discovering them. #[derive(Clone)] pub struct Sync { - /// The gateway. + /// The gateway enables communication with other validators. gateway: Gateway, /// The storage. storage: Storage, /// The ledger service. ledger: Arc>, - /// The block sync module. - block_sync: BlockSync, + /// The block synchronization logic. + block_sync: Arc>, /// The pending certificates queue. pending: Arc, BatchCertificate>>, /// The BFT sender. bft_sender: Arc>>, - /// The spawned handles. + /// Handles to the spawned background tasks. handles: Arc>>>, /// The response lock. response_lock: Arc>, - /// The sync lock. + /// The sync lock. Ensures that only one task syncs the ledger at a time. sync_lock: Arc>, /// The latest block responses. /// /// This is used in [`Sync::sync_storage_with_block()`] to accumulate blocks /// whose addition to the ledger is deferred until certain checks pass. - latest_block_responses: Arc>>>, + /// Blocks need to be processed in order, hence a BTree map. + latest_block_responses: Arc>>>, } impl Sync { /// Initializes a new sync instance. - pub fn new(gateway: Gateway, storage: Storage, ledger: Arc>) -> Self { - // Initialize the block sync module. - let block_sync = BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone()); + pub fn new( + gateway: Gateway, + storage: Storage, + ledger: Arc>, + block_sync: Arc>, + ) -> Self { // Return the sync instance. Self { gateway, @@ -102,16 +125,67 @@ impl Sync { info!("Syncing storage with the ledger..."); // Sync the storage with the ledger. - self.sync_storage_with_ledger_at_bootup().await + self.sync_storage_with_ledger_at_bootup().await?; + + debug!("Finished initial block synchronization at startup"); + Ok(()) + } + + /// Issues new requests for blocks, if needed. + /// + /// Additionally, this function removes obsolete and timed out block requests, + /// and disconnects/bans unresponsive peers. + /// + /// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`. + #[inline] + pub async fn issue_block_requests(&self) { + // First see if any peers need removal + let peers_to_ban = self.block_sync.remove_timed_out_block_requests(); + for peer_ip in peers_to_ban { + trace!("Banning peer {peer_ip} for timing out on block requests"); + + let tcp = self.gateway.tcp().clone(); + tcp.banned_peers().update_ip_ban(peer_ip.ip()); + + tokio::spawn(async move { + tcp.disconnect(peer_ip).await; + }); + } + + // We might be further ahead than the ledger, if there are queued + // responses. + let current_height = { + let responses = self.latest_block_responses.lock().await; + if let Some((height, _)) = responses.last_key_value() { *height } else { self.ledger.latest_block_height() } + }; + + // Prepare the block requests, if any. + // In the process, we update the state of `is_block_synced` for the sync module. + let (block_requests, sync_peers) = self.block_sync.prepare_block_requests_at_height(current_height); + trace!("Prepared {} block requests", block_requests.len()); + + // Sends the block requests to the sync peers. + for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { + if !self.block_sync.send_block_requests(&self.gateway, &sync_peers, requests).await { + // Stop if we fail to process a batch of requests. + break; + } + + // Sleep to avoid triggering spam detection. + tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; + } } /// Starts the sync module. + /// + /// When this function returns sucessfully, the sync module will have spawned background tasks + /// that fetch blocks from other validators. pub async fn run(&self, sync_receiver: SyncReceiver) -> Result<()> { info!("Starting the sync module..."); // Start the block sync loop. let self_ = self.clone(); - self.handles.lock().push(tokio::spawn(async move { + self.spawn(async move { // Sleep briefly to allow an initial primary ping to come in prior to entering the loop. // Ideally, a node does not consider itself synced when it has not received // any block locators from peers. However, in the initial bootup of validators, @@ -120,22 +194,10 @@ impl Sync { loop { // Sleep briefly to avoid triggering spam detection. tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await; - // Perform the sync routine. - let communication = &self_.gateway; - // let communication = &node.router; - self_.block_sync.try_block_sync(communication).await; - - // Sync the storage with the blocks. - if let Err(e) = self_.sync_storage_with_blocks().await { - error!("Unable to sync storage with blocks - {e}"); - } - // If the node is synced, clear the `latest_block_responses`. - if self_.is_synced() { - self_.latest_block_responses.lock().await.clear(); - } + self_.try_block_sync().await; } - })); + }); // Start the pending queue expiration loop. let self_ = self.clone(); @@ -153,6 +215,8 @@ impl Sync { } }); + /* Set up callbacks for events from the Gateway */ + // Retrieve the sync receiver. let SyncReceiver { mut rx_block_sync_advance_with_sync_blocks, @@ -171,22 +235,7 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await { - // Process the block response. - if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) { - // Send the error to the callback. - callback.send(Err(e)).ok(); - continue; - } - - // Sync the storage with the blocks. - if let Err(e) = self_.sync_storage_with_blocks().await { - // Send the error to the callback. - callback.send(Err(e)).ok(); - continue; - } - - // Send the result to the callback. - callback.send(Ok(())).ok(); + callback.send(self_.advance_with_sync_blocks(peer_ip, blocks).await).ok(); } }); @@ -194,7 +243,7 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await { - self_.block_sync.remove_peer(&peer_ip); + self_.remove_peer(peer_ip); } }); @@ -209,10 +258,7 @@ impl Sync { while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await { let self_clone = self_.clone(); tokio::spawn(async move { - // Update the peer locators. - let result = self_clone.block_sync.update_peer_locators(peer_ip, locators); - // Send the result to the callback. - callback.send(result).ok(); + callback.send(self_clone.update_peer_locators(peer_ip, locators)).ok(); }); } }); @@ -237,18 +283,65 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await { - self_.finish_certificate_request(peer_ip, certificate_response) + self_.finish_certificate_request(peer_ip, certificate_response); } }); Ok(()) } + + /// Execute one iteration of block synchronization. + /// + /// This is called periodically by a tokio background task spawned in `Self::run`. + /// Some unit tests also call this function directly to manually trigger block synchronization. + pub(crate) async fn try_block_sync(&self) { + self.issue_block_requests().await; + + // Sync the storage with the blocks. + if let Err(err) = self.try_advancing_block_synchronization().await { + error!("Block synchronization failed - {err}"); + } + + // If the node is synced, clear the `latest_block_responses`. + if self.is_synced() { + self.latest_block_responses.lock().await.clear(); + } + } +} + +// Callbacks used when receiving messages from the Gateway +impl Sync { + /// We received a block response and can (possibly) advance synchronization. + async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + // Verify that the response is valid and add it to block sync. + self.block_sync.insert_block_responses(peer_ip, blocks)?; + + // Try to process responses stored in BlockSync. + // Note: Do not call `self.block_sync.try_advancing_block_synchronziation` here as it will process + // and remove any completed requests, which means the call to `sync_storage_with_blocks` will not process + // them as expected. + self.try_advancing_block_synchronization().await?; + + Ok(()) + } + + /// We received new peer locators during a Ping. + fn update_peer_locators(&self, peer_ip: SocketAddr, locators: BlockLocators) -> Result<()> { + self.block_sync.update_peer_locators(peer_ip, locators) + } + + /// A peer disconnected. + fn remove_peer(&self, peer_ip: SocketAddr) { + self.block_sync.remove_peer(&peer_ip); + } } // Methods to manage storage. impl Sync { /// Syncs the storage with the ledger at bootup. - pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> { + /// + /// This is called when starting the validator and after finishing a sync without BFT. + async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> { // Retrieve the latest block in the ledger. let latest_block = self.ledger.latest_block(); @@ -333,24 +426,49 @@ impl Sync { Ok(()) } - /// Syncs the storage with blocks already received from peers. - pub async fn sync_storage_with_blocks(&self) -> Result<()> { + /// Aims to advance synchronization using any recent block responses + /// received from peers. + /// + /// This is the validator's version of `BlockSync::try_advancing_block_synchronization` and is called periodically at runtime. + /// + /// A key difference to `BlockSync`'s versions is that it will only add blocks to the ledger once they have been confirmed by the network. + /// If blocks are not confirmed yet, they will be kept in [`Self::latest_block_responses`]. + /// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected + /// (see [`Self::sync_storage_with_block`] for more details). + /// + /// If the node falls behind more than GC rounds, this function calls [`Self::sync_storage_without_bft`] instead, + /// which syncs without updating the BFT state. + async fn try_advancing_block_synchronization(&self) -> Result<()> { // Acquire the response lock. let _lock = self.response_lock.lock().await; + // Figure out which height we are synchronized to. + // If there are queued block responses, this might be higher than the latest block in the ledger. + let sync_height = { + let responses = self.latest_block_responses.lock().await; + if let Some((height, _)) = responses.last_key_value() { *height } else { self.ledger.latest_block_height() } + }; + // Retrieve the next block height. // This variable is used to index blocks that are added to the ledger; - // it is incremented as blocks as added. + // it is incremented as blocks are added. // So 'current' means 'currently being added'. - let mut current_height = self.ledger.latest_block_height() + 1; + let mut current_height = sync_height + 1; + trace!("Try advancing with block responses (at block {current_height})"); // Retrieve the maximum block height of the peers. - let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0); + let tip = self + .block_sync + .find_sync_peers_at_height(sync_height) + .map(|(x, _)| x.into_values().max().unwrap_or(0)) + .unwrap_or(0); + // Determine the maximum number of blocks corresponding to rounds // that would not have been garbage collected, i.e. that would be kept in storage. // Since at most one block is created every two rounds, // this is half of the maximum number of rounds kept in storage. let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2); + // Determine the earliest height of blocks corresponding to rounds kept in storage, // conservatively set to the block height minus the maximum number of blocks calculated above. // By virtue of the BFT protocol, we can guarantee that all GC range blocks will be loaded. @@ -358,6 +476,8 @@ impl Sync { // Determine if we can sync the ledger without updating the BFT first. if current_height <= max_gc_height { + info!("Block sync is too far behind other validators. Syncing without BFT."); + // Try to advance the ledger *to tip* without updating the BFT. while let Some(block) = self.block_sync.peek_next_block(current_height) { info!("Syncing the ledger to block {}...", block.height()); @@ -376,7 +496,9 @@ impl Sync { } // Sync the storage with the ledger if we should transition to the BFT sync. if current_height > max_gc_height { + info!("Finished catching up with the network. Switching back to BFT sync."); if let Err(e) = self.sync_storage_with_ledger_at_bootup().await { + //TODO (kaimast): bail! here? error!("BFT sync (with bootup routine) failed - {e}"); } } @@ -402,6 +524,8 @@ impl Sync { } /// Syncs the ledger with the given block without updating the BFT. + /// + /// This is only used at startup when fetching blocks from storage. async fn sync_ledger_with_block_without_bft(&self, block: Block) -> Result<()> { // Acquire the sync lock. let _lock = self.sync_lock.lock().await; @@ -425,7 +549,7 @@ impl Sync { .await? } - /// Syncs the storage with the given block. + /// Advances the ledger by the given block and updates the storage accordingly. /// /// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate /// meets the voter availability threshold (i.e. > f voting stake) @@ -435,14 +559,22 @@ impl Sync { /// and its addition to the ledger is deferred until the check passes. /// Several blocks may be stored in `Sync::latest_block_responses` /// before they can be all checked and added to the ledger. - pub async fn sync_storage_with_block(&self, block: Block) -> Result<()> { + async fn sync_storage_with_block(&self, block: Block) -> Result<()> { // Acquire the sync lock. let _lock = self.sync_lock.lock().await; + + // If this block has already been processed, return early. + // TODO(kaimast): Should we remove the response here? + if self.ledger.contains_block_height(block.height()) { + debug!("Ledger is already synced with block at height {}. Will not sync.", block.height()); + return Ok(()); + } + // Acquire the latest block responses lock. let mut latest_block_responses = self.latest_block_responses.lock().await; - // If this block has already been processed, return early. - if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) { + if latest_block_responses.contains_key(&block.height()) { + debug!("An unconfirmed block is queued already for height {}. Will not sync.", block.height()); return Ok(()); } @@ -468,10 +600,11 @@ impl Sync { // Sync the BFT DAG with the certificates. for certificate in certificates { // If a BFT sender was provided, send the certificate to the BFT. + // For validators, BFT spawns a receiver task in `BFT::start_handlers`. if let Some(bft_sender) = self.bft_sender.get() { // Await the callback to continue. - if let Err(e) = bft_sender.send_sync_bft(certificate).await { - bail!("Sync - {e}"); + if let Err(err) = bft_sender.send_sync_bft(certificate).await { + bail!("Failed to sync certificate - {err}"); }; } } @@ -587,8 +720,6 @@ impl Sync { .await??; // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); - // Mark the block height as processed in block_sync. - self.block_sync.remove_block_response(block_height); // Update the validator telemetry. #[cfg(feature = "telemetry")] @@ -601,6 +732,11 @@ impl Sync { "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..." ); } + + // Mark the block height as processed in block_sync. + // Even if we did not add the block to the ledger yet, the associated request can safely + // be removed as the block is now stored in `latest_block_responses`. + self.block_sync.remove_block_response(next_block_height); } Ok(()) @@ -643,22 +779,10 @@ impl Sync { self.block_sync.num_blocks_behind() } - /// Returns `true` if the node is in gateway mode. - pub const fn is_gateway_mode(&self) -> bool { - self.block_sync.mode().is_gateway() - } - /// Returns the current block locators of the node. pub fn get_block_locators(&self) -> Result> { self.block_sync.get_block_locators() } - - /// Returns the block sync module. - #[cfg(test)] - #[doc(hidden)] - pub(super) fn block_sync(&self) -> &BlockSync { - &self.block_sync - } } // Methods to assist with fetching batch certificates from peers. @@ -750,12 +874,15 @@ impl Sync { self.handles.lock().iter().for_each(|handle| handle.abort()); } } + #[cfg(test)] mod tests { use super::*; use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService}; + use snarkos_account::Account; + use snarkos_node_sync::BlockSync; use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -984,8 +1111,10 @@ mod tests { )); // Initialize the gateway. let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?; + // Initialize the block synchronization logic. + let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone())); // Initialize the sync module. - let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone()); + let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone(), block_sync); // Try to sync block 1. sync.sync_storage_with_block(block_1).await?; assert_eq!(syncing_ledger.latest_block_height(), 1); diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index b1d510883a..17f362b80f 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -27,6 +27,7 @@ use snarkos_node_bft::{ helpers::{PrimarySender, Storage, init_primary_channels}, }; use snarkos_node_bft_storage_service::BFTMemoryService; +use snarkos_node_sync::BlockSync; use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -164,12 +165,14 @@ impl TestNetwork { Arc::new(BFTMemoryService::new()), BatchHeader::::MAX_GC_ROUNDS as u64, ); - + // Initialize the block synchronization logic. + let block_sync = Arc::new(BlockSync::new(ledger.clone())); let (primary, bft) = if config.bft { let bft = BFT::::new( account, storage, ledger, + block_sync, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)), &[], StorageMode::new_test(None), @@ -181,6 +184,7 @@ impl TestNetwork { account, storage, ledger, + block_sync, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + id as u16)), &[], StorageMode::new_test(None), diff --git a/node/consensus/Cargo.toml b/node/consensus/Cargo.toml index 15ec75c470..44b62ae387 100644 --- a/node/consensus/Cargo.toml +++ b/node/consensus/Cargo.toml @@ -78,6 +78,10 @@ version = "=3.7.1" default-features = false features = [ "persistent" ] +[dependencies.snarkos-node-sync] +path = "../sync" +version = "=3.7.1" + [dependencies.snarkvm] workspace = true diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 86b67377e1..66a57154e3 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -35,6 +35,7 @@ use snarkos_node_bft::{ }; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_bft_storage_service::BFTPersistentStorage; +use snarkos_node_sync::BlockSync; use snarkvm::{ ledger::{ block::Transaction, @@ -120,6 +121,7 @@ impl Consensus { pub async fn new( account: Account, ledger: Arc>, + block_sync: Arc>, ip: Option, trusted_validators: &[SocketAddr], storage_mode: StorageMode, @@ -131,7 +133,7 @@ impl Consensus { // Initialize the Narwhal storage. let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::::MAX_GC_ROUNDS as u64); // Initialize the BFT. - let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, storage_mode)?; + let bft = BFT::new(account, storage, ledger.clone(), block_sync, ip, trusted_validators, storage_mode)?; // Create a new instance of Consensus. let mut _self = Self { ledger, diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index d53491cc14..db2919af23 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -16,8 +16,9 @@ mod router; use crate::traits::NodeInterface; + use snarkos_account::Account; -use snarkos_node_bft::ledger_service::CoreLedgerService; +use snarkos_node_bft::{events::DataBlocks, ledger_service::CoreLedgerService}; use snarkos_node_rest::Rest; use snarkos_node_router::{ Heartbeat, @@ -27,7 +28,7 @@ use snarkos_node_router::{ Routing, messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction}, }; -use snarkos_node_sync::{BlockSync, BlockSyncMode}; +use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync}; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -101,7 +102,7 @@ pub struct Client> { router: Router, /// The REST server of the node. rest: Option>, - /// The sync module. + /// The block synchronization logic. sync: Arc>, /// The genesis block. genesis: Block, @@ -176,7 +177,7 @@ impl> Client { .await?; // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); + let sync = BlockSync::new(ledger_service.clone()); // Initialize the node. let mut node = Self { @@ -233,23 +234,64 @@ impl> Client { /// Initializes the sync pool. fn initialize_sync(&self) { // Start the sync loop. - let node = self.clone(); + let _self = self.clone(); self.handles.lock().push(tokio::spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if node.shutdown.load(std::sync::atomic::Ordering::Acquire) { + if _self.shutdown.load(std::sync::atomic::Ordering::Acquire) { info!("Shutting down block production"); break; } // Sleep briefly to avoid triggering spam detection. tokio::time::sleep(std::time::Duration::from_secs(5)).await; + // Perform the sync routine. - node.sync.try_block_sync(&node).await; + _self.try_block_sync().await; } })); } + /// Client-side version of `snarkvm_node_bft::Sync::try_block_sync()`. + async fn try_block_sync(&self) { + // First see if any peers need removal. + let peers_to_ban = self.sync.remove_timed_out_block_requests(); + for peer_ip in peers_to_ban { + trace!("Banning peer {peer_ip} for timing out on block requests"); + + let tcp = self.router.tcp().clone(); + tcp.banned_peers().update_ip_ban(peer_ip.ip()); + + tokio::spawn(async move { + tcp.disconnect(peer_ip).await; + }); + } + + // Prepare the block requests, if any. + // In the process, we update the state of `is_block_synced` for the sync module. + let (block_requests, sync_peers) = self.sync.prepare_block_requests(); + trace!("Prepared {} block requests", block_requests.len()); + + // If there are no block requests, but there are pending block responses in the sync pool, + // then try to advance the ledger using these pending block responses. + if block_requests.is_empty() && self.sync.has_pending_responses() { + // Try to advance the ledger with the sync pool. + trace!("No block requests to send, but there are still pending block responses."); + self.sync.try_advancing_block_synchronization(); + } else { + // Issues the block requests in batches. + for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { + if !self.sync.send_block_requests(self, &sync_peers, requests).await { + // Stop if we fail to process a batch of requests. + break; + } + + // Sleep to avoid triggering spam detection. + tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; + } + } + } + /// Initializes solution verification. fn initialize_solution_verification(&self) { // Start the solution verification loop. diff --git a/node/src/client/router.rs b/node/src/client/router.rs index e3550dab9f..b651b19ddc 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -230,9 +230,11 @@ impl> Inbound for Client { /// Handles a `BlockResponse` message. fn block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> bool { - // Tries to advance with blocks from the sync module. - match self.sync.advance_with_sync_blocks(peer_ip, blocks) { - Ok(()) => true, + match self.sync.insert_block_responses(peer_ip, blocks) { + Ok(()) => { + self.sync.try_advancing_block_synchronization(); + true + } Err(error) => { warn!("{error}"); false @@ -242,15 +244,12 @@ impl> Inbound for Client { /// Processes the block locators and sends back a `Pong` message. fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { - // Check if the sync module is in router mode. - if self.sync.mode().is_router() { - // If block locators were provided, then update the peer in the sync pool. - if let Some(block_locators) = message.block_locators { - // Check the block locators are valid, and update the peer in the sync pool. - if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { - warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); - return false; - } + // If block locators were provided, then update the peer in the sync pool. + if let Some(block_locators) = message.block_locators { + // Check the block locators are valid, and update the peer in the sync pool. + if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { + warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); + return false; } } diff --git a/node/src/prover/mod.rs b/node/src/prover/mod.rs index e734f1b3fd..a08eed216b 100644 --- a/node/src/prover/mod.rs +++ b/node/src/prover/mod.rs @@ -26,7 +26,7 @@ use snarkos_node_router::{ Routing, messages::{Message, NodeType, UnconfirmedSolution}, }; -use snarkos_node_sync::{BlockSync, BlockSyncMode}; +use snarkos_node_sync::BlockSync; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -123,7 +123,7 @@ impl> Prover { .await?; // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone()); + let sync = BlockSync::new(ledger_service.clone()); // Compute the maximum number of puzzle instances. let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6); diff --git a/node/src/prover/router.rs b/node/src/prover/router.rs index 211425afd8..a38dbe8013 100644 --- a/node/src/prover/router.rs +++ b/node/src/prover/router.rs @@ -172,15 +172,12 @@ impl> Inbound for Prover { /// Processes the block locators and sends back a `Pong` message. fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { - // Check if the sync module is in router mode. - if self.sync.mode().is_router() { - // If block locators were provided, then update the peer in the sync pool. - if let Some(block_locators) = message.block_locators { - // Check the block locators are valid, and update the peer in the sync pool. - if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { - warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); - return false; - } + // If block locators were provided, then update the peer in the sync pool. + if let Some(block_locators) = message.block_locators { + // Check the block locators are valid, and update the peer in the sync pool. + if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { + warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); + return false; } } diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index fd73912902..a2a82cf1d7 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -28,7 +28,7 @@ use snarkos_node_router::{ Routing, messages::{NodeType, PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction}, }; -use snarkos_node_sync::{BlockSync, BlockSyncMode}; +use snarkos_node_sync::BlockSync; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, @@ -66,8 +66,8 @@ pub struct Validator> { router: Router, /// The REST server of the node. rest: Option>, - /// The sync module. - sync: BlockSync, + /// The block synchronization logic (used in the Router impl). + sync: Arc>, /// The spawned handles. handles: Arc>>>, /// The shutdown signal. @@ -111,11 +111,6 @@ impl> Validator { // Initialize the ledger service. let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone())); - // Initialize the consensus layer. - let consensus = - Consensus::new(account.clone(), ledger_service.clone(), bft_ip, trusted_validators, storage_mode.clone()) - .await?; - // Determine if the validator should rotate external peers. let rotate_external_peers = false; @@ -133,8 +128,19 @@ impl> Validator { ) .await?; - // Initialize the sync module. - let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service, router.tcp().clone()); + // Initialize the block synchronization logic. + let sync = Arc::new(BlockSync::new(ledger_service.clone())); + + // Initialize the consensus layer. + let consensus = Consensus::new( + account.clone(), + ledger_service.clone(), + sync.clone(), + bft_ip, + trusted_validators, + storage_mode.clone(), + ) + .await?; // Initialize the node. let mut node = Self { diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index c7d3e33e44..ac7b486f5c 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -199,9 +199,11 @@ impl> Inbound for Validator { /// Handles a `BlockResponse` message. fn block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> bool { - // Tries to advance with blocks from the sync module. - match self.sync.advance_with_sync_blocks(peer_ip, blocks) { - Ok(()) => true, + match self.sync.insert_block_responses(peer_ip, blocks) { + Ok(()) => { + self.sync.try_advancing_block_synchronization(); + true + } Err(error) => { warn!("{error}"); false @@ -209,19 +211,10 @@ impl> Inbound for Validator { } } - /// Processes the block locators and sends back a `Pong` message. - fn ping(&self, peer_ip: SocketAddr, message: Ping) -> bool { - // Check if the sync module is in router mode. - if self.sync.mode().is_router() { - // If block locators were provided, then update the peer in the sync pool. - if let Some(block_locators) = message.block_locators { - // Check the block locators are valid, and update the peer in the sync pool. - if let Err(error) = self.sync.update_peer_locators(peer_ip, block_locators) { - warn!("Peer '{peer_ip}' sent invalid block locators: {error}"); - return false; - } - } - } + /// Processes a ping message from a client (or prover) and sends back a `Pong` message. + fn ping(&self, peer_ip: SocketAddr, _message: Ping) -> bool { + // In gateway/validator mode, we do not need to process client block locators. + // Instead, locators are fetched from other validators in `Gateway` using `PrimaryPing` messages. // Send a `Pong` message to the peer. Outbound::send(self, peer_ip, Message::Pong(Pong { is_fork: Some(false) })); diff --git a/node/sync/Cargo.toml b/node/sync/Cargo.toml index ff90762863..ea173e0937 100644 --- a/node/sync/Cargo.toml +++ b/node/sync/Cargo.toml @@ -22,9 +22,8 @@ locktick = [ "dep:locktick", "snarkos-node-bft-ledger-service/locktick", "snarkos-node-router/locktick", - "snarkos-node-tcp/locktick", - "snarkvm/locktick" ] + metrics = [ "dep:metrics" ] cuda = [ "snarkvm/cuda", "snarkos-node-bft-ledger-service/cuda", "snarkos-node-router/cuda" ] test = [ "snarkos-node-sync-locators/test" ] @@ -73,17 +72,9 @@ version = "=3.7.1" path = "locators" version = "=3.7.1" -[dependencies.snarkos-node-tcp] -path = "../tcp" -version = "=3.7.1" - [dependencies.snarkvm] workspace = true -[dependencies.tokio] -version = "1.28" -features = [ "rt", "signal" ] - [dependencies.tracing] version = "0.1" diff --git a/node/sync/communication-service/src/lib.rs b/node/sync/communication-service/src/lib.rs index a0d3e745db..03bbcded71 100644 --- a/node/sync/communication-service/src/lib.rs +++ b/node/sync/communication-service/src/lib.rs @@ -21,12 +21,15 @@ extern crate async_trait; use std::{io, net::SocketAddr}; use tokio::sync::oneshot; +/// Abstract communcation service. +/// +/// Implemented by `Gateway` and `Client`. #[async_trait] pub trait CommunicationService: Send + Sync { - /// The message type. + /// The message type used by this communication service. type Message: Clone; - /// Prepares a block request to be sent. + /// Generates the service-specific message for a block request. fn prepare_block_request(start: u32, end: u32) -> Self::Message; /// Sends the given message to specified peer. @@ -34,5 +37,6 @@ pub trait CommunicationService: Send + Sync { /// This function returns as soon as the message is queued to be sent, /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`] /// which can be used to determine when and whether the message has been delivered. + /// If no peer with the given IP exists, this function returns None. async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option>>; } diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index e52605ef26..c5e44f830d 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -21,7 +21,6 @@ use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; -use snarkos_node_tcp::Tcp; use snarkvm::prelude::{Network, block::Block}; use anyhow::{Result, bail, ensure}; @@ -31,7 +30,7 @@ use itertools::Itertools; use locktick::parking_lot::{Mutex, RwLock}; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; -use rand::{CryptoRng, Rng, prelude::IteratorRandom}; +use rand::seq::{IteratorRandom, SliceRandom}; use std::{ collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -39,13 +38,21 @@ use std::{ Arc, atomic::{AtomicBool, AtomicU32, Ordering}, }, - time::Instant, + time::{Duration, Instant}, }; +// The redudancy factor decreases the possiblity of a malicious peers sending us an invalid block locator +// by requiring multiple peers to advertise the same (prefix of) block locators. +// However, we do not use this in production yet. #[cfg(not(test))] pub const REDUNDANCY_FACTOR: usize = 1; #[cfg(test)] pub const REDUNDANCY_FACTOR: usize = 3; + +/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection. +// TODO (kaimast): Document why 10ms (not 1 or 100) +pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10); + const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3; const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5; @@ -59,25 +66,18 @@ pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks /// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections. pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum BlockSyncMode { - Router, - Gateway, -} - -impl BlockSyncMode { - /// Returns `true` if the node is in router mode. - pub const fn is_router(&self) -> bool { - matches!(self, Self::Router) - } - - /// Returns `true` if the node is in gateway mode. - pub const fn is_gateway(&self) -> bool { - matches!(self, Self::Gateway) - } -} - -/// A struct that keeps track of the current block sync state. +/// A struct that keeps track of synchronizing blocks with other nodes. +/// +/// It generates requests to send to other peers and processes responses to those requests. +/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from. +/// +/// # Notes +/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators). +/// +/// - Validators only sync from other nodes using this struct if they fall behind, e.g., +/// because they experience a network partition. +/// In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved +/// by a supermajority of the committee. /// /// # State /// - When a request is inserted, the `requests` map and `request_timestamps` map insert an entry for the request height. @@ -86,44 +86,43 @@ impl BlockSyncMode { /// the `request_timestamps` map remains unchanged. /// - When a response is removed/completed, the `requests` map and `request_timestamps` map also remove the entry for the request height. /// - When a request is timed out, the `requests`, `request_timestamps`, and `responses` map remove the entry for the request height. -#[derive(Clone, Debug)] +/// +/// Invariant: `requests` and `request_timestamps` always have the same keys. +/// Initially, they have no keys (see `new()`), thus establishing the invariant. +/// All the functions that change the keys of one map, also change the keys of the other map in the same way, +/// thus preserving the invariant. +#[derive(Debug)] pub struct BlockSync { - /// The block sync mode. - mode: BlockSyncMode, /// The ledger. ledger: Arc>, - /// The TCP stack. - tcp: Tcp, /// The map of peer IP to their block locators. /// The block locators are consistent with the ledger and every other peer's block locators. - locators: Arc>>>, + locators: RwLock>>, /// The map of peer-to-peer to their common ancestor. /// This map is used to determine which peers to request blocks from. - common_ancestors: Arc>>, + common_ancestors: RwLock>, /// The map of block height to the expected block hash and peer IPs. /// Each entry is removed when its corresponding entry in the responses map is removed. - requests: Arc>>>, + requests: RwLock>>, /// The map of block height to the received blocks. /// Removing an entry from this map must remove the corresponding entry from the requests map. - responses: Arc>>>, + responses: RwLock>>, /// The map of block height to the timestamp of the last time the block was requested. /// This map is used to determine which requests to remove if they have been pending for too long. - request_timestamps: Arc>>, + request_timestamps: RwLock>, /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance). - is_block_synced: Arc, + is_block_synced: AtomicBool, /// The number of blocks the peer is behind the greatest peer height. - num_blocks_behind: Arc, + num_blocks_behind: AtomicU32, /// The lock to guarantee advance_with_sync_blocks() is called only once at a time. - advance_with_sync_blocks_lock: Arc>, + advance_with_sync_blocks_lock: Mutex<()>, } impl BlockSync { /// Initializes a new block sync module. - pub fn new(mode: BlockSyncMode, ledger: Arc>, tcp: Tcp) -> Self { + pub fn new(ledger: Arc>) -> Self { Self { - mode, ledger, - tcp, locators: Default::default(), common_ancestors: Default::default(), requests: Default::default(), @@ -135,12 +134,6 @@ impl BlockSync { } } - /// Returns the block sync mode. - #[inline] - pub const fn mode(&self) -> BlockSyncMode { - self.mode - } - /// Returns `true` if the node is synced up to the latest block (within the given tolerance). #[inline] pub fn is_block_synced(&self) -> bool { @@ -154,35 +147,14 @@ impl BlockSync { } } -#[allow(dead_code)] +// Helper functions needed for testing +#[cfg(test)] impl BlockSync { /// Returns the latest block height of the given peer IP. fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option { self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height()) } - // /// Returns a map of peer height to peer IPs. - // /// e.g. `{{ 127 => \[peer1, peer2\], 128 => \[peer3\], 135 => \[peer4, peer5\] }}` - // fn get_peer_heights(&self) -> BTreeMap> { - // self.locators.read().iter().map(|(peer_ip, locators)| (locators.latest_locator_height(), *peer_ip)).fold( - // Default::default(), - // |mut map, (height, peer_ip)| { - // map.entry(height).or_default().push(peer_ip); - // map - // }, - // ) - // } - - // /// Returns the list of peers with their heights, sorted by height (descending). - // fn get_peers_by_height(&self) -> Vec<(SocketAddr, u32)> { - // self.locators - // .read() - // .iter() - // .map(|(peer_ip, locators)| (*peer_ip, locators.latest_locator_height())) - // .sorted_by(|(_, a), (_, b)| b.cmp(a)) - // .collect() - // } - /// Returns the common ancestor for the given peer pair, if it exists. fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option { self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied() @@ -225,96 +197,69 @@ impl BlockSync { BlockLocators::new(recents, checkpoints) } - /// Performs one iteration of the block sync. - #[inline] - pub async fn try_block_sync(&self, communication: &C) { - // Prepare the block requests, if any. - // In the process, we update the state of `is_block_synced` for the sync module. - let (block_requests, sync_peers) = self.prepare_block_requests(); - trace!("Prepared {} block requests", block_requests.len()); - - // If there are no block requests, but there are pending block responses in the sync pool, - // then try to advance the ledger using these pending block responses. - // Note: This condition is guarded by `mode.is_router()` because validators sync blocks - // using another code path that updates both `storage` and `ledger` when advancing blocks. - if block_requests.is_empty() && !self.responses.read().is_empty() && self.mode.is_router() { - // Retrieve the latest block height. - let current_height = self.ledger.latest_block_height(); - - // Acquire the lock to ensure try_advancing_with_block_responses is called only once at a time. - // If the lock is already acquired, return early. - let Some(_lock) = self.advance_with_sync_blocks_lock.try_lock() else { - trace!( - "Skipping a call to try_block_sync() as a block advance is already in progress (at block {current_height})" - ); - return; - }; + /// Returns true if there are pending responses to block requests that need to be processed. + pub fn has_pending_responses(&self) -> bool { + !self.responses.read().is_empty() + } - // Try to advance the ledger with the sync pool. - trace!("No block requests to send - try advancing with block responses (at block {current_height})"); - self.try_advancing_with_block_responses(current_height); - // Return early. - return; - } + /// Send a batch of block requests. + pub async fn send_block_requests( + &self, + communication: &C, + sync_peers: &IndexMap>, + requests: &[(u32, PrepareSyncRequest)], + ) -> bool { + let (start_height, max_num_sync_ips) = match requests.first() { + Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips), + None => { + warn!("Block sync failed - no block requests"); + return false; + } + }; - // Process the block requests. - 'outer: for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { - // Retrieve the starting height and the sync IPs. - let (start_height, max_num_sync_ips) = match requests.first() { - Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips), - None => { - warn!("Block sync failed - no block requests"); - break 'outer; - } - }; + // Use a randomly sampled subset of the sync IPs. + let sync_ips: IndexSet<_> = + sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect(); - // Use a randomly sampled subset of the sync IPs. - let sync_ips: IndexSet<_> = sync_peers - .keys() - .copied() - .choose_multiple(&mut rand::thread_rng(), max_num_sync_ips) - .into_iter() - .collect(); - - // Calculate the end height. - let end_height = start_height.saturating_add(requests.len() as u32); - - // Insert the chunk of block requests. - for (height, (hash, previous_hash, _)) in requests.iter() { - // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk. - if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) { - warn!("Block sync failed - {error}"); - // Break out of the loop. - break 'outer; - } + // Calculate the end height. + let end_height = start_height.saturating_add(requests.len() as u32); + + // Insert the chunk of block requests. + for (height, (hash, previous_hash, _)) in requests.iter() { + // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk. + if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) { + warn!("Block sync failed - {error}"); + return false; } + } - /* Send the block request to the peers */ - - // Construct the message. - let message = C::prepare_block_request(start_height, end_height); - // Send the message to the peers. - for sync_ip in sync_ips { - let sender = communication.send(sync_ip, message.clone()).await; - // If the send fails for any peer, remove the block request from the sync pool. - if sender.is_none() { - warn!("Failed to send block request to peer '{sync_ip}'"); - // Remove the entire block request from the sync pool. - for height in start_height..end_height { - self.remove_block_request(height); - } - // Break out of the loop. - break 'outer; + /* Send the block request to the peers */ + + // Construct the message. + let message = C::prepare_block_request(start_height, end_height); + // Send the message to the peers. + for sync_ip in sync_ips { + let sender = communication.send(sync_ip, message.clone()).await; + // If the send fails for any peer, remove the block request from the sync pool. + if sender.is_none() { + warn!("Failed to send block request to peer '{sync_ip}'"); + // Remove the entire block request from the sync pool. + for height in start_height..end_height { + self.remove_block_request(height); } + // Break out of the loop. + return false; } - // Sleep for 10 milliseconds to avoid triggering spam detection. - tokio::time::sleep(std::time::Duration::from_millis(10)).await; } + true } - /// Processes the block response from the given peer IP. + /// Inserts a new block response from the given peer IP. + /// + /// Returns an error if the block was malformed, or we already received a different block for this height. + /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`. #[inline] - pub fn process_block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + pub fn insert_block_responses(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { // Insert the candidate blocks into the sync pool. for block in blocks { if let Err(error) = self.insert_block_response(peer_ip, block) { @@ -326,6 +271,8 @@ impl BlockSync { /// Returns the next block for the given `next_height` if the request is complete, /// or `None` otherwise. This does not remove the block from the `responses` map. + /// + /// Postcondition: If this function returns `Some`, then `self.responses` has `next_height` as a key. #[inline] pub fn peek_next_block(&self, next_height: u32) -> Option> { // Acquire the requests write lock. @@ -333,40 +280,32 @@ impl BlockSync { // from multiple peers that may be received concurrently. let requests = self.requests.read(); - // Determine if the request is complete. + // Determine if the request is complete: + // either there is no request for `next_height`, or the request has no peer socket addresses left. let is_request_complete = requests.get(&next_height).map(|(_, _, peer_ips)| peer_ips.is_empty()).unwrap_or(true); - // If the request is not complete, return early. - if !is_request_complete { - return None; - } - - self.responses.read().get(&next_height).cloned() + // If the request is complete, return the block from the responses, if there is one. + if is_request_complete { self.responses.read().get(&next_height).cloned() } else { None } } - /// Attempts to advance with blocks from the sync pool. + /// Attempts to advance synchronization by processing completed block responses. + /// + /// Validators will not call this function, but instead execute `snarkos_node_bft::Sync::try_advancing_block_synchronization` + /// which also updates the BFT state. #[inline] - pub fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { - // Process the block response from the given peer IP. - self.process_block_response(peer_ip, blocks)?; - + pub fn try_advancing_block_synchronization(&self) { // Acquire the lock to ensure this function is called only once at a time. // If the lock is already acquired, return early. let Some(_lock) = self.advance_with_sync_blocks_lock.try_lock() else { - trace!("Skipping a call to advance_with_sync_blocks() as it is already in progress"); - return Ok(()); + trace!("Skipping attempt to advance block synchronziation as it is already in progress"); + return; }; - // Retrieve the latest block height. - let current_height = self.ledger.latest_block_height(); - // Try to advance the ledger with the sync pool. - self.try_advancing_with_block_responses(current_height); - Ok(()) - } + // Start with the current height. + let mut current_height = self.ledger.latest_block_height(); + trace!("Try advancing with block responses (at block {current_height})"); - /// Handles the block responses from the sync pool. - fn try_advancing_with_block_responses(&self, mut current_height: u32) { while let Some(block) = self.peek_next_block(current_height + 1) { // Ensure the block height matches. if block.height() != current_height + 1 { @@ -411,7 +350,13 @@ impl BlockSync { /// This function returns peers that are consistent with each other, and have a block height /// that is greater than the ledger height of this node. pub fn find_sync_peers(&self) -> Option<(IndexMap, u32)> { - if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner() { + self.find_sync_peers_at_height(self.ledger.latest_block_height()) + } + + /// Same as `Self::find_sync_peers`, but allows specifiying a custom height + /// (must be greater than the ledger height). + pub fn find_sync_peers_at_height(&self, current_height: u32) -> Option<(IndexMap, u32)> { + if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) { // Map the locators into the latest height. let sync_peers = sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect(); @@ -488,40 +433,58 @@ impl BlockSync { } } +// Helper type for prepare_block_requests +type BlockRequestBatch = (Vec<(u32, PrepareSyncRequest)>, IndexMap>); + impl BlockSync { /// Returns a list of block requests and the sync peers, if the node needs to sync. - #[allow(clippy::type_complexity)] - fn prepare_block_requests(&self) -> (Vec<(u32, PrepareSyncRequest)>, IndexMap>) { - // Remove timed out block requests. - self.remove_timed_out_block_requests(); + /// + /// You usually want to call `remove_timed_out_block_requests` before invoking this function. + /// + /// `current_height` should either be the ledger height, or the height of the pending blocks (for validators). + pub fn prepare_block_requests(&self) -> BlockRequestBatch { + self.prepare_block_requests_at_height(self.ledger.latest_block_height()) + } + + /// Returns a list of block requests and the sync peers, if the node needs to sync. + /// + /// You usually want to call `remove_timed_out_block_requests` before invoking this function. + /// + /// `current_height` should either be the ledger height, or the height of the pending blocks (for validators). + pub fn prepare_block_requests_at_height(&self, current_height: u32) -> BlockRequestBatch { // Prepare the block requests. - if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner() { + if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) { // Retrieve the highest block height. let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0); // Update the state of `is_block_synced` for the sync module. - self.update_is_block_synced(greatest_peer_height, MAX_BLOCKS_BEHIND); + self.update_is_block_synced(greatest_peer_height, current_height, MAX_BLOCKS_BEHIND); // Return the list of block requests. (self.construct_requests(&sync_peers, min_common_ancestor), sync_peers) } else { // Update `is_block_synced` if there are no pending requests or responses. if self.requests.read().is_empty() && self.responses.read().is_empty() { + trace!("All requests have been processed. Will set block synced to true."); // Update the state of `is_block_synced` for the sync module. - self.update_is_block_synced(0, MAX_BLOCKS_BEHIND); + self.update_is_block_synced(0, current_height, MAX_BLOCKS_BEHIND); + } else { + trace!("No new blocks can be requests, but there are still outstanding requests."); } + // Return an empty list of block requests. (Default::default(), Default::default()) } } /// Updates the state of `is_block_synced` for the sync module. - fn update_is_block_synced(&self, greatest_peer_height: u32, max_blocks_behind: u32) { + fn update_is_block_synced(&self, greatest_peer_height: u32, current_height: u32, max_blocks_behind: u32) { // Retrieve the latest block height. let ledger_height = self.ledger.latest_block_height(); trace!( - "Updating is_block_synced: greatest_peer_height = {greatest_peer_height}, ledger_height = {ledger_height}" + "Updating is_block_synced: greatest_peer_height = {greatest_peer_height}, ledger_height = {ledger_height}, + current_height = {current_height}" ); // Compute the number of blocks that we are behind by. - let num_blocks_behind = greatest_peer_height.saturating_sub(ledger_height); + let num_blocks_behind = greatest_peer_height.saturating_sub(current_height); // Determine if the primary is synced. let is_synced = num_blocks_behind <= max_blocks_behind; // Update the num blocks behind. @@ -561,6 +524,7 @@ impl BlockSync { } // Remove the peer IP from the request entry. + // This `if` never fails, because of the postcondition of `check_block_response` (called above). if let Some((_, _, sync_ips)) = self.requests.write().get_mut(&height) { sync_ips.swap_remove(&peer_ip); } @@ -599,6 +563,8 @@ impl BlockSync { bail!("Failed to add block request, as block {height} exists in the responses map"); } // Ensure the block height is not already requested. + // TODO: Because of the invariant that `requests` and `request_timestamps` have the same keys + // (see `BlockSync` doc), the following check is redundant. if self.request_timestamps.read().contains_key(&height) { bail!("Failed to add block request, as block {height} exists in the timestamps map"); } @@ -606,6 +572,8 @@ impl BlockSync { } /// Checks the given block (response) from a peer against the expected block hash and previous block hash. + /// + /// Postcondition: If this function returns `Ok`, then `self.requests` has `height` as a key. fn check_block_response(&self, peer_ip: &SocketAddr, block: &Block) -> Result<()> { // Retrieve the block height. let height = block.height(); @@ -646,8 +614,12 @@ impl BlockSync { self.request_timestamps.write().remove(&height); } - /// Removes the block response for the given height + /// Removes the block request and response for the given height /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete. + /// + /// Precondition: This may only be called after `peek_next_block` has returned `Some`, + /// which has checked if the request for the given height is complete + /// and there is a block with the given `height` in the `responses` map. pub fn remove_block_response(&self, height: u32) { // Acquire the requests write lock. // Note: This lock must be held across the entire scope, due to asynchronous block responses @@ -663,25 +635,9 @@ impl BlockSync { self.responses.write().remove(&height); } - /// Removes the block request for the given peer IP, if it exists. - #[allow(dead_code)] - fn remove_block_request_to_peer(&self, peer_ip: &SocketAddr, height: u32) { - let mut can_revoke = self.peek_next_block(height).is_none(); - - // Remove the peer IP from the request entry. If the request entry is now empty, - // and the response entry for this height is also empty, then remove the request entry altogether. - if let Some((_, _, sync_ips)) = self.requests.write().get_mut(&height) { - sync_ips.swap_remove(peer_ip); - can_revoke &= sync_ips.is_empty(); - } - - if can_revoke { - self.requests.write().remove(&height); - self.request_timestamps.write().remove(&height); - } - } - /// Removes all block requests for the given peer IP. + /// + /// This is used when disconnecting from a peer or when a peer sends invalid block responses. fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) { trace!("Block sync is removing all block requests to peer {peer_ip}..."); // Acquire the write lock on the requests map. @@ -703,9 +659,10 @@ impl BlockSync { }); } - /// Removes block requests that have timed out. This also removes the corresponding block responses, - /// and adds the timed out sync IPs to a map for tracking. Returns the number of timed out block requests. - fn remove_timed_out_block_requests(&self) -> usize { + /// Removes block requests that have timed out, i.e, requests we sent that did not receive a response in time. + /// + /// This removes the corresponding block responses and returns the set of peers/addresses that timed out. + pub fn remove_timed_out_block_requests(&self) -> HashSet { // Acquire the write lock on the requests map. let mut requests = self.requests.write(); // Acquire the write lock on the responses map. @@ -721,9 +678,10 @@ impl BlockSync { // Retrieve the current block height let current_height = self.ledger.latest_block_height(); - // Track the number of timed out block requests. + // Track the number of timed out block requests (only used to print a log message). let mut num_timed_out_block_requests = 0; + // Track which peers should be banned due to unresponsiveness. let mut peers_to_ban: HashSet = HashSet::new(); // Remove timed out block requests. @@ -755,38 +713,36 @@ impl BlockSync { requests.remove(height); // Remove the response entry for the given height. responses.remove(height); + } + + if is_timeout { // Increment the number of timed out block requests. num_timed_out_block_requests += 1; } + // Retain if this is not a timeout and is not obsolete. !is_timeout && !is_obsolete }); - // After the retain loop, handle the banning of peers - for peer_ip in peers_to_ban { - trace!("Banning peer {peer_ip} for timing out on block requests"); - self.tcp.banned_peers().update_ip_ban(peer_ip.ip()); - - let tcp = self.tcp.clone(); - tokio::spawn(async move { - tcp.disconnect(peer_ip).await; - }); + if num_timed_out_block_requests > 0 { + debug!("{num_timed_out_block_requests} block requests timed out"); } - num_timed_out_block_requests + peers_to_ban } /// Returns the sync peers and their minimum common ancestor, if the node needs to sync. - fn find_sync_peers_inner(&self) -> Option<(IndexMap>, u32)> { + fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap>, u32)> { // Retrieve the latest ledger height. let latest_ledger_height = self.ledger.latest_block_height(); // Pick a set of peers above the latest ledger height, and include their locators. + // This will sort the peers by locator height in descending order. let candidate_locators: IndexMap<_, _> = self .locators .read() .iter() - .filter(|(_, locators)| locators.latest_locator_height() > latest_ledger_height) + .filter(|(_, locators)| locators.latest_locator_height() > current_height) .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height())) .take(NUM_SYNC_CANDIDATE_PEERS) .map(|(peer_ip, locators)| (*peer_ip, locators.clone())) @@ -803,55 +759,46 @@ impl BlockSync { // a common ancestor above the block request range. Set the end height to their common ancestor. // Determine the threshold number of peers to sync from. - let threshold_to_request = core::cmp::min(candidate_locators.len(), REDUNDANCY_FACTOR); - - let mut min_common_ancestor = 0; - let mut sync_peers = IndexMap::new(); + let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR); // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height // and a cohort of peers who share a common ancestor above this node's latest ledger height. - for (i, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() { - // As the previous iteration did not `break`, reset the sync peers. - sync_peers.clear(); + for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() { + // The height of the common ancestor shared by all selected peers. + let mut min_common_ancestor = peer_locators.latest_locator_height(); - // Set the minimum common ancestor. - min_common_ancestor = peer_locators.latest_locator_height(); - // Add the peer to the sync peers. - sync_peers.insert(*peer_ip, peer_locators.clone()); + // The peers we will synchronize from. + // As the previous iteration did not succeed, restart with the next candidate peers. + let mut sync_peers = vec![(*peer_ip, peer_locators.clone())]; - for (other_ip, other_locators) in candidate_locators.iter().skip(i + 1) { + // Try adding other peers consistent with this one to the sync peer set. + for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) { // Check if these two peers have a common ancestor above the latest ledger height. if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) { - if *common_ancestor > latest_ledger_height { - // If so, then check that their block locators are consistent. - if peer_locators.is_consistent_with(other_locators) { - // If their common ancestor is less than the minimum common ancestor, then update it. - if *common_ancestor < min_common_ancestor { - min_common_ancestor = *common_ancestor; - } - // Add the other peer to the list of sync peers. - sync_peers.insert(*other_ip, other_locators.clone()); - } + // If so, then check that their block locators are consistent. + if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) { + // If their common ancestor is less than the minimum common ancestor, then update it. + min_common_ancestor = min_common_ancestor.min(*common_ancestor); + + // Add the other peer to the list of sync peers. + sync_peers.push((*other_ip, other_locators.clone())); } } } - // If we have enough sync peers above the latest ledger height, then break the loop. + // If we have enough sync peers above the latest ledger height, finish and return them. if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request { - break; - } - } + // Shuffle the sync peers prior to returning. This ensures the rest of the stack + // does not rely on the order of the sync peers, and that the sync peers are not biased. + sync_peers.shuffle(&mut rand::thread_rng()); - // If there is not enough peers with a minimum common ancestor above the latest ledger height, then return early. - if min_common_ancestor <= latest_ledger_height || sync_peers.len() < threshold_to_request { - return None; + // Collect into an IndexMap and return. + return Some((sync_peers.into_iter().collect(), min_common_ancestor)); + } } - // Shuffle the sync peers prior to returning. This ensures the rest of the stack - // does not rely on the order of the sync peers, and that the sync peers are not biased. - let sync_peers = shuffle_indexmap(sync_peers, &mut rand::thread_rng()); - - Some((sync_peers, min_common_ancestor)) + // If there is not enough peers with a minimum common ancestor above the latest ledger height, return None. + None } /// Given the sync peers and their minimum common ancestor, return a list of block requests. @@ -990,18 +937,6 @@ fn construct_request( (hash, previous_hash, num_sync_ips, is_honest) } -/// Shuffles a given `IndexMap` using the given random number generator. -fn shuffle_indexmap(mut map: IndexMap, rng: &mut R) -> IndexMap -where - K: core::hash::Hash + Eq + Clone, - V: Clone, -{ - use rand::seq::SliceRandom; - let mut pairs: Vec<_> = map.drain(..).collect(); // Drain elements to a vector - pairs.shuffle(rng); // Shuffle the vector of tuples - pairs.into_iter().collect() // Collect back into an IndexMap -} - #[cfg(test)] mod tests { use super::*; @@ -1010,12 +945,15 @@ mod tests { NUM_RECENT_BLOCKS, test_helpers::{sample_block_locators, sample_block_locators_with_fork}, }; + use snarkos_node_bft_ledger_service::MockLedgerService; - use snarkvm::prelude::{Field, TestRng}; + use snarkvm::{ + ledger::committee::Committee, + prelude::{Field, TestRng}, + }; use indexmap::{IndexSet, indexset}; - use snarkos_node_tcp::Config; - use snarkvm::ledger::committee::Committee; + use rand::Rng; use std::net::{IpAddr, Ipv4Addr}; type CurrentNetwork = snarkvm::prelude::MainnetV0; @@ -1039,34 +977,24 @@ mod tests { /// Returns the sync pool, with the ledger initialized to the given height. fn sample_sync_at_height(height: u32) -> BlockSync { - BlockSync::::new(BlockSyncMode::Router, Arc::new(sample_ledger_service(height)), sample_tcp()) + BlockSync::::new(Arc::new(sample_ledger_service(height))) } - /// Returns a duplicate sync pool with a different ledger height. + /// Returns a duplicate (deep copy) of the sync pool with a different ledger height. fn duplicate_sync_at_new_height(sync: &BlockSync, height: u32) -> BlockSync { BlockSync:: { - mode: sync.mode, ledger: Arc::new(sample_ledger_service(height)), - tcp: sync.tcp.clone(), - locators: sync.locators.clone(), - common_ancestors: sync.common_ancestors.clone(), - requests: sync.requests.clone(), - responses: sync.responses.clone(), - request_timestamps: sync.request_timestamps.clone(), - is_block_synced: sync.is_block_synced.clone(), - num_blocks_behind: sync.num_blocks_behind.clone(), + locators: RwLock::new(sync.locators.read().clone()), + common_ancestors: RwLock::new(sync.common_ancestors.read().clone()), + requests: RwLock::new(sync.requests.read().clone()), + responses: RwLock::new(sync.responses.read().clone()), + request_timestamps: RwLock::new(sync.request_timestamps.read().clone()), + is_block_synced: AtomicBool::new(sync.is_block_synced.load(Ordering::SeqCst)), + num_blocks_behind: AtomicU32::new(sync.num_blocks_behind.load(Ordering::SeqCst)), advance_with_sync_blocks_lock: Default::default(), } } - fn sample_tcp() -> Tcp { - Tcp::new(Config { - listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - max_connections: 200, - ..Default::default() - }) - } - /// Checks that the sync pool (starting at genesis) returns the correct requests. fn check_prepare_block_requests( sync: BlockSync, @@ -1120,30 +1048,23 @@ mod tests { } } + /// Tests that height and hash values are set correctly using many different maximum block heights. #[test] - fn test_latest_block_height() { + fn test_get_block_height_and_hash() { for height in 0..100_002u32 { let sync = sample_sync_at_height(height); + + // Check that the latest blokc height is the maximum height. assert_eq!(sync.ledger.latest_block_height(), height); - } - } - #[test] - fn test_get_block_height() { - for height in 0..100_002u32 { - let sync = sample_sync_at_height(height); + // Check the hash to height mapping assert_eq!(sync.ledger.get_block_height(&(Field::::from_u32(0)).into()).unwrap(), 0); assert_eq!( sync.ledger.get_block_height(&(Field::::from_u32(height)).into()).unwrap(), height ); - } - } - #[test] - fn test_get_block_hash() { - for height in 0..100_002u32 { - let sync = sample_sync_at_height(height); + // Check the height to hash mapping assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::::from_u32(0)).into()); assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::::from_u32(height)).into()); }