Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 129 additions & 14 deletions core/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ use hash_db::Hasher;
use kvdb::{KeyValueDB, DBTransaction};
use trie::MemoryDB;
use parking_lot::RwLock;
use primitives::{H256, AuthorityId, Blake2Hasher};
use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration};
use primitives::storage::well_known_keys;
use runtime_primitives::{generic::BlockId, Justification};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, Digest, DigestItem};
use runtime_primitives::BuildStorage;
Expand All @@ -80,6 +81,7 @@ use state_db::StateDb;
pub use state_db::PruningMode;

const CANONICALIZATION_DELAY: u64 = 256;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u64 = 32768;

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
Expand Down Expand Up @@ -360,9 +362,42 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {

/// Changes-trie storage: persists the nodes of per-block changes tries into the
/// database and supports pruning of obsolete tries.
pub struct DbChangesTrieStorage<Block: BlockT> {
	// Handle to the backing key-value database (nodes live in columns::CHANGES_TRIE).
	db: Arc<KeyValueDB>,
	// When `Some(n)`, pruning keeps changes tries for roughly the `n` most recent
	// blocks; `None` disables pruning entirely (archive pruning modes).
	min_blocks_to_keep: Option<u64>,
	// Ties the storage to a concrete block type without storing one.
	_phantom: ::std::marker::PhantomData<Block>,
}

impl<Block: BlockT> DbChangesTrieStorage<Block> {
	/// Commit new changes trie: move every node of the freshly built trie into
	/// the CHANGES_TRIE column of the transaction.
	pub fn commit(&self, tx: &mut DBTransaction, mut changes_trie: MemoryDB<Blake2Hasher>) {
		for (node_key, (node_value, _ref_count)) in changes_trie.drain() {
			tx.put(columns::CHANGES_TRIE, &node_key[..], &node_value);
		}
	}

	/// Prune obsolete changes tries, scheduling node deletions into `tx`.
	///
	/// No-op on archive nodes (`min_blocks_to_keep` is `None`) and when no
	/// changes-trie configuration is available. The configuration is supplied by
	/// the caller; it can not change at runtime, so reading it once is fine.
	pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) {
		let (min_blocks_to_keep, config) = match (self.min_blocks_to_keep, config) {
			(Some(min_blocks_to_keep), Some(config)) => (min_blocks_to_keep, config),
			_ => return,
		};

		state_machine::prune_changes_tries(
			&config,
			self,
			min_blocks_to_keep,
			block.as_(),
			|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
	}
}

impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
Expand All @@ -389,7 +424,7 @@ impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChange
/// Otherwise, trie nodes are kept only from some recent blocks.
pub struct Backend<Block: BlockT> {
	// State storage (trie nodes) with pruning handled by state_db.
	storage: Arc<StorageDb<Block>>,
	// NOTE(review): the two fields below appear to be diff residue from a rename —
	// `tries_change_storage` is the pre-rename spelling of `changes_tries_storage`
	// and the rest of the file uses the latter; one of the two should be removed.
	tries_change_storage: DbChangesTrieStorage<Block>,
	changes_tries_storage: DbChangesTrieStorage<Block>,
	// Header/body/justification storage and blockchain metadata.
	blockchain: BlockchainDb<Block>,
	// Number of blocks to defer canonicalization by (see CANONICALIZATION_DELAY).
	canonicalization_delay: u64,
}
Expand Down Expand Up @@ -418,21 +453,23 @@ impl<Block: BlockT> Backend<Block> {
}

fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> {
let is_archive_pruning = pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
db: db.clone(),
state_db,
};
let tries_change_storage = DbChangesTrieStorage {
let changes_tries_storage = DbChangesTrieStorage {
db,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
_phantom: Default::default(),
};

Ok(Backend {
storage: Arc::new(storage_db),
tries_change_storage,
changes_tries_storage,
blockchain,
canonicalization_delay,
})
Expand Down Expand Up @@ -487,7 +524,8 @@ impl<Block: BlockT> Backend<Block> {
let f_num = f_header.number().clone();

if f_num.as_() > self.storage.state_db.best_canonical() {
if &meta.finalized_hash != f_header.parent_hash() {
let parent_hash = f_header.parent_hash().clone();
if meta.finalized_hash != parent_hash {
return Err(::client::error::ErrorKind::NonSequentialFinalization(
format!("Last finalized {:?} not parent of {:?}",
meta.finalized_hash, f_hash),
Expand All @@ -497,6 +535,13 @@ impl<Block: BlockT> Backend<Block> {

let commit = self.storage.state_db.canonicalize_block(&f_hash);
apply_state_commit(transaction, commit);

// read config from genesis, since it is readonly atm
use client::backend::Backend;
let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v));
self.changes_tries_storage.prune(changes_trie_config, transaction, f_num);
}

Ok(())
Expand All @@ -518,12 +563,6 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS
}
}

/// Move every node of the given changes trie into the CHANGES_TRIE column of
/// the transaction.
fn apply_changes_trie_commit(transaction: &mut DBTransaction, mut commit: MemoryDB<Blake2Hasher>) {
	for (node_key, (node_value, _ref_count)) in commit.drain() {
		transaction.put(columns::CHANGES_TRIE, &node_key[..], &node_value);
	}
}

impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
Expand Down Expand Up @@ -618,7 +657,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
apply_changes_trie_commit(&mut transaction, operation.changes_trie_updates);
self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates);

let finalized = match pending_block.leaf_state {
NewBlockState::Final => true,
Expand Down Expand Up @@ -679,7 +718,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
}

/// Returns a reference to the changes-tries storage, which this backend
/// always provides.
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
	// The duplicated pre-rename statement (`self.tries_change_storage`) was diff
	// residue — two consecutive expressions are not valid Rust; keep the
	// post-rename field name used by the rest of the file.
	Some(&self.changes_tries_storage)
}

fn revert(&self, n: NumberFor<Block>) -> Result<NumberFor<Block>, client::error::Error> {
Expand Down Expand Up @@ -1062,7 +1101,7 @@ mod tests {

let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.tries_change_storage.root(block), Ok(Some(changes_root)));
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root)));

