Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 8 additions & 11 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ cuda = [
"snarkos-node-bft-ledger-service/cuda",
"snarkos-node-sync/cuda"
]
test = [
# "snarkvm/test" this breaks some of the tests
"snarkvm/test-helpers",
"snarkos-node-bft-ledger-service/test",
"snarkos-node-bft-storage-service/test"
]

[dependencies.aleo-std]
workspace = true
Expand Down Expand Up @@ -176,19 +182,10 @@ version = "0.4"
[dev-dependencies.rayon]
version = "1"

[dev-dependencies.snarkos-node-bft-ledger-service]
path = "./ledger-service"
default-features = false
features = [ "test" ]

[dev-dependencies.snarkos-node-bft-storage-service]
path = "./storage-service"
[dev-dependencies.snarkos-node-bft]
path = "."
features = [ "test" ]

[dev-dependencies.snarkvm]
workspace = true
features = [ "test-helpers" ]

[dev-dependencies.test-strategy]
version = "0.3.1"

Expand Down
9 changes: 7 additions & 2 deletions node/bft/examples/simple_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use snarkos_node_bft::{
};
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::{account::PrivateKey, algorithms::BHP256, types::Address},
ledger::{
Expand Down Expand Up @@ -142,7 +143,9 @@ pub async fn start_bft(
// Initialize the consensus receiver handler.
consensus_handler(consensus_receiver);
// Initialize the BFT instance.
let mut bft = BFT::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut bft =
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
// Run the BFT instance.
bft.run(Some(consensus_sender), sender.clone(), receiver).await?;
// Retrieve the BFT's primary.
Expand Down Expand Up @@ -180,7 +183,9 @@ pub async fn start_primary(
// Initialize the trusted validators.
let trusted_validators = trusted_validators(node_id, num_nodes, peers);
// Initialize the primary instance.
let mut primary = Primary::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut primary =
Primary::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
// Run the primary instance.
primary.run(None, sender.clone(), receiver).await?;
// Handle OS signals.
Expand Down
90 changes: 56 additions & 34 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::Address,
ledger::{
Expand Down Expand Up @@ -69,9 +70,9 @@ use tokio::{

#[derive(Clone)]
pub struct BFT<N: Network> {
/// The primary.
/// The primary for this node.
primary: Primary<N>,
/// The DAG.
/// The DAG of batches from which we build the blockchain.
dag: Arc<RwLock<DAG<N>>>,
/// The batch certificate of the leader from the current even round, if one was present.
leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
Expand All @@ -91,12 +92,13 @@ impl<N: Network> BFT<N> {
account: Account<N>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
block_sync: Arc<BlockSync<N>>,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
storage_mode: StorageMode,
) -> Result<Self> {
Ok(Self {
primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?,
dag: Default::default(),
leader_certificate: Default::default(),
leader_certificate_timer: Default::default(),
Expand Down Expand Up @@ -256,7 +258,7 @@ impl<N: Network> BFT<N> {
} else {
match is_ready {
true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
}
}
}
Expand Down Expand Up @@ -370,6 +372,8 @@ impl<N: Network> BFT<N> {
}

/// Returns `true` if the timer for the leader certificate has expired.
///
/// This is always true for a new BFT instance.
fn is_timer_expired(&self) -> bool {
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
}
Expand Down Expand Up @@ -467,14 +471,17 @@ impl<N: Network> BFT<N> {
// Acquire the BFT lock.
let _lock = self.lock.lock().await;

// Retrieve the certificate round.
// Retrieve the round of the new certificate to add to the DAG.
let certificate_round = certificate.round();

// Insert the certificate into the DAG.
self.dag.write().insert(certificate);

// Construct the commit round.
// Get the previous round number.
let commit_round = certificate_round.saturating_sub(1);
// If the commit round is odd, return early.

// Leaders are elected in even rounds.
// If the previous round is odd, the current round cannot commit any leader certs.
if commit_round % 2 != 0 || commit_round < 2 {
return Ok(());
}
Expand Down Expand Up @@ -882,7 +889,7 @@ impl<N: Network> BFT<N> {
}
});

// Process the request to sync the BFT.
// Handler for new certificates that were fetched by the sync module.
let self_ = self.clone();
self.spawn(async move {
while let Some((certificate, callback)) = rx_sync_bft.recv().await {
Expand Down Expand Up @@ -934,6 +941,7 @@ mod tests {
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::{Address, PrivateKey},
ledger::{
Expand Down Expand Up @@ -973,6 +981,18 @@ mod tests {
(committee, account, ledger, storage)
}

// Helper function to set up BFT for testing.
fn initialize_bft(
account: Account<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
ledger: Arc<MockLedgerService<CurrentNetwork>>,
) -> anyhow::Result<BFT<CurrentNetwork>> {
// Create the block synchronization logic.
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
// Initialize the BFT.
BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None))
}

#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
Expand Down Expand Up @@ -1004,7 +1024,7 @@ mod tests {
// Initialize the account.
let account = Account::new(rng)?;
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());
// Ensure this call succeeds on an odd round.
let result = bft.is_leader_quorum_or_nonleaders_available(1);
Expand Down Expand Up @@ -1038,9 +1058,9 @@ mod tests {
assert_eq!(storage.current_round(), 1);
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()
// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());

// Store is at round 1, and we are checking for round 2.
// Ensure this call fails on an even round.
Expand All @@ -1060,9 +1080,9 @@ mod tests {
assert_eq!(storage.current_round(), 2);
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()
// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());

// Ensure this call fails on an even round.
let result = bft.is_leader_quorum_or_nonleaders_available(2);
Expand Down Expand Up @@ -1100,8 +1120,11 @@ mod tests {
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
// Initialize the account.
let account = Account::new(rng)?;
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());

// Set the leader certificate.
let leader_certificate = sample_batch_certificate_for_round(2, rng);
*bft.leader_certificate.write() = Some(leader_certificate);
Expand All @@ -1113,8 +1136,7 @@ mod tests {
assert!(result);

// Initialize a new BFT.
let bft_timer =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
// If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if !bft_timer.is_timer_expired() {
Expand Down Expand Up @@ -1145,7 +1167,8 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());

// Ensure this call fails on an odd round.
let result = bft.update_leader_certificate_to_even_round(1);
Expand All @@ -1163,7 +1186,7 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Ensure this call succeeds on an even round.
let result = bft.update_leader_certificate_to_even_round(6);
Expand Down Expand Up @@ -1215,7 +1238,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Set the leader certificate.
*bft.leader_certificate.write() = Some(leader_certificate);
Expand Down Expand Up @@ -1253,7 +1276,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
Expand Down Expand Up @@ -1283,7 +1306,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
Expand Down Expand Up @@ -1341,7 +1364,7 @@ mod tests {
/* Test missing previous certificate. */

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// The expected error message.
let error_msg = format!(
Expand Down Expand Up @@ -1402,8 +1425,8 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
// Insert a mock DAG in the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);

// Ensure that the `gc_round` has not been updated yet.
Expand Down Expand Up @@ -1468,7 +1491,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
Expand All @@ -1486,7 +1509,7 @@ mod tests {
// Initialize a new instance of storage.
let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
// Initialize a new instance of BFT.
let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
Expand Down Expand Up @@ -1640,7 +1663,7 @@ mod tests {

// Initialize the BFT without bootup.
let account = Account::new(rng)?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT without bootup.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
Expand All @@ -1665,8 +1688,7 @@ mod tests {
let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

// Initialize a new instance of BFT with bootup.
let bootup_bft =
BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
Expand Down Expand Up @@ -1844,8 +1866,8 @@ mod tests {
}
// Initialize the bootup BFT.
let account = Account::new(rng)?;
let bootup_bft =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT without bootup.
*bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
// Sync the BFT DAG at bootup.
Expand Down
5 changes: 3 additions & 2 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub trait Transport<N: Network>: Send + Sync {
fn broadcast(&self, event: Event<N>);
}

/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
#[derive(Clone)]
pub struct Gateway<N: Network> {
/// The account of the node.
Expand Down Expand Up @@ -514,9 +516,8 @@ impl<N: Network> Gateway<N> {
self.update_metrics();
}

/// Inserts the given peer into the connected peers.
/// Inserts the given peer into the connected peers. This is only used in testing.
#[cfg(test)]
// For unit tests, we need to make this public so we can inject peers.
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.insert_peer(peer_ip, peer_addr, address);
Expand Down
Loading