Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,14 @@ impl<N: Network> Sync<N> {
self.sync_storage_with_ledger_at_bootup().await
}

/// Performs one iteration of the block synchronization.
/// Issues new requests for blocks, if needed.
///
/// Additionally, this function removes obsolete and timed out block requests,
/// and disconnects/bans unresponsive peers.
///
/// Responses to block requests will eventually be processed by `Self::try_advancing_block_synchronization`.
#[inline]
pub async fn try_block_sync(&self) {
pub async fn issue_block_requests(&self) {
// First see if any peers need removal
let peers_to_ban = self.block_sync.remove_timed_out_block_requests();
for peer_ip in peers_to_ban {
Expand Down Expand Up @@ -174,16 +179,6 @@ impl<N: Network> Sync<N> {
tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

self_.try_block_sync().await;

// Sync the storage with the blocks.
if let Err(e) = self_.sync_storage_with_blocks().await {
error!("Unable to sync storage with blocks - {e}");
}

// If the node is synced, clear the `latest_block_responses`.
if self_.is_synced() {
self_.latest_block_responses.lock().await.clear();
}
}
});

Expand Down Expand Up @@ -277,20 +272,39 @@ impl<N: Network> Sync<N> {

Ok(())
}

/// Execute one iteration of block synchronization.
///
/// This is called periodically by a tokio background task spawned in `Self::run`.
/// Some unit tests also call this function directly to manually trigger block synchronization.
pub(crate) async fn try_block_sync(&self) {
self.issue_block_requests().await;

// Sync the storage with the blocks.
if let Err(e) = self.try_advancing_block_synchronization().await {
error!("Unable to sync storage with blocks - {e}");
}

// If the node is synced, clear the `latest_block_responses`.
if self.is_synced() {
self.latest_block_responses.lock().await.clear();
}
}
}

// Callbacks used when receiving messages from the Gateway
impl<N: Network> Sync<N> {
/// We received a block response and can (possibly) advance synchronization.
async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
// Verify that the response is valid.
// Verify that the response is valid and add it to block sync.
self.block_sync.insert_block_responses(peer_ip, blocks)?;

// Advance block synchronization
self.block_sync.try_advancing_block_synchronization();
// Try to process responses stored in BlockSync.
// Note: Do not call `self.block_sync.try_advancing_block_synchronization` here as it will process
// and remove any completed requests, which means the call to `sync_storage_with_blocks` will not process
// them as expected.
self.try_advancing_block_synchronization().await?;

// Sync the storage with the blocks.
self.sync_storage_with_blocks().await?;
Ok(())
}

Expand Down Expand Up @@ -395,10 +409,19 @@ impl<N: Network> Sync<N> {
Ok(())
}

/// Syncs the storage with blocks already received from peers.
/// Aims to advance synchronization using any recent block responses
/// received from peers.
///
/// This is the validator's version of `BlockSync::try_advancing_block_synchronization` and is called periodically at runtime.
///
/// A key difference from `BlockSync`'s version is that it will only add blocks to the ledger once they have been confirmed by the network.
/// If blocks are not confirmed yet, they will be kept in [`Self::latest_block_responses`].
/// It will also pass certificates from synced blocks to the BFT module so that consensus can progress as expected
/// (see [`Self::sync_storage_with_block`] for more details).
///
/// This is called periodically at runtime.
async fn sync_storage_with_blocks(&self) -> Result<()> {
/// If the node falls behind more than GC rounds, this function calls [`Self::sync_storage_without_bft`] instead,
/// which syncs without updating the BFT state.
async fn try_advancing_block_synchronization(&self) -> Result<()> {
// Acquire the response lock.
let _lock = self.response_lock.lock().await;

Expand Down Expand Up @@ -491,7 +514,7 @@ impl<N: Network> Sync<N> {
.await?
}

/// Syncs the storage with the given block.
/// Advances the ledger by the given block and updates the storage accordingly.
///
/// This also updates the DAG, and uses the DAG to ensure that the block's leader certificate
/// meets the voter availability threshold (i.e. > f voting stake)
Expand Down
5 changes: 4 additions & 1 deletion node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ impl<N: Network> BlockSync<N> {
if is_request_complete { self.responses.read().get(&next_height).cloned() } else { None }
}

/// Attempts to advance with blocks from the sync pool.
/// Attempts to advance synchronization by processing completed block responses.
///
/// Validators will not call this function, but instead execute `snarkos_node_bft::Sync::try_advancing_block_synchronization`
/// which also updates the BFT state.
#[inline]
pub fn try_advancing_block_synchronization(&self) {
// Acquire the lock to ensure this function is called only once at a time.
Expand Down