Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
509c266
First draft for unifiying block synchronization logic
kaimast Feb 27, 2025
058a524
Merge branch 'staging' into deduplicate-blocksync
kaimast Mar 3, 2025
f14c056
Merge staging
kaimast Mar 3, 2025
2d88d4d
Merge branch 'staging' into deduplicate-blocksync
kaimast Mar 7, 2025
258ffcb
Comments and minor refactoring
kaimast Mar 10, 2025
1270b34
Merge remote-tracking branch 'origin/staging' into deduplicate-blocksync
kaimast Mar 10, 2025
173e8e4
More comment updates
kaimast Mar 11, 2025
9e4d4db
Merge remote-tracking branch 'origin/staging' into deduplicate-blocksync
kaimast Mar 11, 2025
4adaf15
Simplify tests
kaimast Mar 13, 2025
86c89a2
Prefer disconnecting from clients over validators
kaimast Feb 25, 2025
d8a11cf
Improve comments
kaimast Feb 25, 2025
3a5d597
Remove obsolete TODO
kaimast Feb 26, 2025
36fe568
Simplify sorting
kaimast Feb 26, 2025
9d5dd0f
Address code review comments
kaimast Feb 26, 2025
194aef8
Improve handling of provers in heartbeat
kaimast Feb 26, 2025
19a7ead
Actually remove the oldest peer
kaimast Mar 2, 2025
65a23bd
Revert back to `peer_ip` for a peers public address
kaimast Mar 6, 2025
7a3b010
Use first() instead of into_iter().next()
kaimast Mar 6, 2025
15a0b2c
Add tests for heartbeat logic
kaimast Mar 6, 2025
c39ec5c
Update comments and add license to tests
kaimast Mar 10, 2025
d7d6160
Simplify tests
kaimast Mar 11, 2025
e015a5d
Remove logging from tests
kaimast Mar 11, 2025
b893f86
Reorder imports
kaimast Mar 11, 2025
31ec40d
Fix typo
kaimast Mar 11, 2025
f439dd7
Fix bug in test_obsolete_block_requests
kaimast Mar 12, 2025
4bd2f8e
logs: remove the low-level h2 entries
ljedrz Mar 14, 2025
ef404d8
refactor: move the committee cache to snarkVM
ljedrz Mar 4, 2025
8805857
chore: update the lockfile
ljedrz Mar 11, 2025
0c75996
Remove client specific code from BlockSync.
kaimast Mar 17, 2025
c4b30b8
Merge remote-tracking branch 'origin/staging' into deduplicate-blocksync
kaimast Mar 18, 2025
242f9a5
Fix typo in function documentation
kaimast Mar 18, 2025
770606d
Fix occasional failure of test_batch_propose_from_peer_with_invalid_t…
kaimast Mar 19, 2025
baf2690
Fix typos in some comments
kaimast Mar 19, 2025
fa90c8b
Revert renaming of constants in bft/src/lib.rs
kaimast Mar 19, 2025
b46d308
Merge branch 'staging' into deduplicate-blocksync
vicsn Mar 20, 2025
b815526
Add --yes for rust uninstall in windows CircleCI
kaimast Mar 20, 2025
a05efcf
Remove derive(Clone) from BlockSync
kaimast Mar 24, 2025
6a23647
Minor comment update
kaimast Mar 24, 2025
1bbd4d5
Move Gateway callbacks to their own functions
kaimast Mar 26, 2025
02143a7
Merge staging and remove Tcp from BlockSync
kaimast Mar 26, 2025
4f941ca
Address Howard's comments and clean up some more
kaimast Mar 26, 2025
a2216b0
Pass block_sync after ledger
kaimast Mar 27, 2025
42ecf43
Merge origin/staging
kaimast Mar 31, 2025
233d833
Remove unneeded changes to Windows CircleCI
kaimast Mar 31, 2025
effa3e9
Merge remote-tracking branch 'origin/staging' into deduplicate-blocksync
kaimast Apr 6, 2025
f535847
Improve documentation for block sync
kaimast Apr 6, 2025
bff7a23
Re-add removed test_targets feature for snarkos_node_bft
kaimast Apr 6, 2025
af11dce
Update node/bft/src/primary.rs
kaimast Apr 9, 2025
3c75ef7
Merge staging
kaimast Apr 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions node/bft/examples/simple_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use snarkos_node_bft::{
};
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::{account::PrivateKey, algorithms::BHP256, types::Address},
ledger::{
Expand Down Expand Up @@ -142,7 +143,9 @@ pub async fn start_bft(
// Initialize the consensus receiver handler.
consensus_handler(consensus_receiver);
// Initialize the BFT instance.
let mut bft = BFT::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut bft =
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
// Run the BFT instance.
bft.run(Some(consensus_sender), sender.clone(), receiver).await?;
// Retrieve the BFT's primary.
Expand Down Expand Up @@ -180,7 +183,9 @@ pub async fn start_primary(
// Initialize the trusted validators.
let trusted_validators = trusted_validators(node_id, num_nodes, peers);
// Initialize the primary instance.
let mut primary = Primary::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut primary =
Primary::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
// Run the primary instance.
primary.run(None, sender.clone(), receiver).await?;
// Handle OS signals.
Expand Down
71 changes: 44 additions & 27 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::Address,
ledger::{
Expand Down Expand Up @@ -69,9 +70,9 @@ use tokio::{

#[derive(Clone)]
pub struct BFT<N: Network> {
/// The primary.
/// The primary for this node.
primary: Primary<N>,
/// The DAG.
/// The DAG of batches from which we build the blockchain.
dag: Arc<RwLock<DAG<N>>>,
/// The batch certificate of the leader from the current even round, if one was present.
leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
Expand All @@ -91,12 +92,13 @@ impl<N: Network> BFT<N> {
account: Account<N>,
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
block_sync: Arc<BlockSync<N>>,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
storage_mode: StorageMode,
) -> Result<Self> {
Ok(Self {
primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?,
dag: Default::default(),
leader_certificate: Default::default(),
leader_certificate_timer: Default::default(),
Expand Down Expand Up @@ -931,6 +933,7 @@ mod tests {
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::account::{Address, PrivateKey},
ledger::{
Expand Down Expand Up @@ -970,6 +973,18 @@ mod tests {
(committee, account, ledger, storage)
}

// Helper function to set up BFT for testing.
fn initialize_bft(
account: Account<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
ledger: Arc<MockLedgerService<CurrentNetwork>>,
) -> anyhow::Result<BFT<CurrentNetwork>> {
// Create the block synchronization logic.
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
// Initialize the BFT.
BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None))
}

#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
Expand Down Expand Up @@ -1001,7 +1016,7 @@ mod tests {
// Initialize the account.
let account = Account::new(rng)?;
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired());
// Ensure this call succeeds on an odd round.
let result = bft.is_leader_quorum_or_nonleaders_available(1);
Expand Down Expand Up @@ -1035,8 +1050,8 @@ mod tests {
assert_eq!(storage.current_round(), 1);
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Store is at round 1, and we are checking for round 2.
Expand All @@ -1057,8 +1072,8 @@ mod tests {
assert_eq!(storage.current_round(), 2);
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Ensure this call fails on an even round.
Expand Down Expand Up @@ -1097,8 +1112,11 @@ mod tests {
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
// Initialize the account.
let account = Account::new(rng)?;
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;

// Set up the BFT logic.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Set the leader certificate.
let leader_certificate = sample_batch_certificate_for_round(2, rng);
*bft.leader_certificate.write() = Some(leader_certificate);
Expand All @@ -1110,8 +1128,7 @@ mod tests {
assert!(result);

// Initialize a new BFT.
let bft_timer =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
// If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if !bft_timer.is_timer_expired() {
Expand Down Expand Up @@ -1142,7 +1159,8 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
assert!(bft.is_timer_expired()); // 0 + 5 < now()

// Ensure this call fails on an odd round.
let result = bft.update_leader_certificate_to_even_round(1);
Expand All @@ -1160,7 +1178,7 @@ mod tests {
assert_eq!(storage.max_gc_rounds(), 10);

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Ensure this call succeeds on an even round.
let result = bft.update_leader_certificate_to_even_round(6);
Expand Down Expand Up @@ -1212,7 +1230,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Set the leader certificate.
*bft.leader_certificate.write() = Some(leader_certificate);
Expand Down Expand Up @@ -1250,7 +1268,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
Expand Down Expand Up @@ -1280,7 +1298,7 @@ mod tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
Expand Down Expand Up @@ -1338,7 +1356,7 @@ mod tests {
/* Test missing previous certificate. */

// Initialize the BFT.
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// The expected error message.
let error_msg = format!(
Expand Down Expand Up @@ -1399,8 +1417,8 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
// Insert a mock DAG in the BFT.
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);

// Ensure that the `gc_round` has not been updated yet.
Expand Down Expand Up @@ -1465,7 +1483,7 @@ mod tests {

// Initialize the BFT.
let account = Account::new(rng)?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
Expand All @@ -1483,7 +1501,7 @@ mod tests {
// Initialize a new instance of storage.
let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
// Initialize a new instance of BFT.
let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
Expand Down Expand Up @@ -1637,7 +1655,7 @@ mod tests {

// Initialize the BFT without bootup.
let account = Account::new(rng)?;
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT without bootup.
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
Expand All @@ -1662,8 +1680,7 @@ mod tests {
let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

// Initialize a new instance of BFT with bootup.
let bootup_bft =
BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;

// Sync the BFT DAG at bootup.
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
Expand Down Expand Up @@ -1841,8 +1858,8 @@ mod tests {
}
// Initialize the bootup BFT.
let account = Account::new(rng)?;
let bootup_bft =
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

// Insert a mock DAG in the BFT without bootup.
*bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
// Sync the BFT DAG at bootup.
Expand Down
5 changes: 3 additions & 2 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub trait Transport<N: Network>: Send + Sync {
fn broadcast(&self, event: Event<N>);
}

/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
#[derive(Clone)]
pub struct Gateway<N: Network> {
/// The account of the node.
Expand Down Expand Up @@ -514,9 +516,8 @@ impl<N: Network> Gateway<N> {
self.update_metrics();
}

/// Inserts the given peer into the connected peers.
/// Inserts the given peer into the connected peers. This is only used in testing.
#[cfg(test)]
// For unit tests, we need to make this public so we can inject peers.
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.insert_peer(peer_ip, peer_addr, address);
Expand Down
4 changes: 3 additions & 1 deletion node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusRe
(sender, receiver)
}

/// "Interface" that enables, for example, sending data from storage to the the BFT logic.
#[derive(Clone, Debug)]
pub struct BFTSender<N: Network> {
pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
Expand Down Expand Up @@ -100,6 +101,7 @@ impl<N: Network> BFTSender<N> {
}
}

/// Receiving counterpart to `BFTSender`
#[derive(Debug)]
pub struct BFTReceiver<N: Network> {
pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
Expand All @@ -108,7 +110,7 @@ pub struct BFTReceiver<N: Network> {
pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
}

/// Initializes the BFT channels.
/// Initializes the BFT channels, and returns the sending and receiving ends.
pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
Expand Down
3 changes: 3 additions & 0 deletions node/bft/src/helpers/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ use snarkvm::{
use indexmap::IndexSet;
use std::collections::{BTreeMap, HashMap};

/// Maintains an directed acyclic graph (DAG) of batches, from which we build a totally-ordered blockchain.
/// The DAG is updated in rounds, where each validator adds at most one new batch.
#[derive(Debug)]
pub struct DAG<N: Network> {
/// The in-memory collection of certificates that comprise the DAG.
/// For each round, there is a mapping from node address to batch.
graph: BTreeMap<u64, HashMap<Address<N>, BatchCertificate<N>>>,
/// The in-memory collection of recently committed certificate IDs (up to GC).
recent_committed_ids: BTreeMap<u64, IndexSet<Field<N>>>,
Expand Down
4 changes: 2 additions & 2 deletions node/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s)

/// The frequency at which each primary broadcasts a ping to every other node.
/// The interval at which each primary broadcasts a ping to every other node.
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
/// The frequency at which each worker broadcasts a ping to every other node.
/// The interval at which each worker broadcasts a ping to every other node.
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms

/// A helper macro to spawn a blocking task.
Expand Down
Loading