Skip to content

Commit

Permalink
Enable block archival sync (#3579)
Browse files Browse the repository at this point in the history
* wip - body sync for full archive

* allow chain compaction during sync

* placeholder for logic to ensure archive nodes sync from archive nodes

* body sync from archival peers

* allow chain compaction during sync

* placeholder for logic to ensure archive nodes sync from archive nodes
  • Loading branch information
antiochp authored Mar 16, 2021
1 parent 846b8f8 commit 6690b25
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
15 changes: 13 additions & 2 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ impl Chain {
Ok(chain)
}

/// Are we running with archive_mode enabled?
pub fn archive_mode(&self) -> bool {
self.archive_mode
}

/// Return our shared header MMR handle.
pub fn header_pmmr(&self) -> Arc<RwLock<PMMRHandle<BlockHeader>>> {
self.header_pmmr.clone()
Expand Down Expand Up @@ -889,6 +894,11 @@ impl Chain {
/// If beyond the horizon then we cannot sync via recent full blocks
/// and we need a state (txhashset) sync.
pub fn check_txhashset_needed(&self, fork_point: &BlockHeader) -> Result<bool, Error> {
if self.archive_mode() {
debug!("check_txhashset_needed: we are running with archive_mode=true, not needed");
return Ok(false);
}

let header_head = self.header_head()?;
let horizon = global::cut_through_horizon() as u64;
Ok(fork_point.height < header_head.height.saturating_sub(horizon))
Expand Down Expand Up @@ -1075,7 +1085,7 @@ impl Chain {
header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
if self.archive_mode {
if self.archive_mode() {
return Ok(());
}

Expand Down Expand Up @@ -1161,13 +1171,14 @@ impl Chain {
}

// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
if !self.archive_mode() {
self.remove_historical_blocks(&header_pmmr, &batch)?;
}

// Make sure our output_pos index is consistent with the UTXO set.
txhashset.init_output_pos_index(&header_pmmr, &batch)?;

// TODO - Why is this part of chain compaction?
// Rebuild our NRD kernel_pos index based on recent kernel history.
txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;

Expand Down
5 changes: 0 additions & 5 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,11 +704,6 @@ where
}

fn check_compact(&self) {
// Skip compaction if we are syncing.
if self.sync_state.is_syncing() {
return;
}

// Roll the dice to trigger compaction at 1/COMPACTION_CHECK chance per block,
// uses a different thread to avoid blocking the caller thread (likely a peer)
let mut rng = thread_rng();
Expand Down
55 changes: 37 additions & 18 deletions servers/src/grin/sync/body_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use p2p::Capabilities;
use rand::prelude::*;
use std::cmp;
use std::sync::Arc;
Expand Down Expand Up @@ -71,6 +72,11 @@ impl BodySync {
Ok(false)
}

/// Is our local node running in archive_mode?
fn archive_mode(&self) -> bool {
self.chain.archive_mode()
}

/// Return true if txhashset download is needed (when requested block is under the horizon).
/// Otherwise go request some missing blocks and return false.
fn body_sync(&mut self) -> Result<bool, chain::Error> {
Expand All @@ -85,26 +91,39 @@ impl BodySync {
return Ok(true);
}

// Find connected peers with strictly greater difficulty than us.
let peers_iter = || {
self.peers
.iter()
.with_difficulty(|x| x > head.total_difficulty)
.connected()
};
let peers = {
// Find connected peers with strictly greater difficulty than us.
let peers_iter = || {
// If we are running with archive mode enabled we only want to sync
// from other archive nodes.
let cap = if self.archive_mode() {
Capabilities::BLOCK_HIST
} else {
Capabilities::UNKNOWN
};

self.peers
.iter()
.with_capabilities(cap)
.with_difficulty(|x| x > head.total_difficulty)
.connected()
};

// We prefer outbound peers with greater difficulty.
let mut peers: Vec<_> = peers_iter().outbound().into_iter().collect();
if peers.is_empty() {
debug!("no outbound peers with more work, considering inbound");
peers = peers_iter().inbound().into_iter().collect();
}

// We prefer outbound peers with greater difficulty.
let mut peers: Vec<_> = peers_iter().outbound().into_iter().collect();
if peers.is_empty() {
debug!("no outbound peers with more work, considering inbound");
peers = peers_iter().inbound().into_iter().collect();
}
// If we have no peers (outbound or inbound) then we are done for now.
if peers.is_empty() {
debug!("no peers (inbound or outbound) with more work");
return Ok(false);
}

// If we have no peers (outbound or inbound) then we are done for now.
if peers.is_empty() {
debug!("no peers (inbound or outbound) with more work");
return Ok(false);
}
peers
};

// if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work
Expand Down

0 comments on commit 6690b25

Please sign in to comment.