Skip to content

Commit

Permalink
store both mmr index and block height into database for output (#2903)
Browse files Browse the repository at this point in the history
* store both mmr index and block height into database for output

* rustfmt

* fix: mmr position is 1-based instead of 0-based

* (Hash, u64, u64) deserves a type
  • Loading branch information
garyyu authored and antiochp committed Aug 29, 2019
1 parent 97f961f commit d36a0b2
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 74 deletions.
4 changes: 2 additions & 2 deletions api/src/handlers/transactions_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ impl TxHashSetHandler {
// allows traversal of utxo set
fn outputs(&self, start_index: u64, mut max: u64) -> Result<OutputListing, Error> {
//set a limit here
if max > 1000 {
max = 1000;
if max > 10_000 {
max = 10_000;
}
let chain = w(&self.chain)?;
let outputs = chain
Expand Down
27 changes: 18 additions & 9 deletions api/src/handlers/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,24 @@ pub fn get_output(

let chain = w(chain)?;

for x in outputs.iter().filter(|x| chain.is_unspent(x).is_ok()) {
let block_height = chain
.get_header_for_output(&x)
.context(ErrorKind::Internal(
"Can't get header for output".to_owned(),
))?
.height;
let output_pos = chain.get_output_pos(&x.commit).unwrap_or(0);
return Ok((Output::new(&commit, block_height, output_pos), x.clone()));
for x in outputs.iter() {
let res = chain.is_unspent(x);
match res {
Ok(output_pos) => {
return Ok((
Output::new(&commit, output_pos.height, output_pos.position),
x.clone(),
));
}
Err(e) => {
trace!(
"get_output: err: {} for commit: {:?} with feature: {:?}",
e.to_string(),
x.commit,
x.features
);
}
}
}
Err(ErrorKind::NotFound)?
}
9 changes: 5 additions & 4 deletions api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,11 @@ impl OutputPrintable {
};

let out_id = core::OutputIdentifier::from_output(&output);
let spent = chain.is_unspent(&out_id).is_err();
let block_height = match spent {
true => None,
false => Some(chain.get_header_for_output(&out_id)?.height),
let res = chain.is_unspent(&out_id);
let (spent, block_height) = if let Ok(output_pos) = res {
(false, Some(output_pos.height))
} else {
(true, None)
};

let proof = if include_proof {
Expand Down
122 changes: 76 additions & 46 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use crate::store;
use crate::txhashset;
use crate::txhashset::TxHashSet;
use crate::types::{
BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus,
BlockStatus, ChainAdapter, NoStatus, Options, OutputMMRPosition, Tip, TxHashSetRoots,
TxHashsetWriteStatus,
};
use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::RwLock;
Expand Down Expand Up @@ -467,13 +468,9 @@ impl Chain {
/// spent. This querying is done in a way that is consistent with the
/// current chain state, specifically the current winning (valid, most
/// work) fork.
pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<Hash, Error> {
pub fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<OutputMMRPosition, Error> {
let txhashset = self.txhashset.read();
let res = txhashset.is_unspent(output_ref);
match res {
Err(e) => Err(e),
Ok((h, _)) => Ok(h),
}
txhashset.is_unspent(output_ref)
}

/// Validate the tx against the current UTXO set.
Expand Down Expand Up @@ -980,6 +977,8 @@ impl Chain {

debug!("txhashset_write: replaced our txhashset with the new one");

self.rebuild_height_for_pos()?;

// Check for any orphan blocks and process them based on the new chain state.
self.check_orphans(header.height + 1);

Expand Down Expand Up @@ -1067,26 +1066,30 @@ impl Chain {
}
}

// Take a write lock on the txhashet and start a new writeable db batch.
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;
{
// Take a write lock on the txhashet and start a new writeable db batch.
let mut txhashset = self.txhashset.write();
let mut batch = self.store.batch()?;

// Compact the txhashset itself (rewriting the pruned backend files).
txhashset.compact(&mut batch)?;
// Compact the txhashset itself (rewriting the pruned backend files).
txhashset.compact(&mut batch)?;

// Rebuild our output_pos index in the db based on current UTXO set.
txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild_index()?;
Ok(())
})?;
// Rebuild our output_pos index in the db based on current UTXO set.
txhashset::extending(&mut txhashset, &mut batch, |extension| {
extension.rebuild_index()?;
Ok(())
})?;

// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&txhashset, &mut batch)?;
}

// If we are not in archival mode remove historical blocks from the db.
if !self.archive_mode {
self.remove_historical_blocks(&txhashset, &mut batch)?;
// Commit all the above db changes.
batch.commit()?;
}

// Commit all the above db changes.
batch.commit()?;
self.rebuild_height_for_pos()?;

Ok(())
}
Expand Down Expand Up @@ -1216,39 +1219,66 @@ impl Chain {
Ok(hash)
}

/// Gets the block header in which a given output appears in the txhashset.
pub fn get_header_for_output(
&self,
output_ref: &OutputIdentifier,
) -> Result<BlockHeader, Error> {
/// Migrate the index 'commitment -> output_pos' to index 'commitment -> (output_pos, block_height)'
/// Note: should only be called in two cases:
/// - Node start-up. For database migration from the old version.
/// - After the txhashset 'rebuild_index' when state syncing or compact.
pub fn rebuild_height_for_pos(&self) -> Result<(), Error> {
let txhashset = self.txhashset.read();
let mut outputs_pos = txhashset.get_all_output_pos()?;
let total_outputs = outputs_pos.len();
if total_outputs == 0 {
debug!("rebuild_height_for_pos: nothing to be rebuilt");
return Ok(());
} else {
debug!(
"rebuild_height_for_pos: rebuilding {} output_pos's height...",
total_outputs
);
}
outputs_pos.sort_by(|a, b| a.1.cmp(&b.1));

let (_, pos) = txhashset.is_unspent(output_ref)?;

let mut min = 0;
let mut max = {
let max_height = {
let head = self.head()?;
head.height
};

loop {
let search_height = max - (max - min) / 2;
let h = txhashset.get_header_by_height(search_height)?;
if search_height == 0 {
return Ok(h);
}
let h_prev = txhashset.get_header_by_height(search_height - 1)?;
if pos > h.output_mmr_size {
min = search_height;
} else if pos < h_prev.output_mmr_size {
max = search_height;
} else {
if pos == h_prev.output_mmr_size {
return Ok(h_prev);
let batch = self.store.batch()?;
// clear it before rebuilding
batch.clear_output_pos_height()?;

let mut i = 0;
for search_height in 0..max_height {
let h = txhashset.get_header_by_height(search_height + 1)?;
while i < total_outputs {
let (commit, pos) = outputs_pos[i];
if pos > h.output_mmr_size {
// Note: MMR position is 1-based and not 0-based, so here must be '>' instead of '>='
break;
}
return Ok(h);
batch.save_output_pos_height(&commit, pos, h.height)?;
trace!("rebuild_height_for_pos: {:?}", (commit, pos, h.height));
i += 1;
}
}

// clear the output_pos since now it has been replaced by the new index
batch.clear_output_pos()?;

batch.commit()?;
debug!("rebuild_height_for_pos: done");
Ok(())
}

/// Gets the block header in which a given output appears in the txhashset.
pub fn get_header_for_output(
	&self,
	output_ref: &OutputIdentifier,
) -> Result<BlockHeader, Error> {
	let txhashset = self.txhashset.read();
	// An unspent output knows its block height directly now, so the header
	// lookup is a single height-indexed fetch (no more binary search).
	let output_pos = txhashset.is_unspent(output_ref)?;
	let header = txhashset.get_header_by_height(output_pos.height)?;
	Ok(header)
}

/// Verifies the given block header is actually on the current chain.
Expand Down
85 changes: 80 additions & 5 deletions chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const TAIL_PREFIX: u8 = 'T' as u8;
const HEADER_HEAD_PREFIX: u8 = 'I' as u8;
const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const COMMIT_POS_PREFIX: u8 = 'c' as u8;
const COMMIT_POS_HGT_PREFIX: u8 = 'p' as u8;
const BLOCK_INPUT_BITMAP_PREFIX: u8 = 'B' as u8;
const BLOCK_SUMS_PREFIX: u8 = 'M' as u8;

Expand Down Expand Up @@ -111,11 +112,42 @@ impl ChainStore {
)
}

/// Get all outputs PMMR pos. (only for migration purpose)
pub fn get_all_output_pos(&self) -> Result<Vec<(Commitment, u64)>, Error> {
	let prefix = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes());
	// Each db key is 2 prefix bytes followed by the raw commitment bytes;
	// strip the prefix to recover the commitment.
	let outputs = self
		.db
		.iter::<u64>(&prefix)?
		.map(|(k, pos)| (Commitment::from_vec(k[2..].to_vec()), pos))
		.collect();
	Ok(outputs)
}

/// Get PMMR pos for the given output commitment.
/// Note:
/// - Original prefix 'COMMIT_POS_PREFIX' is not used anymore for normal case, refer to #2889 for detail.
/// - To be compatible with the old callers, let's keep this function name but replace with new prefix 'COMMIT_POS_HGT_PREFIX'
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
	let key = to_key(COMMIT_POS_HGT_PREFIX, &mut commit.as_ref().to_vec());
	// The stored value is a (pos, height) pair; this legacy accessor
	// only surfaces the MMR position.
	match self.db.get_ser::<(u64, u64)>(&key) {
		Ok(Some((pos, _height))) => Ok(pos),
		Ok(None) => Err(Error::NotFoundErr(format!(
			"Output position for: {:?}",
			commit
		))),
		Err(e) => Err(e),
	}
}

/// Get PMMR pos and block height for the given output commitment.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<(u64, u64), Error> {
option_to_not_found(
self.db
.get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())),
self.db.get_ser(&to_key(
COMMIT_POS_HGT_PREFIX,
&mut commit.as_ref().to_vec(),
)),
&format!("Output position for: {:?}", commit),
)
}
Expand Down Expand Up @@ -253,16 +285,50 @@ impl<'a> Batch<'a> {
)
}

/// Save output_pos and block height to index.
pub fn save_output_pos_height(
	&self,
	commit: &Commitment,
	pos: u64,
	height: u64,
) -> Result<(), Error> {
	// Key under the new (pos, height) prefix; the value is the serialized tuple.
	let key = to_key(COMMIT_POS_HGT_PREFIX, &mut commit.as_ref().to_vec());
	self.db.put_ser(&key[..], &(pos, height))
}

/// Get output_pos from index.
/// Note:
/// - Original prefix 'COMMIT_POS_PREFIX' is not used for normal case anymore, refer to #2889 for detail.
/// - To be compatible with the old callers, let's keep this function name but replace with new prefix 'COMMIT_POS_HGT_PREFIX'
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
let res: Result<Option<(u64, u64)>, Error> = self.db.get_ser(&to_key(
COMMIT_POS_HGT_PREFIX,
&mut commit.as_ref().to_vec(),
));
match res {
Ok(None) => Err(Error::NotFoundErr(format!(
"Output position for: {:?}",
commit
))),
Ok(Some((pos, _height))) => Ok(pos),
Err(e) => Err(e),
}
}

/// Get output_pos and block height from index.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<(u64, u64), Error> {
option_to_not_found(
self.db
.get_ser(&to_key(COMMIT_POS_PREFIX, &mut commit.as_ref().to_vec())),
self.db.get_ser(&to_key(
COMMIT_POS_HGT_PREFIX,
&mut commit.as_ref().to_vec(),
)),
&format!("Output position for commit: {:?}", commit),
)
}

/// Clear all entries from the output_pos index (must be rebuilt after).
/// Clear all entries from the output_pos index.
pub fn clear_output_pos(&self) -> Result<(), Error> {
let key = to_key(COMMIT_POS_PREFIX, &mut "".to_string().into_bytes());
for (k, _) in self.db.iter::<u64>(&key)? {
Expand All @@ -271,6 +337,15 @@ impl<'a> Batch<'a> {
Ok(())
}

/// Clear all entries from the (output_pos,height) index (must be rebuilt after).
pub fn clear_output_pos_height(&self) -> Result<(), Error> {
	let prefix = to_key(COMMIT_POS_HGT_PREFIX, &mut "".to_string().into_bytes());
	// Iterate every key under the prefix and delete it individually.
	for (key, _) in self.db.iter::<(u64, u64)>(&prefix)? {
		self.db.delete(&key)?;
	}
	Ok(())
}

/// Get the previous header.
pub fn get_previous_header(&self, header: &BlockHeader) -> Result<BlockHeader, Error> {
self.get_block_header(&header.prev_hash)
Expand Down
Loading

0 comments on commit d36a0b2

Please sign in to comment.