for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
Expand All @@ -1086,6 +1125,82 @@ mod tests {
check_changes(&backend, 2, changes2);
}

#[test]
fn changes_tries_are_pruned_on_finalization() {
	// Exercises DbChangesTrieStorage::prune: finalizing a block removes changes
	// tries older than `min_blocks_to_keep`, and archive mode never prunes.
	let mut backend = Backend::<Block>::new_test(1000, 100);
	backend.changes_tries_storage.min_blocks_to_keep = Some(8);
	let config = ChangesTrieConfiguration {
		digest_interval: 2,
		digest_levels: 2,
	};

	// insert headers for blocks #0..#12, each changing a single unique key
	// (the original hand-unrolled 13 near-identical insert_header calls)
	let mut parent_hash = Default::default();
	for number in 0u64..=12 {
		let changes = vec![(
			format!("key_at_{}", number).into_bytes(),
			format!("val_at_{}", number).into_bytes(),
		)];
		parent_hash = insert_header(&backend, number, parent_hash, changes, Default::default());
	}

	// check that roots of all tries are in the columns::CHANGES_TRIE
	fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 {
		backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter()
			.find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone()
	}
	// roots[i] is the changes trie root of block #(i + 1)
	let roots: Vec<H256> = (1u64..=12).map(|num| {
		let root = read_changes_trie_root(&backend, num);
		assert_eq!(backend.changes_tries_storage.root(num).unwrap(), Some(root));
		root
	}).collect();

	// now simulate finalization of block#12, causing prune of tries at #1..#4
	let mut tx = DBTransaction::new();
	backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12);
	backend.storage.db.write(tx).unwrap();
	for num in 1usize..=4 {
		assert!(backend.changes_tries_storage.get(&roots[num - 1]).unwrap().is_none());
	}
	for num in 5usize..=8 {
		assert!(backend.changes_tries_storage.get(&roots[num - 1]).unwrap().is_some());
	}

	// now simulate finalization of block#16, causing prune of tries at #5..#8
	let mut tx = DBTransaction::new();
	backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16);
	backend.storage.db.write(tx).unwrap();
	for num in 5usize..=8 {
		assert!(backend.changes_tries_storage.get(&roots[num - 1]).unwrap().is_none());
	}

	// now "change" pruning mode to archive && simulate finalization of block#20
	// => no changes tries are pruned, because we never prune in archive mode
	backend.changes_tries_storage.min_blocks_to_keep = None;
	let mut tx = DBTransaction::new();
	backend.changes_tries_storage.prune(Some(config), &mut tx, 20);
	backend.storage.db.write(tx).unwrap();
	for num in 9usize..=12 {
		assert!(backend.changes_tries_storage.get(&roots[num - 1]).unwrap().is_some());
	}
}

