Pull initial chain compaction out of init() and into the syncer #2738

Merged
chain/src/chain.rs (84 changes: 47 additions & 37 deletions)
@@ -170,44 +170,34 @@ impl Chain {
 		archive_mode: bool,
 		stop_state: Arc<Mutex<StopState>>,
 	) -> Result<Chain, Error> {
-		let chain = {
-			// Note: We take a lock on the stop_state here and do not release it until
-			// we have finished chain initialization.
-			let stop_state_local = stop_state.clone();
-			let stop_lock = stop_state_local.lock();
-			if stop_lock.is_stopped() {
-				return Err(ErrorKind::Stopped.into());
-			}
-
-			let store = Arc::new(store::ChainStore::new(db_env)?);
-
-			// open the txhashset, creating a new one if necessary
-			let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
-
-			setup_head(&genesis, &store, &mut txhashset)?;
-			Chain::log_heads(&store)?;
-
-			Chain {
-				db_root,
-				store,
-				adapter,
-				orphans: Arc::new(OrphanBlockPool::new()),
-				txhashset: Arc::new(RwLock::new(txhashset)),
-				pow_verifier,
-				verifier_cache,
-				archive_mode,
-				stop_state,
-				genesis: genesis.header.clone(),
-			}
-		};
-
-		// Run chain compaction. Laptops and other intermittent nodes
-		// may not run long enough to trigger daily compaction.
-		// So run it explicitly here on startup (its fast enough to do so).
-		// Note: we release the stop_lock from above as compact also requires a lock.
-		chain.compact()?;
+		// Note: We take a lock on the stop_state here and do not release it until
+		// we have finished chain initialization.
+		let stop_state_local = stop_state.clone();
+		let stop_lock = stop_state_local.lock();
+		if stop_lock.is_stopped() {
+			return Err(ErrorKind::Stopped.into());
+		}
 
-		Ok(chain)
+		let store = Arc::new(store::ChainStore::new(db_env)?);
+
+		// open the txhashset, creating a new one if necessary
+		let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
+
+		setup_head(&genesis, &store, &mut txhashset)?;
+		Chain::log_heads(&store)?;
+
+		Ok(Chain {
+			db_root,
+			store,
+			adapter,
+			orphans: Arc::new(OrphanBlockPool::new()),
+			txhashset: Arc::new(RwLock::new(txhashset)),
+			pow_verifier,
+			verifier_cache,
+			archive_mode,
+			stop_state,
+			genesis: genesis.header.clone(),
+		})
 	}
 
 	/// Return our shared txhashset instance.
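
To make the effect of this hunk concrete: `Chain::new` previously ran `compact()` as a side effect, so startup paid the cost of compaction before the constructor returned. The sketch below is a toy model of the new split; the `Chain` here is a stub for illustration, not the real grin type:

```rust
// Toy model only: this `Chain` is a stub standing in for chain::Chain,
// used to illustrate the new startup flow.
use std::time::Instant;

struct Chain;

impl Chain {
	fn new() -> Chain {
		// Open the store/txhashset and set up the head -- relatively cheap.
		Chain
	}

	fn compact(&self) {
		// Prune historical data -- potentially slow, no longer run in new().
	}
}

fn main() {
	let started = Instant::now();
	let chain = Chain::new(); // no compaction side effect on startup
	println!("chain ready in {:?}", started.elapsed());

	// Compaction now happens later, driven by the sync loop (see the
	// syncer.rs hunk below) once the node drops out of its syncing state.
	chain.compact();
}
```
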
@@ -1057,6 +1047,26 @@ impl Chain {
 	/// * removes historical blocks and associated data from the db (unless archive mode)
 	///
 	pub fn compact(&self) -> Result<(), Error> {
+		// A node may be restarted multiple times in a short period of time.
+		// In this situation we compact at most once per 60 blocks, by comparing
+		// the current "head" and "tail" heights to our cut-through horizon and
+		// allowing an additional 60 blocks in height before permitting a further compaction.
+		if let (Ok(tail), Ok(head)) = (self.tail(), self.head()) {
+			let horizon = global::cut_through_horizon() as u64;
+			let threshold = horizon.saturating_add(60);
+			debug!(
+				"compact: head: {}, tail: {}, diff: {}, horizon: {}",
+				head.height,
+				tail.height,
+				head.height.saturating_sub(tail.height),
+				horizon
+			);
+			if tail.height.saturating_add(threshold) > head.height {
+				debug!("compact: skipping compaction - threshold is 60 blocks beyond horizon.");
+				return Ok(());
+			}
+		}
+
 		// Note: We take a lock on the stop_state here and do not release it until
 		// we have finished processing this chain compaction operation.
 		// We want to avoid shutting the node down in the middle of compacting the data.
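
The guard above is plain saturating arithmetic: skip compaction while the tail is still within `horizon + 60` blocks of the head. A minimal self-contained sketch, using an illustrative horizon of 2,880 blocks (the real value comes from `global::cut_through_horizon()`):

```rust
// Sketch of the skip check above; `should_skip_compaction` is a
// hypothetical helper, not a function in the codebase.
fn should_skip_compaction(head_height: u64, tail_height: u64, horizon: u64) -> bool {
	// Allow an extra 60 blocks beyond the horizon before compacting again.
	let threshold = horizon.saturating_add(60);
	tail_height.saturating_add(threshold) > head_height
}

fn main() {
	let horizon: u64 = 2_880; // illustrative, e.g. 48 hours of 60s blocks

	// Freshly compacted node: tail sits ~horizon blocks behind head, so skip.
	assert!(should_skip_compaction(10_000, 10_000 - horizon, horizon));

	// Restarted 30 blocks later: still inside the 60-block grace window, skip.
	assert!(should_skip_compaction(10_030, 10_000 - horizon, horizon));

	// 60 blocks past the last compaction: the guard no longer skips.
	assert!(!should_skip_compaction(10_060, 10_000 - horizon, horizon));
}
```
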
servers/src/grin/sync/syncer.rs (17 changes: 14 additions & 3 deletions)
@@ -146,16 +146,27 @@ impl SyncRunner {

 			thread::sleep(time::Duration::from_millis(10));
 
+			let currently_syncing = self.sync_state.is_syncing();
+
 			// check whether syncing is generally needed, when we compare our state with others
-			let (syncing, most_work_height) = unwrap_or_restart_loop!(self.needs_syncing());
+			let (needs_syncing, most_work_height) = unwrap_or_restart_loop!(self.needs_syncing());
 			if most_work_height > 0 {
 				// we can occasionally get a most work height of 0 if read locks fail
 				highest_height = most_work_height;
 			}
 
 			// quick short-circuit (and a decent sleep) if no syncing is needed
-			if !syncing {
-				self.sync_state.update(SyncStatus::NoSync);
+			if !needs_syncing {
+				if currently_syncing {
+					self.sync_state.update(SyncStatus::NoSync);
+
+					// Initial transition out of a "syncing" state and into NoSync.
+					// This triggers a chain compaction to keep our local node tidy.
+					// Note: Chain compaction runs with an internal threshold,
+					// so it can safely be run even if the node is restarted frequently.
+					unwrap_or_restart_loop!(self.chain.compact());
+				}
+
 				thread::sleep(time::Duration::from_secs(10));
 				continue;
 			}
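
The `currently_syncing` flag makes the compaction call edge-triggered: it fires once, on the transition from syncing to `NoSync`, rather than on every pass through the loop while the node idles. A toy model of that behavior (all names here are illustrative; the real loop lives in `SyncRunner` and calls `chain.compact()`):

```rust
fn main() {
	// Stand-in for self.sync_state.is_syncing() at the top of each iteration.
	let mut currently_syncing = true;
	let mut compactions = 0;

	// needs_syncing as reported by successive loop iterations.
	for &needs_syncing in &[true, true, false, false, false] {
		if !needs_syncing {
			if currently_syncing {
				// Transition edge: update state and compact exactly once.
				currently_syncing = false;
				compactions += 1; // stands in for chain.compact()
			}
			// ...the real loop sleeps 10s and continues here...
		} else {
			currently_syncing = true;
		}
	}

	// Only the first non-syncing iteration triggered a compaction.
	assert_eq!(compactions, 1);
}
```

Combined with the threshold check inside `compact()` above, even a node that flaps between syncing and not syncing will compact at most once per 60 blocks.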