Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions node/bft/examples/simple_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use snarkos_node_bft::{
};
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::{account::PrivateKey, algorithms::BHP256, types::Address},
ledger::{
Expand Down Expand Up @@ -143,9 +142,7 @@ pub async fn start_bft(
// Initialize the consensus receiver handler.
consensus_handler(consensus_receiver);
// Initialize the BFT instance.
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut bft =
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
let mut bft = BFT::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
// Run the BFT instance.
bft.run(Some(consensus_sender), sender.clone(), receiver).await?;
// Retrieve the BFT's primary.
Expand Down Expand Up @@ -183,9 +180,7 @@ pub async fn start_primary(
// Initialize the trusted validators.
let trusted_validators = trusted_validators(node_id, num_nodes, peers);
// Initialize the primary instance.
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut primary =
Primary::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
let mut primary = Primary::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
// Run the primary instance.
primary.run(None, sender.clone(), receiver).await?;
// Handle OS signals.
Expand Down
71 changes: 27 additions & 44 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{
};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::Address,
ledger::{
Expand Down Expand Up @@ -70,9 +69,9 @@ use tokio::{

#[derive(Clone)]
pub struct BFT<N: Network> {
/// The primary for this node.
/// The primary.
primary: Primary<N>,
/// The DAG of batches from which we build the blockchain.
/// The DAG.
dag: Arc<RwLock<DAG<N>>>,
/// The batch certificate of the leader from the current even round, if one was present.
leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
Expand All @@ -92,13 +91,12 @@ impl<N: Network> BFT<N> {
account: Account<N>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
block_sync: Arc<BlockSync<N>>,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
storage_mode: StorageMode,
) -> Result<Self> {
Ok(Self {
primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?,
primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
dag: Default::default(),
leader_certificate: Default::default(),
leader_certificate_timer: Default::default(),
Expand Down Expand Up @@ -933,7 +931,6 @@ mod tests {
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::{Address, PrivateKey},
ledger::{
Expand Down Expand Up @@ -973,18 +970,6 @@ mod tests {
(committee, account, ledger, storage)
}

// Helper function to set up BFT for testing.
fn initialize_bft(
account: Account<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
ledger: Arc<MockLedgerService<CurrentNetwork>>,
) -> anyhow::Result<BFT<CurrentNetwork>> {
// Create the block synchronization logic.
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
// Initialize the BFT.
BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None))
}

#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
Expand Down Expand Up @@ -1016,7 +1001,7 @@ mod tests {
// Initialize the account.
let account = Account::new(rng)?;
// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
assert!(bft.is_timer_expired());
// Ensure this call succeeds on an odd round.
let result = bft.is_leader_quorum_or_nonleaders_available(1);
Expand Down Expand Up @@ -1050,8 +1035,8 @@ mod tests {
assert_eq!(storage.current_round(), 1);
assert_eq!(storage.max_gc_rounds(), 10);

// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Store is at round 1, and we are checking for round 2.
Expand All @@ -1072,8 +1057,8 @@ mod tests {
assert_eq!(storage.current_round(), 2);
assert_eq!(storage.max_gc_rounds(), 10);

// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Ensure this call fails on an even round.
Expand Down Expand Up @@ -1112,11 +1097,8 @@ mod tests {
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
// Initialize the account.
let account = Account::new(rng)?;

// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Initialize the BFT.
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
// Set the leader certificate.
let leader_certificate = sample_batch_certificate_for_round(2, rng);
*bft.leader_certificate.write() = Some(leader_certificate);
Expand All @@ -1128,7 +1110,8 @@ mod tests {
assert!(result);

// Initialize a new BFT.
let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft_timer =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
// If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if !bft_timer.is_timer_expired() {
Expand Down Expand Up @@ -1159,8 +1142,7 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;

// Ensure this call fails on an odd round.
let result = bft.update_leader_certificate_to_even_round(1);
Expand All @@ -1178,7 +1160,7 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;

// Ensure this call succeeds on an even round.
let result = bft.update_leader_certificate_to_even_round(6);
Expand Down Expand Up @@ -1230,7 +1212,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;

// Set the leader certificate.
*bft.leader_certificate.write() = Some(leader_certificate);
Expand Down Expand Up @@ -1268,7 +1250,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
Expand Down Expand Up @@ -1298,7 +1280,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
Expand Down Expand Up @@ -1356,7 +1338,7 @@ mod tests {
/* Test missing previous certificate. */

// Initialize the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;

// The expected error message.
let error_msg = format!(
Expand Down Expand Up @@ -1417,8 +1399,8 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);

// Ensure that the `gc_round` has not been updated yet.
Expand Down Expand Up @@ -1483,7 +1465,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
Expand All @@ -1501,7 +1483,7 @@ mod tests {
// Initialize a new instance of storage.
let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
// Initialize a new instance of BFT.
let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
Expand Down Expand Up @@ -1655,7 +1637,7 @@ mod tests {

// Initialize the BFT without bootup.
let account = Account::new(rng)?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Insert a mock DAG in the BFT without bootup.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
Expand All @@ -1680,7 +1662,8 @@ mod tests {
let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

// Initialize a new instance of BFT with bootup.
let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
let bootup_bft =
BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
Expand Down Expand Up @@ -1858,8 +1841,8 @@ mod tests {
}
// Initialize the bootup BFT.
let account = Account::new(rng)?;
let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

let bootup_bft =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
// Insert a mock DAG in the BFT without bootup.
*bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
// Sync the BFT DAG at bootup.
Expand Down
5 changes: 2 additions & 3 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ pub trait Transport<N: Network>: Send + Sync {
fn broadcast(&self, event: Event<N>);
}

/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
#[derive(Clone)]
pub struct Gateway<N: Network> {
/// The account of the node.
Expand Down Expand Up @@ -516,8 +514,9 @@ impl<N: Network> Gateway<N> {
self.update_metrics();
}

/// Inserts the given peer into the connected peers. This is only used in testing.
/// Inserts the given peer into the connected peers.
#[cfg(test)]
// For unit tests, we need to make this public so we can inject peers.
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.insert_peer(peer_ip, peer_addr, address);
Expand Down
4 changes: 1 addition & 3 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusRe
(sender, receiver)
}

/// "Interface" that enables, for example, sending data from storage to the the BFT logic.
#[derive(Clone, Debug)]
pub struct BFTSender<N: Network> {
pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
Expand Down Expand Up @@ -101,7 +100,6 @@ impl<N: Network> BFTSender<N> {
}
}

/// Receiving counterpart to `BFTSender`
#[derive(Debug)]
pub struct BFTReceiver<N: Network> {
pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
Expand All @@ -110,7 +108,7 @@ pub struct BFTReceiver<N: Network> {
pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}

/// Initializes the BFT channels, and returns the sending and receiving ends.
/// Initializes the BFT channels.
pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
Expand Down
3 changes: 0 additions & 3 deletions node/bft/src/helpers/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@ use snarkvm::{
use indexmap::IndexSet;
use std::collections::{BTreeMap, HashMap};

/// Maintains an directed acyclic graph (DAG) of batches, from which we build a totally-ordered blockchain.
/// The DAG is updated in rounds, where each validator adds at most one new batch.
#[derive(Debug)]
pub struct DAG<N: Network> {
/// The in-memory collection of certificates that comprise the DAG.
/// For each round, there is a mapping from node address to batch.
graph: BTreeMap<u64, HashMap<Address<N>, BatchCertificate<N>>>,
/// The in-memory collection of recently committed certificate IDs (up to GC).
recent_committed_ids: BTreeMap<u64, IndexSet<Field<N>>>,
Expand Down
4 changes: 2 additions & 2 deletions node/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s)

/// The interval at which each primary broadcasts a ping to every other node.
/// The frequency at which each primary broadcasts a ping to every other node.
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
/// The interval at which each worker broadcasts a ping to every other node.
/// The frequency at which each worker broadcasts a ping to every other node.
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms

/// A helper macro to spawn a blocking task.
Expand Down
Loading