Skip to content

Commit

Permalink
log reorg (and fork) depth correctly (#3376)
Browse files Browse the repository at this point in the history
* log reorg (and fork) depth correctly
depth should be based on "fork point" not prev head

* correct depth for fork/reorg (0 for alternate head etc.)
  • Loading branch information
antiochp authored Jul 8, 2020
1 parent 30db9c4 commit 3225319
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 62 deletions.
34 changes: 16 additions & 18 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,24 +271,23 @@ impl Chain {
res
}

fn determine_status(&self, head: Option<Tip>, prev_head: Tip) -> BlockStatus {
// We have more work if the chain head is updated.
let is_more_work = head.is_some();

let mut is_next_block = false;
let mut reorg_depth = None;
fn determine_status(&self, head: Option<Tip>, prev_head: Tip, fork_point: Tip) -> BlockStatus {
// If head is updated then we are either "next" block or we just experienced a "reorg" to new head.
// Otherwise this is a "fork" off the main chain.
if let Some(head) = head {
if head.prev_block_h == prev_head.last_block_h {
is_next_block = true;
BlockStatus::Next { prev_head }
} else {
reorg_depth = Some(prev_head.height.saturating_sub(head.height) + 1);
BlockStatus::Reorg {
prev_head,
fork_point,
}
}
} else {
BlockStatus::Fork {
prev_head,
fork_point,
}
}

match (is_more_work, is_next_block) {
(true, true) => BlockStatus::Next,
(true, false) => BlockStatus::Reorg(reorg_depth.unwrap_or(0)),
(false, _) => BlockStatus::Fork,
}
}

Expand All @@ -305,10 +304,9 @@ impl Chain {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
let prev_head = batch.head()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;

let prev_head = ctx.batch.head()?;

let maybe_new_head = pipe::process_block(&b, &mut ctx);

// We have flushed txhashset extension changes to disk
Expand All @@ -324,8 +322,8 @@ impl Chain {
};

match maybe_new_head {
Ok(head) => {
let status = self.determine_status(head.clone(), prev_head);
Ok((head, fork_point)) => {
let status = self.determine_status(head, prev_head, Tip::from_header(&fork_point));

// notifying other parts of the system of the update
self.adapter.block_accepted(&b, status, opts);
Expand Down
22 changes: 13 additions & 9 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ fn validate_pow_only(header: &BlockHeader, ctx: &mut BlockContext<'_>) -> Result

/// Runs the block processing pipeline, including validation and finding a
/// place for the new block in the chain.
/// Returns new head if chain head updated.
pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip>, Error> {
/// Returns the new head if the chain head was updated, along with the "fork point" we rewound to while processing the new block.
pub fn process_block(
b: &Block,
ctx: &mut BlockContext<'_>,
) -> Result<(Option<Tip>, BlockHeader), Error> {
debug!(
"pipe: process_block {} at {} [in/out/kern: {}/{}/{}]",
b.hash(),
Expand Down Expand Up @@ -127,8 +130,8 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
let header_pmmr = &mut ctx.header_pmmr;
let txhashset = &mut ctx.txhashset;
let batch = &mut ctx.batch;
txhashset::extending(header_pmmr, txhashset, batch, |ext, batch| {
rewind_and_apply_fork(&prev, ext, batch)?;
let fork_point = txhashset::extending(header_pmmr, txhashset, batch, |ext, batch| {
let fork_point = rewind_and_apply_fork(&prev, ext, batch)?;

// Check any coinbase being spent have matured sufficiently.
// This needs to be done within the context of a potentially
Expand Down Expand Up @@ -159,7 +162,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
ext.extension.force_rollback();
}

Ok(())
Ok(fork_point)
})?;

// Add the validated block to the db.
Expand All @@ -176,9 +179,9 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
if has_more_work(&b.header, &head) {
let head = Tip::from_header(&b.header);
update_head(&head, &mut ctx.batch)?;
Ok(Some(head))
Ok((Some(head), fork_point))
} else {
Ok(None)
Ok((None, fork_point))
}
}

Expand Down Expand Up @@ -545,11 +548,12 @@ pub fn rewind_and_apply_header_fork(
/// to find the fork point. Rewind the txhashset to the fork point and apply all
/// necessary blocks prior to the one being processed to set the txhashset in
/// the expected state.
/// Returns the "fork point" that we rewound to.
pub fn rewind_and_apply_fork(
header: &BlockHeader,
ext: &mut txhashset::ExtensionPair<'_>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
) -> Result<BlockHeader, Error> {
let extension = &mut ext.extension;
let header_extension = &mut ext.header_extension;

Expand Down Expand Up @@ -593,7 +597,7 @@ pub fn rewind_and_apply_fork(
apply_block_to_txhashset(&fb, ext, batch)?;
}

Ok(())
Ok(fork_point)
}

fn validate_utxo(
Expand Down
41 changes: 36 additions & 5 deletions chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl Writeable for CommitPos {
/// blockchain tree. References the max height and the latest and previous
/// blocks
/// for convenience and the total difficulty.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct Tip {
/// Height of the tip (max height of the fork)
pub height: u64,
Expand Down Expand Up @@ -444,13 +444,44 @@ impl ChainAdapter for NoopAdapter {
}

/// Status of an accepted block.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BlockStatus {
/// Block is the "next" block, updating the chain head.
Next,
Next {
/// Previous chain head.
prev_head: Tip,
},
/// Block does not update the chain head and is a fork.
Fork,
Fork {
/// Previous chain head.
prev_head: Tip,
/// Fork point for rewind.
fork_point: Tip,
},
/// Block updates the chain head via a (potentially disruptive) "reorg".
/// Previous block was not our previous chain head.
Reorg(u64),
Reorg {
/// Previous chain head.
prev_head: Tip,
/// Fork point for rewind.
fork_point: Tip,
},
}

impl BlockStatus {
	/// Is this the "next" block, i.e. one that extended the previous chain head?
	pub fn is_next(&self) -> bool {
		// matches! is the idiomatic form of `match x { Pat => true, _ => false }`
		// (clippy::match_like_matches_macro).
		matches!(self, BlockStatus::Next { .. })
	}

	/// Is this block a "reorg", i.e. did it update the chain head via a fork?
	pub fn is_reorg(&self) -> bool {
		matches!(self, BlockStatus::Reorg { .. })
	}
}
12 changes: 8 additions & 4 deletions chain/tests/mine_simple_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ fn mine_reorg() {
chain.process_block(b, chain::Options::SKIP_POW).unwrap();
}

let head = chain.head_header().unwrap();
let head = chain.head().unwrap();
assert_eq!(head.height, NUM_BLOCKS_MAIN);
assert_eq!(head.hash(), prev.hash());

// Reorg chain should exceed main chain's total difficulty to be considered
let reorg_difficulty = head.total_difficulty().to_num();
let reorg_difficulty = head.total_difficulty.to_num();

// Create one block for reorg chain forking off NUM_BLOCKS_MAIN - REORG_DEPTH height
let fork_head = chain
Expand All @@ -381,13 +381,17 @@ fn mine_reorg() {
chain.process_block(b, chain::Options::SKIP_POW).unwrap();

// Check that reorg is correctly reported in block status
let fork_point = chain.get_header_by_height(1).unwrap();
assert_eq!(
*adapter.last_status.read(),
Some(BlockStatus::Reorg(REORG_DEPTH))
Some(BlockStatus::Reorg {
prev_head: head,
fork_point: Tip::from_header(&fork_point)
})
);

// Chain should be switched to the reorganized chain
let head = chain.head_header().unwrap();
let head = chain.head().unwrap();
assert_eq!(head.height, NUM_BLOCKS_MAIN - REORG_DEPTH + 1);
assert_eq!(head.hash(), reorg_head.hash());
}
Expand Down
12 changes: 4 additions & 8 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ where
// not broadcasting blocks received through sync
if !opts.contains(chain::Options::SYNC) {
for hook in &self.hooks {
hook.on_block_accepted(b, &status);
hook.on_block_accepted(b, status);
}
// If we mined the block then we want to broadcast the compact block.
// If we received the block from another node then broadcast "header first"
Expand All @@ -743,12 +743,8 @@ where
// Reconcile the txpool against the new block *after* we have broadcast it to our peers.
// This may be slow and we do not want to delay block propagation.
// We only want to reconcile the txpool against the new block *if* total work has increased.
let is_reorg = if let BlockStatus::Reorg(_) = status {
true
} else {
false
};
if status == BlockStatus::Next || is_reorg {

if status.is_next() || status.is_reorg() {
let mut tx_pool = self.tx_pool.write();

let _ = tx_pool.reconcile_block(b);
Expand All @@ -758,7 +754,7 @@ where
tx_pool.truncate_reorg_cache(cutoff);
}

if is_reorg {
if status.is_reorg() {
let _ = self.tx_pool.write().reconcile_reorg_cache(&b.header);
}
}
Expand Down
50 changes: 32 additions & 18 deletions servers/src/common/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub trait NetEvents {
/// Trait to be implemented by Chain Event Hooks
pub trait ChainEvents {
/// Triggers when a new block is accepted by the chain (might be a Reorg or a Fork)
fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) {}
fn on_block_accepted(&self, block: &core::Block, status: BlockStatus) {}
}

/// Basic Logger
Expand Down Expand Up @@ -116,31 +116,45 @@ impl NetEvents for EventLogger {
}

impl ChainEvents for EventLogger {
fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) {
fn on_block_accepted(&self, block: &core::Block, status: BlockStatus) {
match status {
BlockStatus::Reorg(depth) => {
BlockStatus::Reorg {
prev_head,
fork_point,
} => {
warn!(
"block_accepted (REORG!): {:?} at {} (depth: {}, diff: {})",
"block_accepted (REORG!): {} at {}, (prev_head: {} at {}, fork_point: {} at {}, depth: {})",
block.hash(),
block.header.height,
depth,
block.header.total_difficulty(),
prev_head.hash(),
prev_head.height,
fork_point.hash(),
fork_point.height,
block.header.height.saturating_sub(fork_point.height + 1),
);
}
BlockStatus::Fork => {
BlockStatus::Fork {
prev_head,
fork_point,
} => {
debug!(
"block_accepted (fork?): {:?} at {} (diff: {})",
"block_accepted (fork?): {} at {}, (prev_head: {} at {}, fork_point: {} at {}, depth: {})",
block.hash(),
block.header.height,
block.header.total_difficulty(),
prev_head.hash(),
prev_head.height,
fork_point.hash(),
fork_point.height,
block.header.height.saturating_sub(fork_point.height + 1),
);
}
BlockStatus::Next => {
BlockStatus::Next { prev_head } => {
debug!(
"block_accepted (head+): {:?} at {} (diff: {})",
"block_accepted (head+): {} at {} (prev_head: {} at {})",
block.hash(),
block.header.height,
block.header.total_difficulty(),
prev_head.hash(),
prev_head.height,
);
}
}
Expand Down Expand Up @@ -262,20 +276,20 @@ impl WebHook {
}

impl ChainEvents for WebHook {
fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) {
fn on_block_accepted(&self, block: &core::Block, status: BlockStatus) {
let status_str = match status {
BlockStatus::Reorg(_) => "reorg",
BlockStatus::Fork => "fork",
BlockStatus::Next => "head",
BlockStatus::Reorg { .. } => "reorg",
BlockStatus::Fork { .. } => "fork",
BlockStatus::Next { .. } => "head",
};

// Add additional `depth` field to the JSON in case of reorg
let payload = if let BlockStatus::Reorg(depth) = status {
let payload = if let BlockStatus::Reorg { fork_point, .. } = status {
let depth = block.header.height.saturating_sub(fork_point.height + 1);
json!({
"hash": block.header.hash().to_hex(),
"status": status_str,
"data": block,

"depth": depth
})
} else {
Expand Down

0 comments on commit 3225319

Please sign in to comment.