Skip to content

Commit

Permalink
read header_head and sync_head from the header and sync MMR respectiv…
Browse files Browse the repository at this point in the history
…ely (#3045)

no need to maintain header_head and sync_head in the db explicitly
  • Loading branch information
antiochp authored Oct 29, 2019
1 parent a39ff54 commit a362888
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 187 deletions.
123 changes: 39 additions & 84 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ impl Chain {
&mut sync_pmmr,
&mut txhashset,
)?;
Chain::log_heads(&store)?;

let chain = Chain {
db_root,
Expand All @@ -226,6 +225,8 @@ impl Chain {
chain.rebuild_height_for_pos()?;
}

chain.log_heads()?;

Ok(chain)
}

Expand All @@ -244,46 +245,22 @@ impl Chain {
self.store.clone()
}

fn log_heads(store: &store::ChainStore) -> Result<(), Error> {
let head = store.head()?;
debug!(
"init: head: {} @ {} [{}]",
head.total_difficulty.to_num(),
head.height,
head.last_block_h,
);

let header_head = store.header_head()?;
debug!(
"init: header_head: {} @ {} [{}]",
header_head.total_difficulty.to_num(),
header_head.height,
header_head.last_block_h,
);

let sync_head = store.get_sync_head()?;
debug!(
"init: sync_head: {} @ {} [{}]",
sync_head.total_difficulty.to_num(),
sync_head.height,
sync_head.last_block_h,
);

fn log_heads(&self) -> Result<(), Error> {
let log_head = |name, head: Tip| {
debug!(
"{}: {} @ {} [{}]",
name,
head.total_difficulty.to_num(),
head.height,
head.hash(),
);
};
log_head("head", self.head()?);
log_head("header_head", self.header_head()?);
log_head("sync_head", self.get_sync_head()?);
Ok(())
}

/// Reset sync_head to current header_head.
/// We do this when we first transition to header_sync to ensure we extend
/// the "sync" header MMR from a known consistent state and to ensure we track
/// the header chain correctly at the fork point.
pub fn reset_sync_head(&self) -> Result<Tip, Error> {
let batch = self.store.batch()?;
batch.reset_sync_head()?;
let head = batch.get_sync_head()?;
batch.commit()?;
Ok(head)
}

/// Processes a single block, then checks for orphans, processing
/// those as well if they're found
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
Expand Down Expand Up @@ -773,9 +750,8 @@ impl Chain {
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let sync_head = batch.get_sync_head()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &sync_head, &mut batch, |extension| {
txhashset::header_extending(&mut sync_pmmr, &mut batch, |extension| {
pipe::rewind_and_apply_header_fork(&header, extension)?;
Ok(())
})?;
Expand Down Expand Up @@ -1217,9 +1193,9 @@ impl Chain {

/// Tip (head) of the header chain.
pub fn header_head(&self) -> Result<Tip, Error> {
self.store
.header_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain header head".to_owned()).into())
let hash = self.header_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}

/// Block header for the chain head
Expand Down Expand Up @@ -1429,9 +1405,9 @@ impl Chain {
/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
self.store
.get_sync_head()
.map_err(|e| ErrorKind::StoreErr(e, "chain get sync head".to_owned()).into())
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}

/// Builds an iterator on blocks starting from the current chain head and
Expand Down Expand Up @@ -1460,6 +1436,22 @@ fn setup_head(
) -> Result<(), Error> {
let mut batch = store.batch()?;

// Apply the genesis header to header and sync MMRs to ensure they are non-empty.
// We read header_head and sync_head directly from the MMR and assume they are non-empty.
{
if header_pmmr.last_pos == 0 {
txhashset::header_extending(header_pmmr, &mut batch, |extension| {
extension.apply_header(&genesis.header)
})?;
}

if sync_pmmr.last_pos == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |extension| {
extension.apply_header(&genesis.header)
})?;
}
}

// check if we have a head in store, otherwise the genesis block is it
let head_res = batch.head();
let mut head: Tip;
Expand Down Expand Up @@ -1534,13 +1526,7 @@ fn setup_head(
// We will update this later once we have the correct header_root.
batch.save_block_header(&genesis.header)?;
batch.save_block(&genesis)?;

let tip = Tip::from_header(&genesis.header);

// Save these ahead of time as we need head and header_head to be initialized
// with *something* when creating a txhashset extension below.
batch.save_body_head(&tip)?;
batch.save_header_head(&tip)?;
batch.save_body_head(&Tip::from_header(&genesis.header))?;

if genesis.kernels().len() > 0 {
let (utxo_sum, kernel_sum) = (sums, genesis as &dyn Committed).verify_kernel_sums(
Expand All @@ -1552,14 +1538,6 @@ fn setup_head(
kernel_sum,
};
}
txhashset::header_extending(header_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::header_extending(sync_pmmr, &tip, &mut batch, |extension| {
extension.apply_header(&genesis.header)?;
Ok(())
})?;
txhashset::extending(header_pmmr, txhashset, &mut batch, |ext| {
let ref mut extension = ext.extension;
extension.apply_block(&genesis)?;
Expand All @@ -1575,29 +1553,6 @@ fn setup_head(
}
Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,
};

// Check we have the header corresponding to the header_head.
// If not then something is corrupted and we should reset our header_head.
// Either way we want to reset sync_head to match header_head.
let head = batch.head()?;
let header_head = batch.header_head()?;
if batch.get_block_header(&header_head.last_block_h).is_ok() {
// Reset sync_head to be consistent with current header_head.
batch.reset_sync_head()?;
} else {
// Reset both header_head and sync_head to be consistent with current head.
warn!(
"setup_head: header missing for {}, {}, resetting header_head and sync_head to head: {}, {}",
header_head.last_block_h,
header_head.height,
head.last_block_h,
head.height,
);
batch.reset_header_head()?;
batch.reset_sync_head()?;
}

batch.commit()?;

Ok(())
}
57 changes: 15 additions & 42 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,12 @@ pub fn sync_block_headers(
// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let sync_head = ctx.batch.get_sync_head()?;
let sync_head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};

if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &sync_head) {
return Ok(());
Expand All @@ -197,16 +202,12 @@ pub fn sync_block_headers(
add_block_header(header, &ctx.batch)?;
}

// Now apply this entire chunk of headers to the sync MMR.
txhashset::header_extending(&mut ctx.header_pmmr, &sync_head, &mut ctx.batch, |ext| {
// Now apply this entire chunk of headers to the sync MMR (ctx is sync MMR specific).
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&last_header, ext)?;
Ok(())
})?;

if has_more_work(&last_header, &sync_head) {
update_sync_head(&Tip::from_header(&last_header), &mut ctx.batch)?;
}

Ok(())
}

Expand All @@ -227,14 +228,19 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
// If it does not increase total_difficulty beyond our current header_head
// then we can (re)accept this header and process the full block (or request it).
// This header is on a fork and we should still accept it as the fork may eventually win.
let header_head = ctx.batch.header_head()?;
let header_head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};

if let Ok(existing) = ctx.batch.get_block_header(&header.hash()) {
if !has_more_work(&existing, &header_head) {
return Ok(());
}
}

txhashset::header_extending(&mut ctx.header_pmmr, &header_head, &mut ctx.batch, |ext| {
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext| {
rewind_and_apply_header_fork(&prev_header, ext)?;
ext.validate_root(header)?;
ext.apply_header(header)?;
Expand All @@ -247,15 +253,6 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) ->
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;

// Update header_head independently of chain head (full blocks).
// If/when we process the corresponding full block we will update the
// chain head to match. This allows our header chain to extend safely beyond
// the full chain in a fork scenario without needing excessive rewinds to handle
// the temporarily divergent chains.
if has_more_work(&header, &header_head) {
update_header_head(&Tip::from_header(&header), &mut ctx.batch)?;
}

Ok(())
}

Expand Down Expand Up @@ -470,30 +467,6 @@ fn has_more_work(header: &BlockHeader, head: &Tip) -> bool {
header.total_difficulty() > head.total_difficulty
}

/// Update the sync head so we can keep syncing from where we left off.
fn update_sync_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
batch
.save_sync_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save sync head".to_owned()))?;
debug!(
"sync_head updated to {} at {}",
head.last_block_h, head.height
);
Ok(())
}

/// Update the header_head.
fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
batch
.save_header_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
debug!(
"header_head updated to {} at {}",
head.last_block_h, head.height
);
Ok(())
}

/// Rewind the header chain and reapply headers on a fork.
pub fn rewind_and_apply_header_fork(
header: &BlockHeader,
Expand Down
52 changes: 0 additions & 52 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ const BLOCK_HEADER_PREFIX: u8 = 'h' as u8;
const BLOCK_PREFIX: u8 = 'b' as u8;
const HEAD_PREFIX: u8 = 'H' as u8;
const TAIL_PREFIX: u8 = 'T' as u8;
const HEADER_HEAD_PREFIX: u8 = 'I' as u8;
const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const COMMIT_POS_PREFIX: u8 = 'c' as u8;
const COMMIT_POS_HGT_PREFIX: u8 = 'p' as u8;
const BLOCK_INPUT_BITMAP_PREFIX: u8 = 'B' as u8;
Expand Down Expand Up @@ -79,20 +77,6 @@ impl ChainStore {
self.get_block_header(&self.head()?.last_block_h)
}

/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), || {
"HEADER_HEAD".to_owned()
})
}

/// The "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), || {
"SYNC_HEAD".to_owned()
})
}

/// Get full block.
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(
Expand Down Expand Up @@ -198,20 +182,6 @@ impl<'a> Batch<'a> {
self.get_block_header(&self.head()?.last_block_h)
}

/// Head of the header chain (not the same thing as head_header).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX]), || {
"HEADER_HEAD".to_owned()
})
}

/// Get "sync" head.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]), || {
"SYNC_HEAD".to_owned()
})
}

/// Save body head to db.
pub fn save_body_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEAD_PREFIX], t)
Expand All @@ -222,28 +192,6 @@ impl<'a> Batch<'a> {
self.db.put_ser(&vec![TAIL_PREFIX], t)
}

/// Save header_head to db.
pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
}

/// Save "sync" head to db.
pub fn save_sync_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
}

/// Reset sync_head to the current head of the header chain.
pub fn reset_sync_head(&self) -> Result<(), Error> {
let head = self.header_head()?;
self.save_sync_head(&head)
}

/// Reset header_head to the current head of the body chain.
pub fn reset_header_head(&self) -> Result<(), Error> {
let tip = self.head()?;
self.save_header_head(&tip)
}

/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(
Expand Down
Loading

0 comments on commit a362888

Please sign in to comment.