grin v5.3 (0111) [DNM] PIBD Task / Issue Tracker (mimblewimble#3695)
* [PIBD_IMPL] Introduce PIBD state into sync workflow (mimblewimble#3685)
* experimental addition of pibd download state for testnet only
* fixes to bitmap number of segments calculation + conversion of bitmap accumulator to bitmap
* attempt to call a test message
* add p2p methods for receiving bitmap segment and applying to desegmenter associated with chain
* fixes to state sync
* add pibd receive messages to network, and basic calls to desegmenter from each (mimblewimble#3686)
* [PIBD_IMPL] PIBD Desegmenter State (mimblewimble#3688)
* add functions to desegmenter to report next desired segments, begin to add state to determine which segments have been requested
* add segmentidentifier type to id requested segments uniquely
. . . and more . . .
bayk committed Jun 21, 2024
1 parent d02d277 commit 70b4411
Showing 35 changed files with 2,640 additions and 341 deletions.
2 changes: 1 addition & 1 deletion api/src/handlers/chain_api.rs
@@ -80,7 +80,7 @@ impl ChainResetHandler {
pub fn reset_chain_head(&self, hash: Hash) -> Result<(), Error> {
let chain = w(&self.chain)?;
let header = chain.get_block_header(&hash)?;
chain.reset_chain_head(&header)?;
chain.reset_chain_head(&header, true)?;

// Reset the sync status and clear out any sync error.
w(&self.sync_state)?.reset();
129 changes: 106 additions & 23 deletions chain/src/chain.rs
@@ -257,7 +257,14 @@ impl Chain {
/// Reset both head and header_head to the provided header.
/// Handles simple rewind and more complex fork scenarios.
/// Used by the reset_chain_head owner api endpoint.
pub fn reset_chain_head<T: Into<Tip>>(&self, head: T) -> Result<(), Error> {
/// Caller can choose not to rewind headers, which can be used
/// during PIBD scenarios where it's desirable to restart the PIBD process
/// without re-downloading the header chain
pub fn reset_chain_head<T: Into<Tip>>(
&self,
head: T,
rewind_headers: bool,
) -> Result<(), Error> {
let head = head.into();

let mut header_pmmr = self.header_pmmr.write();
@@ -278,19 +285,44 @@
},
)?;

// If the rewind of full blocks was successful then we can rewind the header MMR.
// Rewind and reapply headers to reset the header MMR.
txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
self.rewind_and_apply_header_fork(&header, ext, batch)?;
batch.save_header_head(&head)?;
Ok(())
})?;
if rewind_headers {
// If the rewind of full blocks was successful then we can rewind the header MMR.
// Rewind and reapply headers to reset the header MMR.
txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
self.rewind_and_apply_header_fork(&header, ext, batch)?;
batch.save_header_head(&head)?;
Ok(())
})?;
}

batch.commit()?;

Ok(())
}

/// Reset prune lists (when PIBD resets and rolls back the
/// entire chain, the prune list needs to be manually wiped
/// as it's currently not included as part of rewind)
pub fn reset_prune_lists(&self) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;

txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| {
let extension = &mut ext.extension;
extension.reset_prune_lists();
Ok(())
})?;
Ok(())
}

/// Reset PIBD head
pub fn reset_pibd_head(&self) -> Result<(), Error> {
let batch = self.store.batch()?;
batch.save_pibd_head(&self.genesis().into())?;
Ok(())
}
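
Taken together, the three methods above let a caller restart PIBD from scratch without touching the already-synced header chain: rewind the block chain with rewind_headers set to false, wipe the prune lists (which rewind does not cover), and point the PIBD head back at genesis. A minimal sketch of such a reset; the helper name and error handling are illustrative, not part of this commit:

// Hypothetical helper: roll the chain state back for a fresh PIBD attempt
// while keeping the downloaded header chain intact.
fn restart_pibd(chain: &Chain) -> Result<(), Error> {
    let genesis = chain.genesis();
    // Pass `false` so the header MMR is left as-is.
    chain.reset_chain_head(Tip::from_header(&genesis), false)?;
    // Prune lists are not covered by rewind, so wipe them explicitly.
    chain.reset_prune_lists()?;
    // Segment application will start over from the genesis tip.
    chain.reset_pibd_head()?;
    Ok(())
}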

/// Are we running with archive_mode enabled?
pub fn archive_mode(&self) -> bool {
self.archive_mode
Expand All @@ -306,6 +338,11 @@ impl Chain {
self.txhashset.clone()
}

/// return genesis header
pub fn genesis(&self) -> BlockHeader {
self.genesis.clone()
}

/// Shared store instance.
pub fn store(&self) -> Arc<store::ChainStore> {
self.store.clone()
@@ -853,8 +890,15 @@ impl Chain {
// ensure the view is consistent.
txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
self.rewind_and_apply_fork(&header, ext, batch)?;
ext.extension
.validate(&self.genesis, fast_validation, &NoStatus, &header)?;
ext.extension.validate(
&self.genesis,
fast_validation,
&NoStatus,
None,
None,
&header,
None,
)?;
Ok(())
})
}
@@ -1056,21 +1100,22 @@

/// instantiate the desegmenter (in the same lazy fashion as the segmenter, though this should not be as
/// expensive an operation)
pub fn desegmenter(&self, archive_header: &BlockHeader) -> Result<Desegmenter, Error> {
pub fn desegmenter(
&self,
archive_header: &BlockHeader,
) -> Result<Arc<RwLock<Option<Desegmenter>>>, Error> {
// Use our cached desegmenter if we have one and the associated header matches.
if let Some(d) = self.pibd_desegmenter.read().as_ref() {
if let Some(d) = self.pibd_desegmenter.write().as_ref() {
if d.header() == archive_header {
return Ok(d.clone());
return Ok(self.pibd_desegmenter.clone());
}
}
// If there is no desegmenter, or the headers don't match, initialize a new one
// TODO: (Check whether we can do this.. we *should* be able to modify this as the desegmenter
// is in flight and we cross a horizon boundary, but needs more thinking)

let desegmenter = self.init_desegmenter(archive_header)?;
let mut cache = self.pibd_desegmenter.write();
*cache = Some(desegmenter.clone());

return Ok(desegmenter);
Ok(self.pibd_desegmenter.clone())
}
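
Since the desegmenter is now handed out as a shared Arc<RwLock<Option<Desegmenter>>>, callers lock it briefly for each interaction instead of cloning the whole state. A rough usage sketch under that assumption; next_desired_segments is an illustrative name only (the actual request API is added elsewhere in this commit):

// Sketch only (not part of this diff): look up the shared desegmenter for the
// archive header and inspect it under the read lock.
fn peek_desegmenter(chain: &Chain) -> Result<(), Error> {
    let archive_header = chain.txhashset_archive_header_header_only()?;
    let desegmenter = chain.desegmenter(&archive_header)?;
    if let Some(d) = desegmenter.read().as_ref() {
        // The cached instance matches `archive_header` here; segment requests
        // would be driven from it, e.g. `d.next_desired_segments(..)`.
        let _ = d.header();
    }
    Ok(())
}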

/// initialize a desegmenter, which is capable of extending the hashset by appending
@@ -1087,6 +1132,7 @@
self.txhashset(),
self.header_pmmr.clone(),
header.clone(),
self.genesis.clone(),
self.store.clone(),
))
}
@@ -1112,6 +1158,17 @@
self.get_header_by_height(txhashset_height)
}

/// Return the Block Header at the txhashset horizon, considering only the
/// contents of the header PMMR
pub fn txhashset_archive_header_header_only(&self) -> Result<BlockHeader, Error> {
let header_head = self.header_head()?;
let threshold = global::state_sync_threshold() as u64;
let archive_interval = global::txhashset_archive_interval();
let mut txhashset_height = header_head.height.saturating_sub(threshold);
txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
self.get_header_by_height(txhashset_height)
}
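
The arithmetic mirrors txhashset_archive_header(): subtract the state-sync threshold from the header head height, then round down to a multiple of the archive interval. A worked example with illustrative numbers (the real threshold and interval come from the global config, not from this diff):

// Illustrative values only: header_head.height = 250_000,
// state_sync_threshold = 1_000, txhashset_archive_interval = 12_960.
let mut txhashset_height: u64 = 250_000u64.saturating_sub(1_000); // 249_000
txhashset_height = txhashset_height.saturating_sub(txhashset_height % 12_960);
assert_eq!(txhashset_height, 246_240); // 19 * 12_960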

// Special handling to make sure the whole kernel set matches each of its
// roots in each block header, without truncation. We go back header by
// header, rewind and check each root. This fixes a potential weakness in
@@ -1232,7 +1289,7 @@
txhashset_data: File,
status: &dyn TxHashsetWriteStatus,
) -> Result<bool, Error> {
status.on_setup();
status.on_setup(None, None, None, None);

// Initial check whether this txhashset is needed or not
let fork_point = self.fork_point()?;
@@ -1272,7 +1329,7 @@

let header_pmmr = self.header_pmmr.read();
let batch = self.store.batch()?;
txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch)?;
txhashset.verify_kernel_pos_index(&self.genesis, &header_pmmr, &batch, None, None)?;
}

// all good, prepare a new batch and update all the required records
@@ -1291,7 +1348,7 @@
// Validate the extension, generating the utxo_sum and kernel_sum.
// Full validation, including rangeproofs and kernel signature verification.
let (utxo_sum, kernel_sum) =
extension.validate(&self.genesis, false, status, &header)?;
extension.validate(&self.genesis, false, status, None, None, &header, None)?;

// Save the block_sums (utxo_sum, kernel_sum) to the db for use later.
batch.save_block_sums(
@@ -1365,6 +1422,7 @@
fn remove_historical_blocks(
&self,
header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
archive_header: BlockHeader,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
if self.archive_mode() {
@@ -1385,7 +1443,6 @@
// TODO: Check this, compaction selects a different horizon
// block from txhashset horizon/PIBD segmenter when using
// Automated testing chain
let archive_header = self.txhashset_archive_header()?;
if archive_header.height < cutoff {
cutoff = archive_header.height;
horizon = head.height - archive_header.height;
@@ -1445,6 +1502,10 @@
}
}

// Retrieve archive header here, so as not to attempt a read
// lock while removing historical blocks
let archive_header = self.txhashset_archive_header()?;

// Take a write lock on the txhashet and start a new writeable db batch.
let header_pmmr = self.header_pmmr.read();
let mut txhashset = self.txhashset.write();
@@ -1464,7 +1525,7 @@

// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode() {
self.remove_historical_blocks(&header_pmmr, &batch)?;
self.remove_historical_blocks(&header_pmmr, archive_header, &batch)?;
}

// Make sure our output_pos index is consistent with the UTXO set.
@@ -1899,9 +1960,31 @@ fn setup_head(
// Note: We are rewinding and validating against a writeable extension.
// If validation is successful we will truncate the backend files
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;
let mut pibd_in_progress = false;
let header = {
let head = batch.get_block_header(&head.last_block_h)?;
let pibd_tip = store.pibd_head()?;
let pibd_head = batch.get_block_header(&pibd_tip.last_block_h)?;
if pibd_head.height > head.height {
pibd_in_progress = true;
pibd_head
} else {
head
}
};

let res = txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
// If we're still downloading via PIBD, don't worry about sums and validations just yet
// We still want to rewind to the last completed block to ensure a consistent state
if pibd_in_progress {
debug!(
"init: PIBD appears to be in progress at height {}, hash {}, not validating, will attempt to continue",
header.height,
header.hash()
);
return Ok(());
}

pipe::rewind_and_apply_fork(&header, ext, batch, &|_| Ok(()))?;

let extension = &mut ext.extension;
8 changes: 8 additions & 0 deletions chain/src/error.rs
@@ -161,14 +161,22 @@ pub enum Error {
/// Conversion
source: segment::SegmentError,
},
/// We've decided to halt the PIBD process due to lack of supporting peers or
/// otherwise failing to progress for a certain amount of time
#[error("Aborting PIBD error")]
AbortingPIBDError,
/// The segmenter is associated to a different block header
#[error("Segmenter header mismatch")]
SegmenterHeaderMismatch,
/// Segment height not within allowed range
#[error("Invalid segment height")]
InvalidSegmentHeight,
/// Error from the core calls
#[error("Core error, {0}")]
CoreErr(core::Error),
/// Other issue with segment
#[error("Invalid segment: {0}")]
InvalidSegment(String),
}

impl Error {
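
The new AbortingPIBDError variant gives the sync layer a typed way to give up on PIBD (for example after failing to find supporting peers for TXHASHSET_ZIP_FALLBACK_TIME_SECS) and fall back to the txhashset.zip download. A heavily hedged sketch of how a caller might branch on it; the function and the fallback handling are illustrative, not part of this diff:

// Illustrative only: map a PIBD abort onto the legacy download path.
fn handle_pibd_outcome(res: Result<(), Error>) -> Result<(), Error> {
    match res {
        // PIBD was abandoned; the caller would request a full txhashset.zip
        // from a peer instead (handled by the existing state-sync code).
        Err(Error::AbortingPIBDError) => Ok(()),
        other => other,
    }
}
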
1 change: 1 addition & 0 deletions chain/src/lib.rs
@@ -40,6 +40,7 @@ use grin_util as util;
mod chain;
mod error;
pub mod linked_list;
pub mod pibd_params;
pub mod pipe;
pub mod store;
pub mod txhashset;
45 changes: 45 additions & 0 deletions chain/src/pibd_params.rs
@@ -0,0 +1,45 @@
// Copyright 2022 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Set of static definitions for all parameters related to PIBD and Desegmentation
//! Note these are for experimentation via compilation, not meant to be exposed as
//! configuration parameters anywhere
/// Bitmap segment height assumed for requests and segment calculation
pub const BITMAP_SEGMENT_HEIGHT: u8 = 9;

/// Output segment height assumed for requests and segment calculation
pub const OUTPUT_SEGMENT_HEIGHT: u8 = 11;

/// Rangeproof segment height assumed for requests and segment calculation
pub const RANGEPROOF_SEGMENT_HEIGHT: u8 = 11;

/// Kernel segment height assumed for requests and segment calculation
pub const KERNEL_SEGMENT_HEIGHT: u8 = 11;

/// Maximum number of received segments to cache (across all trees) before we stop requesting others
pub const MAX_CACHED_SEGMENTS: usize = 15;

/// How long the state sync should wait after requesting a segment from a peer before
/// deciding the segment isn't going to arrive. The syncer will then re-request the segment
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60;

/// Number of simultaneous requests for segments we should make. Note this is currently
/// divisible by 3 to try to spread requests evenly among the 3 main MMRs (bitmap segments
/// will always be requested first)
pub const SEGMENT_REQUEST_COUNT: usize = 15;

/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
/// give up and fall back to the txhashset.zip download method
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60;
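
A segment "height" in these constants is the log2 of the number of PMMR leaves carried per segment, so they fix how much data travels in a single request/response. A small illustrative calculation (the total leaf count is made up; only the 2^height relationship is implied here):

// Illustrative only: with OUTPUT_SEGMENT_HEIGHT = 11, each output segment
// carries 2^11 = 2048 leaves, so an output PMMR with 1_000_000 leaves splits
// into ceil(1_000_000 / 2048) = 489 segments.
let leaves_per_segment = 1u64 << OUTPUT_SEGMENT_HEIGHT;
let total_leaves: u64 = 1_000_000;
let segment_count = (total_leaves + leaves_per_segment - 1) / leaves_per_segment;
assert_eq!(segment_count, 489);
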
19 changes: 19 additions & 0 deletions chain/src/store.rs
@@ -17,6 +17,7 @@
use crate::core::consensus::HeaderDifficultyInfo;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums, Inputs};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::core::ser::{DeserializationMode, ProtocolVersion, Readable, Writeable};
use crate::linked_list::MultiIndex;
@@ -36,6 +37,7 @@ const BLOCK_HEADER_PREFIX: u8 = b'h';
const BLOCK_PREFIX: u8 = b'b';
const HEAD_PREFIX: u8 = b'H';
const TAIL_PREFIX: u8 = b'T';
const PIBD_HEAD_PREFIX: u8 = b'I';
const HEADER_HEAD_PREFIX: u8 = b'G';
const OUTPUT_POS_PREFIX: u8 = b'p';

@@ -83,6 +85,18 @@ impl ChainStore {
option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned())
}

/// The current PIBD head (may differ from the other heads; returns the genesis tip if no PIBD head has been saved).
pub fn pibd_head(&self) -> Result<Tip, Error> {
let res = option_to_not_found(self.db.get_ser(&[PIBD_HEAD_PREFIX], None), || {
"PIBD_HEAD".to_owned()
});

match res {
Ok(r) => Ok(r),
Err(_) => Ok(Tip::from_header(&global::get_genesis_block().header)),
}
}

/// Header of the block at the head of the block chain (not the same thing as header_head).
pub fn head_header(&self) -> Result<BlockHeader, Error> {
self.get_block_header(&self.head()?.last_block_h)
@@ -209,6 +223,11 @@ impl<'a> Batch<'a> {
self.db.put_ser(&[HEADER_HEAD_PREFIX], t)
}

/// Save PIBD head to db.
pub fn save_pibd_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[PIBD_HEAD_PREFIX], t)
}
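
With the new prefix, the PIBD head is persisted independently of head, tail and header_head, and pibd_head() falls back to the genesis tip when nothing has been stored yet. A minimal sketch of recording and reading that progress; the helper name and call site are assumptions for illustration:

// Illustrative only: persist the PIBD head after segments have been applied
// up to `applied_header`, then read it back on a later startup.
fn save_pibd_progress(store: &ChainStore, applied_header: &BlockHeader) -> Result<(), Error> {
    let batch = store.batch()?;
    batch.save_pibd_head(&Tip::from_header(applied_header))?;
    batch.commit()?;
    Ok(())
}
// On restart: let pibd_tip = store.pibd_head()?; // genesis tip if never saved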

/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || {
(remaining changed files not shown)
