Skip to content

Commit

Permalink
Block input bitmap rework (#3236)
Browse files Browse the repository at this point in the history
* first pass at rewind_single_block
and reworking rewind to simply iterate over blocks, rewinding each incrementally

* commit

* commit

* cleanup

* add test coverage for output_pos index transactional semantics during rewind

* commit

* do not store commitments in spent_index
just use the order of the inputs in the block

* compare key with commitment when cleaning output_pos index

* remove unused OutputPos struct
  • Loading branch information
antiochp authored Feb 24, 2020
1 parent ef853ae commit cb2b909
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 189 deletions.
4 changes: 2 additions & 2 deletions api/src/handlers/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub fn get_output(
match res {
Ok(output_pos) => {
return Ok((
Output::new(&commit, output_pos.height, output_pos.position),
Output::new(&commit, output_pos.height, output_pos.pos),
x.clone(),
));
}
Expand Down Expand Up @@ -100,7 +100,7 @@ pub fn get_output_v2(
for x in outputs.iter() {
let res = chain.is_unspent(x);
match res {
Ok(output_pos) => match chain.get_unspent_output_at(output_pos.position) {
Ok(output_pos) => match chain.get_unspent_output_at(output_pos.pos) {
Ok(output) => {
let header = if include_merkle_proof && output.is_coinbase() {
chain.get_header_by_height(output_pos.height).ok()
Expand Down
30 changes: 21 additions & 9 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::store;
use crate::txhashset;
use crate::txhashset::{PMMRHandle, TxHashSet};
use crate::types::{
BlockStatus, ChainAdapter, NoStatus, Options, OutputMMRPosition, Tip, TxHashsetWriteStatus,
BlockStatus, ChainAdapter, CommitPos, NoStatus, Options, Tip, TxHashsetWriteStatus,
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::RwLock;
Expand Down Expand Up @@ -199,6 +199,15 @@ impl Chain {
&mut txhashset,
)?;

// Initialize the output_pos index based on UTXO set.
// This is fast as we only look for stale and missing entries
// and do not need to rebuild the entire index.
{
let batch = store.batch()?;
txhashset.init_output_pos_index(&header_pmmr, &batch)?;
batch.commit()?;
}

let chain = Chain {
db_root,
store,
Expand Down Expand Up @@ -495,9 +504,8 @@ impl Chain {
/// spent. This querying is done in a way that is consistent with the
/// current chain state, specifically the current winning (valid, most
/// work) fork.
pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<OutputMMRPosition, Error> {
let txhashset = self.txhashset.read();
txhashset.is_unspent(output_ref)
pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<CommitPos, Error> {
self.txhashset.read().is_unspent(output_ref)
}

/// Retrieves an unspent output using its PMMR position
Expand Down Expand Up @@ -973,7 +981,7 @@ impl Chain {
}

// Rebuild our output_pos index in the db based on fresh UTXO set.
txhashset.init_output_pos_index(&header_pmmr, &mut batch)?;
txhashset.init_output_pos_index(&header_pmmr, &batch)?;

// Commit all the changes to the db.
batch.commit()?;
Expand Down Expand Up @@ -1015,7 +1023,7 @@ impl Chain {
fn remove_historical_blocks(
&self,
header_pmmr: &txhashset::PMMRHandle<BlockHeader>,
batch: &mut store::Batch<'_>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
if self.archive_mode {
return Ok(());
Expand Down Expand Up @@ -1089,7 +1097,7 @@ impl Chain {
// Take a write lock on the txhashet and start a new writeable db batch.
let header_pmmr = self.header_pmmr.read();
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
let batch = self.store.batch()?;

// Compact the txhashset itself (rewriting the pruned backend files).
{
Expand All @@ -1100,14 +1108,17 @@ impl Chain {
let horizon_hash = header_pmmr.get_header_hash_by_height(horizon_height)?;
let horizon_header = batch.get_block_header(&horizon_hash)?;

txhashset.compact(&horizon_header, &mut batch)?;
txhashset.compact(&horizon_header, &batch)?;
}

// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&header_pmmr, &mut batch)?;
self.remove_historical_blocks(&header_pmmr, &batch)?;
}

// Make sure our output_pos index is consistent with the UTXO set.
txhashset.init_output_pos_index(&header_pmmr, &batch)?;

// Commit all the above db changes.
batch.commit()?;

Expand Down Expand Up @@ -1510,6 +1521,7 @@ fn setup_head(
// We will update this later once we have the correct header_root.
batch.save_block_header(&genesis.header)?;
batch.save_block(&genesis)?;
batch.save_spent_index(&genesis.hash(), &vec![])?;
batch.save_body_head(&Tip::from_header(&genesis.header))?;

if !genesis.kernels().is_empty() {
Expand Down
33 changes: 20 additions & 13 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::core::pow;
use crate::error::{Error, ErrorKind};
use crate::store;
use crate::txhashset;
use crate::types::{Options, Tip};
use crate::types::{CommitPos, Options, Tip};
use crate::util::RwLock;
use grin_store;
use std::sync::Arc;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
let ref mut header_pmmr = &mut ctx.header_pmmr;
let ref mut txhashset = &mut ctx.txhashset;
let ref mut batch = &mut ctx.batch;
let block_sums = txhashset::extending(header_pmmr, txhashset, batch, |ext, batch| {
let (block_sums, spent) = txhashset::extending(header_pmmr, txhashset, batch, |ext, batch| {
rewind_and_apply_fork(&prev, ext, batch)?;

// Check any coinbase being spent have matured sufficiently.
Expand All @@ -143,22 +143,24 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext<'_>) -> Result<Option<Tip
// Apply the block to the txhashset state.
// Validate the txhashset roots and sizes against the block header.
// Block is invalid if there are any discrepencies.
apply_block_to_txhashset(b, ext, batch)?;
let spent = apply_block_to_txhashset(b, ext, batch)?;

// If applying this block does not increase the work on the chain then
// we know we have not yet updated the chain to produce a new chain head.
// We discard the "child" batch used in this extension (original ctx batch still active).
// We discard any MMR modifications applied in this extension.
let head = batch.head()?;
if !has_more_work(&b.header, &head) {
ext.extension.force_rollback();
}

Ok(block_sums)
Ok((block_sums, spent))
})?;

// Add the validated block to the db along with the corresponding block_sums.
// We do this even if we have not increased the total cumulative work
// so we can maintain multiple (in progress) forks.
add_block(b, &block_sums, &ctx.batch)?;
add_block(b, &block_sums, &spent, &ctx.batch)?;

// If we have no "tail" then set it now.
if ctx.batch.tail().is_err() {
Expand Down Expand Up @@ -429,20 +431,25 @@ fn apply_block_to_txhashset(
block: &Block,
ext: &mut txhashset::ExtensionPair<'_>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
ext.extension.apply_block(block, batch)?;
) -> Result<Vec<CommitPos>, Error> {
let spent = ext.extension.apply_block(block, batch)?;
ext.extension.validate_roots(&block.header)?;
ext.extension.validate_sizes(&block.header)?;
Ok(())
Ok(spent)
}

/// Officially adds the block to our chain.
/// Officially adds the block to our chain (possibly on a losing fork).
/// Adds the associated block_sums and spent_index as well.
/// Header must be added separately (assume this has been done previously).
fn add_block(b: &Block, block_sums: &BlockSums, batch: &store::Batch<'_>) -> Result<(), Error> {
batch
.save_block(b)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save block".to_owned()))?;
fn add_block(
b: &Block,
block_sums: &BlockSums,
spent: &Vec<CommitPos>,
batch: &store::Batch<'_>,
) -> Result<(), Error> {
batch.save_block(b)?;
batch.save_block_sums(&b.hash(), block_sums)?;
batch.save_spent_index(&b.hash(), spent)?;
Ok(())
}

Expand Down
104 changes: 57 additions & 47 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{Block, BlockHeader, BlockSums};
use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
use crate::types::Tip;
use crate::types::{CommitPos, Tip};
use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap;
use grin_store as store;
use grin_store::{option_to_not_found, to_key, Error, SerIterator};
use std::convert::TryInto;
use std::sync::Arc;

const STORE_SUBPATH: &str = "chain";
Expand All @@ -35,6 +36,7 @@ const TAIL_PREFIX: u8 = b'T';
const OUTPUT_POS_PREFIX: u8 = b'p';
const BLOCK_INPUT_BITMAP_PREFIX: u8 = b'B';
const BLOCK_SUMS_PREFIX: u8 = b'M';
const BLOCK_SPENT_PREFIX: u8 = b'S';

/// All chain-related database operations
pub struct ChainStore {
Expand Down Expand Up @@ -178,16 +180,19 @@ impl<'a> Batch<'a> {
self.db.exists(&to_key(BLOCK_PREFIX, &mut h.to_vec()))
}

/// Save the block and the associated input bitmap.
/// Save the block to the db.
/// Note: the block header is not saved to the db here, assumes this has already been done.
pub fn save_block(&self, b: &Block) -> Result<(), Error> {
// Build the "input bitmap" for this new block and store it in the db.
self.build_and_store_block_input_bitmap(&b)?;

// Save the block itself to the db.
self.db
.put_ser(&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], b)?;
Ok(())
}

/// We maintain a "spent" index for each full block to allow the output_pos
/// to be easily reverted during rewind.
pub fn save_spent_index(&self, h: &Hash, spent: &Vec<CommitPos>) -> Result<(), Error> {
self.db
.put_ser(&to_key(BLOCK_SPENT_PREFIX, &mut h.to_vec())[..], spent)?;
Ok(())
}

Expand Down Expand Up @@ -217,7 +222,7 @@ impl<'a> Batch<'a> {
// Not an error if these fail.
{
let _ = self.delete_block_sums(bh);
let _ = self.delete_block_input_bitmap(bh);
let _ = self.delete_spent_index(bh);
}

Ok(())
Expand Down Expand Up @@ -247,6 +252,20 @@ impl<'a> Batch<'a> {
)
}

/// Delete the output_pos index entry for a spent output.
pub fn delete_output_pos_height(&self, commit: &Commitment) -> Result<(), Error> {
self.db
.delete(&to_key(OUTPUT_POS_PREFIX, &mut commit.as_ref().to_vec()))
}

/// When using the output_pos iterator we have access to the index keys but not the
/// original commitment that the key is constructed from. So we need a way of comparing
/// a key with another commitment without reconstructing the commitment from the key bytes.
pub fn is_match_output_pos_key(&self, key: &[u8], commit: &Commitment) -> bool {
let commit_key = to_key(OUTPUT_POS_PREFIX, &mut commit.as_ref().to_vec());
commit_key == key
}

/// Iterator over the output_pos index.
pub fn output_pos_iter(&self) -> Result<SerIterator<(u64, u64)>, Error> {
let key = to_key(OUTPUT_POS_PREFIX, &mut "".to_string().into_bytes());
Expand Down Expand Up @@ -281,18 +300,15 @@ impl<'a> Batch<'a> {
)
}

/// Save the input bitmap for the block.
fn save_block_input_bitmap(&self, bh: &Hash, bm: &Bitmap) -> Result<(), Error> {
self.db.put(
&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec())[..],
&bm.serialize(),
)
}
/// Delete the block spent index.
fn delete_spent_index(&self, bh: &Hash) -> Result<(), Error> {
// Clean up the legacy input bitmap as well.
let _ = self
.db
.delete(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()));

/// Delete the block input bitmap.
fn delete_block_input_bitmap(&self, bh: &Hash) -> Result<(), Error> {
self.db
.delete(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
.delete(&to_key(BLOCK_SPENT_PREFIX, &mut bh.to_vec()))
}

/// Save block_sums for the block.
Expand All @@ -314,47 +330,41 @@ impl<'a> Batch<'a> {
self.db.delete(&to_key(BLOCK_SUMS_PREFIX, &mut bh.to_vec()))
}

/// Build the input bitmap for the given block.
fn build_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
let bitmap = block
.inputs()
.iter()
.filter_map(|x| self.get_output_pos(&x.commitment()).ok())
.map(|x| x as u32)
.collect();
Ok(bitmap)
}

/// Build and store the input bitmap for the given block.
fn build_and_store_block_input_bitmap(&self, block: &Block) -> Result<Bitmap, Error> {
// Build the bitmap.
let bitmap = self.build_block_input_bitmap(block)?;

// Save the bitmap to the db (via the batch).
self.save_block_input_bitmap(&block.hash(), &bitmap)?;

Ok(bitmap)
/// Get the block input bitmap based on our spent index.
/// Fallback to legacy block input bitmap from the db.
pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
if let Ok(spent) = self.get_spent_index(bh) {
let bitmap = spent
.into_iter()
.map(|x| x.pos.try_into().unwrap())
.collect();
Ok(bitmap)
} else {
self.get_legacy_input_bitmap(bh)
}
}

/// Get the block input bitmap from the db or build the bitmap from
/// the full block from the db (if the block is found).
pub fn get_block_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
fn get_legacy_input_bitmap(&self, bh: &Hash) -> Result<Bitmap, Error> {
if let Ok(Some(bytes)) = self
.db
.get(&to_key(BLOCK_INPUT_BITMAP_PREFIX, &mut bh.to_vec()))
{
Ok(Bitmap::deserialize(&bytes))
} else {
match self.get_block(bh) {
Ok(block) => {
let bitmap = self.build_and_store_block_input_bitmap(&block)?;
Ok(bitmap)
}
Err(e) => Err(e),
}
Err(Error::NotFoundErr("legacy block input bitmap".to_string()).into())
}
}

/// Get the "spent index" from the db for the specified block.
/// If we need to rewind a block then we use this to "unspend" the spent outputs.
pub fn get_spent_index(&self, bh: &Hash) -> Result<Vec<CommitPos>, Error> {
option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_SPENT_PREFIX, &mut bh.to_vec())),
|| format!("spent index: {}", bh),
)
}

/// Commits this batch. If it's a child batch, it will be merged with the
/// parent, otherwise the batch is written to db.
pub fn commit(self) -> Result<(), Error> {
Expand Down
Loading

0 comments on commit cb2b909

Please sign in to comment.