#[test]
fn tree_route_works() {
let backend = Backend::<Block>::new_test(1000, 100);
Expand Down
97 changes: 97 additions & 0 deletions core/primitives/src/changes_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,100 @@ pub struct ChangesTrieConfiguration {
/// 2 means that every digest_interval^2 there will be a level2-digest, and so on.
pub digest_levels: u32,
}

impl ChangesTrieConfiguration {
	/// Is digest build enabled? Requires a digest interval greater than one
	/// block and at least one digest level.
	pub fn is_digest_build_enabled(&self) -> bool {
		self.digest_levels > 0 && self.digest_interval > 1
	}

	/// Do we need to build digest at given block?
	pub fn is_digest_build_required_at_block(&self, block: u64) -> bool {
		// genesis never gets a digest; checking enablement first also guarantees
		// digest_interval > 1, so the modulo below can not divide by zero
		if block == 0 || !self.is_digest_build_enabled() {
			return false;
		}

		block % self.digest_interval == 0
	}

	/// Returns Some if digest must be built at given block number.
	/// The tuple is:
	/// (
	/// digest level
	/// digest interval (in blocks)
	/// step between blocks we're interested in when digest is built
	/// )
	pub fn digest_level_at_block(&self, block: u64) -> Option<(u32, u64, u64)> {
		if !self.is_digest_build_required_at_block(block) {
			return None;
		}

		// climb digest levels while the block number is still a multiple of the
		// next, wider digest interval; stop at the configured maximum level, on
		// multiplication overflow, or at the first interval that does not divide
		// the block number
		let mut level = 1u32;
		let mut interval = self.digest_interval;
		let mut step = 1u64;
		while level < self.digest_levels {
			let wider_interval = match interval.checked_mul(self.digest_interval) {
				Some(wider) if block % wider == 0 => wider,
				_ => break,
			};

			step = interval;
			interval = wider_interval;
			level += 1;
		}

		Some((level, interval, step))
	}
}

#[cfg(test)]
mod tests {
	use super::ChangesTrieConfiguration;

	/// Shorthand for building a configuration in tests.
	fn config(interval: u64, levels: u32) -> ChangesTrieConfiguration {
		ChangesTrieConfiguration {
			digest_interval: interval,
			digest_levels: levels,
		}
	}

	#[test]
	fn is_digest_build_enabled_works() {
		// digests require an interval greater than one block...
		assert!(!config(0, 100).is_digest_build_enabled());
		assert!(!config(1, 100).is_digest_build_enabled());
		assert!(config(2, 100).is_digest_build_enabled());
		// ...and at least one digest level
		assert!(!config(100, 0).is_digest_build_enabled());
		assert!(config(100, 1).is_digest_build_enabled());
	}

	#[test]
	fn is_digest_build_required_at_block_works() {
		assert!(!config(8, 4).is_digest_build_required_at_block(0));
		assert!(!config(8, 4).is_digest_build_required_at_block(1));
		assert!(!config(8, 4).is_digest_build_required_at_block(2));
		assert!(!config(8, 4).is_digest_build_required_at_block(4));
		assert!(config(8, 4).is_digest_build_required_at_block(8));
		assert!(!config(8, 4).is_digest_build_required_at_block(9));
		// 64 was asserted twice in the original; once is enough
		assert!(config(8, 4).is_digest_build_required_at_block(64));
		assert!(config(8, 4).is_digest_build_required_at_block(512));
		assert!(config(8, 4).is_digest_build_required_at_block(4096));
		assert!(!config(8, 4).is_digest_build_required_at_block(4103));
		assert!(config(8, 4).is_digest_build_required_at_block(4104));
		assert!(!config(8, 4).is_digest_build_required_at_block(4108));
	}

	#[test]
	fn digest_level_at_block_works() {
		assert_eq!(config(8, 4).digest_level_at_block(0), None);
		assert_eq!(config(8, 4).digest_level_at_block(7), None);
		assert_eq!(config(8, 4).digest_level_at_block(63), None);
		assert_eq!(config(8, 4).digest_level_at_block(8), Some((1, 8, 1)));
		assert_eq!(config(8, 4).digest_level_at_block(64), Some((2, 64, 8)));
		assert_eq!(config(8, 4).digest_level_at_block(512), Some((3, 512, 64)));
		assert_eq!(config(8, 4).digest_level_at_block(4096), Some((4, 4096, 512)));
		// 4112 is a multiple of 8 but not of 64, so the level restarts at 1
		assert_eq!(config(8, 4).digest_level_at_block(4112), Some((1, 8, 1)));
	}
}
9 changes: 9 additions & 0 deletions core/state-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ impl PruningMode {
max_mem: None,
})
}

/// Is this an archive (either ArchiveAll or ArchiveCanonical) pruning mode?
pub fn is_archive(&self) -> bool {
	// only the constrained mode actually prunes historic state
	match *self {
		PruningMode::Constrained(_) => false,
		PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => true,
	}
}

}

impl Default for PruningMode {
Expand Down
3 changes: 1 addition & 2 deletions core/state-machine/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ pub(crate) fn insert_into_memory_db<H, I>(mdb: &mut MemoryDB<H>, input: I) -> Op
where
H: Hasher,
H::Out: HeapSizeOf,

I: Iterator<Item=(Vec<u8>, Vec<u8>)>,
I: IntoIterator<Item=(Vec<u8>, Vec<u8>)>,
{
let mut root = <H as Hasher>::Out::default();
{
Expand Down
Loading