Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions substrate/client/state-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
//! Canonicalization window tracks a tree of blocks identified by header hash. The in-memory
//! overlay allows to get any trie node that was inserted in any of the blocks within the window.
//! The overlay is journaled to the backing database and rebuilt on startup.
//! There's a limit of 32 blocks that may have the same block number in the canonicalization window.
//!
//! Canonicalization function selects one root from the top of the tree and discards all other roots
//! and their subtrees. Upon canonicalization all trie nodes that were inserted in the block are
Expand Down Expand Up @@ -135,8 +134,6 @@ pub enum StateDbError {
InvalidParent,
/// Invalid pruning mode specified. Contains expected mode.
IncompatiblePruningModes { stored: PruningMode, requested: PruningMode },
/// Too many unfinalized sibling blocks inserted.
TooManySiblingBlocks { number: u64 },
/// Trying to insert existing block.
BlockAlreadyExists,
/// Invalid metadata
Expand Down Expand Up @@ -187,9 +184,6 @@ impl fmt::Debug for StateDbError {
"Incompatible pruning modes [stored: {:?}; requested: {:?}]",
stored, requested
),
Self::TooManySiblingBlocks { number } => {
write!(f, "Too many sibling blocks at #{number} inserted")
},
Self::BlockAlreadyExists => write!(f, "Block already exists"),
Self::Metadata(message) => write!(f, "Invalid metadata: {}", message),
Self::BlockUnavailable => {
Expand Down
170 changes: 147 additions & 23 deletions substrate/client/state-db/src/noncanonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ use crate::{LOG_TARGET, LOG_TARGET_PIN};
use super::{to_meta_key, ChangeSet, CommitSet, DBValue, Error, Hash, MetaDb, StateDbError};
use codec::{Decode, Encode};
use log::trace;
use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::collections::{hash_map::Entry, BTreeSet, HashMap, VecDeque};

const NON_CANONICAL_JOURNAL: &[u8] = b"noncanonical_journal";
pub(crate) const LAST_CANONICAL: &[u8] = b"last_canonical";
const MAX_BLOCKS_PER_LEVEL: u64 = 32;
const OVERLAY_LEVEL_SPAN: &[u8] = b"noncanonical_overlay_span";
/// Threshold for storing overlay level span in db.
/// To decrease the number of database operations, an overlay level's span is
/// committed to the database iff span > `OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN`.
const OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN: u64 = 32;

/// See module documentation.
pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
Expand All @@ -46,26 +50,33 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
#[cfg_attr(test, derive(PartialEq, Debug))]
struct OverlayLevel<BlockHash: Hash, Key: Hash> {
blocks: Vec<BlockOverlay<BlockHash, Key>>,
used_indicies: u64, // Bitmask of available journal indicies.
span: u64, // largest index ever used + 1 (or 0 when None)
available_indices: BTreeSet<u64>, // available indices in [0; span)
}

impl<BlockHash: Hash, Key: Hash> OverlayLevel<BlockHash, Key> {
fn push(&mut self, overlay: BlockOverlay<BlockHash, Key>) {
self.used_indicies |= 1 << overlay.journal_index;
self.span = self.span.max(overlay.journal_index + 1);
self.available_indices.remove(&overlay.journal_index);
self.blocks.push(overlay)
}

fn available_index(&self) -> u64 {
self.used_indicies.trailing_ones() as u64
*self.available_indices.first().unwrap_or(&self.span)
}

fn span(&self) -> u64 {
self.span
}

fn remove(&mut self, index: usize) -> BlockOverlay<BlockHash, Key> {
self.used_indicies &= !(1 << self.blocks[index].journal_index);
self.available_indices.insert(self.blocks[index].journal_index);
self.blocks.remove(index)
}

fn new() -> OverlayLevel<BlockHash, Key> {
OverlayLevel { blocks: Vec::new(), used_indicies: 0 }
fn new_with_span(span: u64) -> OverlayLevel<BlockHash, Key> {
let available_indices = (0..span).collect::<BTreeSet<_>>();
OverlayLevel { blocks: Vec::new(), span, available_indices }
}
}

Expand Down Expand Up @@ -189,8 +200,15 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let mut total: u64 = 0;
block += 1;
loop {
let mut level = OverlayLevel::new();
for index in 0..MAX_BLOCKS_PER_LEVEL {
let level_span = db
.get_meta(&to_meta_key(OVERLAY_LEVEL_SPAN, &block))
.map_err(Error::Db)?
.map(|data| Decode::decode(&mut data.as_slice()))
.transpose()?;
// Since we don't update the overlay level span on block removal,
// we have to restore it here (defaulting to 0 when no span was stored).
let mut level = OverlayLevel::new_with_span(level_span.unwrap_or(0));
for index in 0..level_span.unwrap_or(OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN) {
let journal_key = to_journal_key(block, index);
if let Some(record) = db.get_meta(&journal_key).map_err(Error::Db)? {
let record: JournalRecord<BlockHash, Key> =
Expand Down Expand Up @@ -241,8 +259,8 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
})
}

/// Insert a new block into the overlay. If inserted on the second level or lover expects parent
/// to be present in the window.
/// Insert a new block into the overlay. If inserted on the second level or lower,
/// expects parent to be present in the window.
pub fn insert(
&mut self,
hash: &BlockHash,
Expand Down Expand Up @@ -288,28 +306,25 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
let level = if self.levels.is_empty() ||
number == front_block_number + self.levels.len() as u64
{
self.levels.push_back(OverlayLevel::new());
self.levels.push_back(OverlayLevel::new_with_span(0));
self.levels.back_mut().expect("can't be empty after insertion; qed")
} else {
self.levels.get_mut((number - front_block_number) as usize)
.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
};

if level.blocks.len() >= MAX_BLOCKS_PER_LEVEL as usize {
trace!(
target: LOG_TARGET,
"Too many sibling blocks at #{number}: {:?}",
level.blocks.iter().map(|b| &b.hash).collect::<Vec<_>>()
);
return Err(StateDbError::TooManySiblingBlocks { number })
}
if level.blocks.iter().any(|b| b.hash == *hash) {
return Err(StateDbError::BlockAlreadyExists)
}

let index = level.available_index();
let journal_key = to_journal_key(number, index);
if index >= OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN && index == level.span() {
let key = to_meta_key(OVERLAY_LEVEL_SPAN, &number);
let new_span = index + 1;
commit.meta.inserted.push((key, new_span.encode()));
}

let journal_key = to_journal_key(number, index);
let inserted = changeset.inserted.iter().map(|(k, _)| k.clone()).collect();
let overlay = BlockOverlay {
hash: hash.clone(),
Expand Down Expand Up @@ -407,6 +422,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
self.pinned_canonincalized.push(hash.clone());

let mut discarded_journals = Vec::new();
let span = level.span();
for (i, overlay) in level.blocks.into_iter().enumerate() {
let mut pinned_children = 0;
// That's the one we need to canonicalize
Expand Down Expand Up @@ -447,6 +463,10 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
discarded_journals.push(overlay.journal_key.clone());
}
commit.meta.deleted.append(&mut discarded_journals);
if span > OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN {
let key = to_meta_key(OVERLAY_LEVEL_SPAN, &self.front_block_number());
commit.meta.deleted.push(key);
}

let canonicalized = (hash.clone(), self.front_block_number());
commit
Expand Down Expand Up @@ -477,13 +497,18 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
/// Revert a single level. Returns commit set that deletes the journal or `None` if not
/// possible.
pub fn revert_one(&mut self) -> Option<CommitSet<Key>> {
let last_block_number = self.front_block_number() + self.levels.len() as u64 - 1;
self.levels.pop_back().map(|level| {
let mut commit = CommitSet::default();
for overlay in level.blocks.into_iter() {
commit.meta.deleted.push(overlay.journal_key);
self.parents.remove(&overlay.hash);
discard_values(&mut self.values, overlay.inserted);
}
if level.span > OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN {
let key = to_meta_key(OVERLAY_LEVEL_SPAN, &last_block_number);
commit.meta.deleted.push(key);
}
commit
})
}
Expand All @@ -510,6 +535,11 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
break
}
if self.levels.back().map_or(false, |l| l.blocks.is_empty()) {
let last_block_number = self.front_block_number() + level_count as u64 - 1;
if self.levels.back().map_or(0, |l| l.span) > OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN {
let key = to_meta_key(OVERLAY_LEVEL_SPAN, &last_block_number);
commit.meta.deleted.push(key);
}
self.levels.pop_back();
}
if !commit.meta.deleted.is_empty() {
Expand Down Expand Up @@ -572,10 +602,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
mod tests {
use super::{to_journal_key, NonCanonicalOverlay};
use crate::{
noncanonical::{LAST_CANONICAL, OVERLAY_LEVEL_SPAN, OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN},
test::{make_changeset, make_db},
ChangeSet, CommitSet, MetaDb, StateDbError,
to_meta_key, ChangeSet, CommitSet, MetaDb, StateDbError,
};
use codec::Encode;
use sp_core::H256;
use std::collections::HashSet;

fn contains(overlay: &NonCanonicalOverlay<H256, H256>, key: u64) -> bool {
overlay.get(&H256::from_low_u64_be(key)) ==
Expand Down Expand Up @@ -1128,4 +1161,95 @@ mod tests {
db.commit(&overlay.remove(&h2).unwrap());
assert!(!contains(&overlay, 2));
}

#[test]
fn insert_canonicalize_long_span() {
let mut db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<(u64, u64), H256>::new(&db).unwrap();
let indices = 130;

for i in 0..indices {
let changeset = make_changeset(&[i], &[]);
let insertion = overlay.insert(&(1, i), 1, &(0, 0), changeset).unwrap();
db.commit(&insertion);
let expected_insertion_meta_len = match i {
0 => 2, // last canonical, journal key
OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN.. => 2, // journal key, span
_ => 1, // just a journal key
};
assert_eq!(insertion.meta.inserted.len(), expected_insertion_meta_len);
}

let mut commit = CommitSet::default();
overlay.canonicalize(&(1, indices - 1), &mut commit).unwrap();

let expected_meta_deleted = (0..indices)
.map(|i| to_journal_key(1, i))
.chain([to_meta_key(OVERLAY_LEVEL_SPAN, &1u64)])
.collect::<HashSet<_>>();
assert_eq!(commit.meta.inserted.len(), 1);
assert_eq!(HashSet::from_iter(commit.meta.deleted), expected_meta_deleted);
}

#[test]
fn restore_from_journal_long_span() {
let mut db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<(u64, u64), H256>::new(&db).unwrap();
let indices = OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN + 2;

for i in 0..indices {
let changeset = make_changeset(&[i], &[]);
let insertion = overlay.insert(&(1, i), 1, &(0, 0), changeset).unwrap();
db.commit(&insertion);
}

for i in 0..indices - 1 {
db.commit(&overlay.remove(&(1, i)).unwrap());
}

// Restore into a new overlay and check that journaled value exists.
let mut overlay = NonCanonicalOverlay::<(u64, u64), H256>::new(&db).unwrap();
let mut commit = CommitSet::default();
overlay.canonicalize(&(1, indices - 1), &mut commit).unwrap();
db.commit(&commit);

assert!(db.data_eq(&make_db(&[indices - 1])));
assert_eq!(commit.meta.inserted.len(), 1);
assert_eq!(commit.meta.deleted.len(), 2); // journal key, span
}

#[test]
fn overlay_level_span_entries_are_cleared_from_db() {
let mut db = make_db(&[]);
let mut overlay = NonCanonicalOverlay::<(u64, u64), H256>::new(&db).unwrap();
let indices_per_level = OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN + 1;
for level in 1..4 {
for index in 0..indices_per_level {
let changeset = make_changeset(&[1], &[]);
let insertion =
overlay.insert(&(level, index), level, &(level - 1, 0), changeset).unwrap();
db.commit(&insertion);
}
}

// leave only the 1st level, with not too many blocks, and restore into a new overlay
db.commit(&overlay.revert_one().unwrap());
for index in 0..indices_per_level {
db.commit(&overlay.remove(&(2, index)).unwrap());
}
for index in OVERLAY_LEVEL_STORE_SPANS_LONGER_THAN..indices_per_level {
db.commit(&overlay.remove(&(1, index)).unwrap());
}
let mut overlay = NonCanonicalOverlay::<(u64, u64), H256>::new(&db).unwrap();

let mut commit = CommitSet::default();
overlay.canonicalize(&(1, 1), &mut commit).unwrap();
db.commit(&commit);

assert_eq!(db.meta_len(), 1);
assert_eq!(
db.get_meta(&to_meta_key(LAST_CANONICAL, &())),
Ok(Some((&(1u64, 1u64), 1u64).encode()))
);
}
}