diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index bcc7a9bff3b2d..c7bff673d80ac 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -437,12 +437,24 @@ pub trait StorageProvider> { /// /// Manages the data layer. /// -/// Note on state pruning: while an object from `state_at` is alive, the state +/// # State Pruning +/// +/// While an object from `state_at` is alive, the state /// should not be pruned. The backend should internally reference-count /// its state objects. /// /// The same applies for live `BlockImportOperation`s: while an import operation building on a /// parent `P` is alive, the state for `P` should not be pruned. +/// +/// # Block Pruning +/// +/// Users can prevent a block from being pruned via calling `pin_block`. +/// After an equal number of `unpin_block` calls, the block can be pruned in +/// the next pruning window. +/// +/// While a block is pinned, its state is also preserved. +/// +/// The backend should internally reference count the number of pin / unpin calls. pub trait Backend: AuxStore + Send + Sync { /// Associated block insertion operation type. type BlockImportOperation: BlockImportOperation; @@ -503,6 +515,12 @@ pub trait Backend: AuxStore + Send + Sync { /// Returns a handle to offchain storage. fn offchain_storage(&self) -> Option; + /// Pin the block to prevent pruning. + fn pin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()>; + + /// Unpin the block to allow pruning. + fn unpin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()>; + /// Returns true if state for given block is available. fn have_state_at(&self, hash: &Block::Hash, _number: NumberFor) -> bool { self.state_at(BlockId::Hash(*hash)).is_ok() diff --git a/client/api/src/client.rs b/client/api/src/client.rs index b809e0ee61032..2b0a7a2d21f3b 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -123,6 +123,12 @@ pub trait BlockBackend { id: &BlockId, ) -> sp_blockchain::Result>>>; + /// Pin the block to prevent pruning. + fn pin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()>; + + /// Unpin the block to allow pruning. + fn unpin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()>; + /// Get full block by id. fn block(&self, id: &BlockId) -> sp_blockchain::Result>>; diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 9000f62aa6cc3..773ce06217bdb 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -768,6 +768,14 @@ where None } + fn pin_block(&self, _hash: &Block::Hash) -> sp_blockchain::Result<()> { + Ok(()) + } + + fn unpin_block(&self, _hash: &Block::Hash) -> sp_blockchain::Result<()> { + Ok(()) + } + fn state_at(&self, block: BlockId) -> sp_blockchain::Result { match block { BlockId::Hash(h) if h == Default::default() => return Ok(Self::State::default()), diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index 32c4c9ef85ed9..9002b8e043833 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -44,7 +44,7 @@ use linked_hash_map::LinkedHashMap; use log::{debug, trace, warn}; use parking_lot::{Mutex, RwLock}; use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, io, path::{Path, PathBuf}, sync::Arc, @@ -1019,6 +1019,46 @@ impl FrozenForDuration { } } +/// The state of the pinned block. +struct PinnedBlockState { + /// The number of active users that need this block alive. + ref_count: u64, + /// True if the block was pruned by any previous finalization. + /// + /// # Note + /// + /// The block must be pruned with the pruning window + /// if and only if this field is set and the reference count drops to zero. + /// + /// Added to avoid the following edge-case: + /// + /// Block tree: + /// G ... X -> A1 + /// X -> A2 + /// + /// - user1: pin_block A1 + /// - user2: pin_block A1 + /// - user1: unpin_block A1 + /// - user2: unpin_block A1 + /// + /// - block A2 is finalized (A1 is considered a fork) + /// - finalize block A2 + /// - block A1 should be pruned only once + /// + /// Therefore, not every block that has its reference count + /// equal to zero should also be pruned with the next finalization. + was_pruned: bool, + /// The state of the block as tracked by the `state-db`. + _state: RefTrackingState, +} + +impl PinnedBlockState { + /// Construct a new [`PinnedBlockState`]. + fn new(state: RefTrackingState) -> Self { + Self { ref_count: 1, was_pruned: false, _state: state } + } +} + /// Disk backend. /// /// Disk backend keeps data in a key-value store. In archive mode, trie nodes are kept from all @@ -1035,6 +1075,19 @@ pub struct Backend { state_usage: Arc, genesis_state: RwLock>>>, shared_trie_cache: Option>>, + /// Keep track of the pinned blocks. A block is pinned when calling + /// [`sc_client_api::backend::Backend::pin_block`]. + /// + /// The pruning of these blocks is delayed for as long as their reference count is positive. + /// + /// # Note + /// + /// Block hashes from here are moved to the pruning queue when: + /// - [`PinnedBlockState::ref_count`] equals to zero + /// - the block was previously marked as pruned [`PinnedBlockState::was_pruned`] + pinned_blocks: Arc>>>, + /// Blocks added here will be pruned on the next finalization. + pruning_queue: Arc>>, } impl Backend { @@ -1151,6 +1204,8 @@ impl Backend { shared_trie_cache: config.trie_cache_maximum_size.map(|maximum_size| { SharedTrieCache::new(sp_trie::cache::CacheSize::Maximum(maximum_size)) }), + pinned_blocks: Default::default(), + pruning_queue: Default::default(), }; // Older DB versions have no last state key. Check if the state is available and set it. @@ -1761,6 +1816,14 @@ impl Backend { } } } + + // Also discard all previously pinned blocks + let mut blocks = self.pruning_queue.lock(); + for hash in &*blocks { + let id = BlockId::::hash(*hash); + self.prune_block(transaction, id)?; + } + blocks.clear(); Ok(()) } @@ -1769,6 +1832,13 @@ impl Backend { transaction: &mut Transaction, id: BlockId, ) -> ClientResult<()> { + if self.should_delay_pruning(id).unwrap_or(false) { + // Trace for easily identifying `db-pin` paths only. + trace!(target: "db-pin", "Not pruning pinned block #{}", id); + debug!(target: "db", "Not pruning pinned block #{}", id); + return Ok(()) + } + debug!(target: "db", "Removing block #{}", id); utils::remove_from_db( transaction, @@ -1819,6 +1889,92 @@ impl Backend { let state = RefTrackingState::new(db_state, self.storage.clone(), None); Ok(RecordStatsState::new(state, None, self.state_usage.clone())) } + + /// Return true if the pruning should be delayed for the provided block. + /// + /// If the given block is part of the `pinned_blocks`, then sets the block's `was_pruned` + /// flag to be later included to the pruning queue when its reference count drops + /// to zero. + /// + /// If the given block is not part of the `pinned_blocks`, then return false to complete + /// the pruning immediately. + fn should_delay_pruning(&self, block: BlockId) -> ClientResult { + if self.blocks_pruning == BlocksPruning::KeepAll { + return Ok(false) + } + + let hash = match block { + BlockId::Hash(h) => h, + BlockId::Number(n) => self.blockchain.hash(n)?.ok_or_else(|| { + sp_blockchain::Error::UnknownBlock(format!("Unknown block number {}", n)) + })?, + }; + + let mut cache = self.pinned_blocks.lock(); + let res = if let Some(entry) = cache.get_mut(&hash) { + trace!(target: "db-pin", "Pinned block: {:?} delay pruning", hash); + entry.was_pruned = true; + true + } else { + false + }; + Ok(res) + } + + fn state_at_ref(&self, block: BlockId) -> ClientResult> { + use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend; + + let is_genesis = match &block { + BlockId::Number(n) if n.is_zero() => true, + BlockId::Hash(h) if h == &self.blockchain.meta.read().genesis_hash => true, + _ => false, + }; + if is_genesis { + if let Some(genesis_state) = &*self.genesis_state.read() { + let root = genesis_state.root; + let db_state = DbStateBuilder::::new(genesis_state.clone(), root) + .with_optional_cache(self.shared_trie_cache.as_ref().map(|c| c.local_cache())) + .build(); + + return Ok(RefTrackingState::new(db_state, self.storage.clone(), None)) + } + } + + let hash = match block { + BlockId::Hash(h) => h, + BlockId::Number(n) => self.blockchain.hash(n)?.ok_or_else(|| { + sp_blockchain::Error::UnknownBlock(format!("Unknown block number {}", n)) + })?, + }; + + match self.blockchain.header_metadata(hash) { + Ok(ref hdr) => { + let hint = || { + sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref()) + .unwrap_or(None) + .is_some() + }; + if let Ok(()) = + self.storage.state_db.pin(&hash, hdr.number.saturated_into::(), hint) + { + let root = hdr.state_root; + let db_state = DbStateBuilder::::new(self.storage.clone(), root) + .with_optional_cache( + self.shared_trie_cache.as_ref().map(|c| c.local_cache()), + ) + .build(); + + Ok(RefTrackingState::new(db_state, self.storage.clone(), Some(hash))) + } else { + Err(sp_blockchain::Error::UnknownBlock(format!( + "State already discarded for {:?}", + block + ))) + } + }, + Err(e) => Err(e), + } + } } fn apply_state_commit( @@ -2302,60 +2458,72 @@ impl sc_client_api::backend::Backend for Backend { &self.blockchain } - fn state_at(&self, block: BlockId) -> ClientResult { - use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend; + fn pin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + if self.blocks_pruning == BlocksPruning::KeepAll { + return Ok(()) + } - let is_genesis = match &block { - BlockId::Number(n) if n.is_zero() => true, - BlockId::Hash(h) if h == &self.blockchain.meta.read().genesis_hash => true, - _ => false, - }; - if is_genesis { - if let Some(genesis_state) = &*self.genesis_state.read() { - let root = genesis_state.root; - let db_state = DbStateBuilder::::new(genesis_state.clone(), root) - .with_optional_cache(self.shared_trie_cache.as_ref().map(|c| c.local_cache())) - .build(); + let mut cache = self.pinned_blocks.lock(); + match cache.entry(*hash) { + Entry::Occupied(mut entry) => { + let state = entry.get_mut(); + state.ref_count = state.ref_count.saturating_add(1); + }, + Entry::Vacant(entry) => { + // The `state_at_ref` verifies if the block exists. + let state = self.state_at_ref(BlockId::hash(*hash))?; + let state = PinnedBlockState::new(state); + trace!(target: "db-pin", "Pinned block: {:?}", hash); + entry.insert(state); + }, + } + + Ok(()) + } + + fn unpin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + if self.blocks_pruning == BlocksPruning::KeepAll { + return Ok(()) + } - let state = RefTrackingState::new(db_state, self.storage.clone(), None); - return Ok(RecordStatsState::new(state, None, self.state_usage.clone())) + let mut cache = self.pinned_blocks.lock(); + if let Entry::Occupied(mut entry) = cache.entry(*hash) { + let state = entry.get_mut(); + state.ref_count = state.ref_count.saturating_sub(1); + if state.ref_count == 0 { + // Ensure the block is pruned with the next finalization. + if state.was_pruned { + trace!(target: "db-pin", "Unpinned block: {:?} to be pruned", hash); + let mut queue = self.pruning_queue.lock(); + queue.push(*hash); + } + + trace!(target: "db-pin", "Unpinned block: {:?}", hash); + entry.remove_entry(); } } - let hash = match block { - BlockId::Hash(h) => h, - BlockId::Number(n) => self.blockchain.hash(n)?.ok_or_else(|| { - sp_blockchain::Error::UnknownBlock(format!("Unknown block number {}", n)) - })?, + Ok(()) + } + + fn state_at(&self, block: BlockId) -> ClientResult { + use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend; + + let state = self.state_at_ref(block)?; + let block_hash = if state.parent_hash.is_none() { + // This is genesis. + None + } else { + let hash = match block { + BlockId::Hash(h) => h, + BlockId::Number(n) => self.blockchain.hash(n)?.ok_or_else(|| { + sp_blockchain::Error::UnknownBlock(format!("Unknown block number {}", n)) + })?, + }; + Some(hash) }; - match self.blockchain.header_metadata(hash) { - Ok(ref hdr) => { - let hint = || { - sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref()) - .unwrap_or(None) - .is_some() - }; - if let Ok(()) = - self.storage.state_db.pin(&hash, hdr.number.saturated_into::(), hint) - { - let root = hdr.state_root; - let db_state = DbStateBuilder::::new(self.storage.clone(), root) - .with_optional_cache( - self.shared_trie_cache.as_ref().map(|c| c.local_cache()), - ) - .build(); - let state = RefTrackingState::new(db_state, self.storage.clone(), Some(hash)); - Ok(RecordStatsState::new(state, Some(hash), self.state_usage.clone())) - } else { - Err(sp_blockchain::Error::UnknownBlock(format!( - "State already discarded for {:?}", - block - ))) - } - }, - Err(e) => Err(e), - } + Ok(RecordStatsState::new(state, block_hash, self.state_usage.clone())) } fn have_state_at(&self, hash: &Block::Hash, number: NumberFor) -> bool { @@ -3811,4 +3979,196 @@ pub(crate) mod tests { assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2]); assert_eq!(backend.blockchain().info().best_hash, block2); } + + #[test] + fn test_pinned_blocks_on_finalize() { + let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + for i in 0..5 { + let hash = insert_block( + &backend, + i, + prev_hash, + None, + Default::default(), + vec![i.into()], + None, + ) + .unwrap(); + blocks.push(hash); + + // Avoid block pruning. + backend.pin_block(&blocks[i as usize]).unwrap(); + + prev_hash = hash; + } + + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[4])).unwrap(); + for i in 1..5 { + op.mark_finalized(BlockId::hash(blocks[i]), None).unwrap(); + } + backend.commit_operation(op).unwrap(); + + let bc = backend.blockchain(); + // Block 0, 1, 2, 3 are pinned and pruning is delayed, + // while block 4 is never delayed for pruning as it is finalized. + assert_eq!(Some(vec![0.into()]), bc.body(BlockId::hash(blocks[0])).unwrap()); + assert_eq!(Some(vec![1.into()]), bc.body(BlockId::hash(blocks[1])).unwrap()); + assert_eq!(Some(vec![2.into()]), bc.body(BlockId::hash(blocks[2])).unwrap()); + assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap()); + assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap()); + + // Unpin all blocks. + for block in &blocks { + backend.unpin_block(&block).unwrap(); + } + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 -> 5 + let hash = + insert_block(&backend, 5, prev_hash, None, Default::default(), vec![5.into()], None) + .unwrap(); + blocks.push(hash); + + // Mark block 5 as finalized. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[5])).unwrap(); + op.mark_finalized(BlockId::hash(blocks[5]), None).unwrap(); + backend.commit_operation(op).unwrap(); + + assert!(bc.body(BlockId::hash(blocks[0])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[1])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[2])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[3])).unwrap().is_none()); + // Block 4 was unpinned before pruning, it must also get pruned. + assert!(bc.body(BlockId::hash(blocks[4])).unwrap().is_none()); + assert_eq!(Some(vec![5.into()]), bc.body(BlockId::hash(blocks[5])).unwrap()); + } + + #[test] + fn test_pinned_blocks_on_finalize_with_fork() { + let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + for i in 0..5 { + let hash = insert_block( + &backend, + i, + prev_hash, + None, + Default::default(), + vec![i.into()], + None, + ) + .unwrap(); + blocks.push(hash); + + // Avoid block pruning. + backend.pin_block(&blocks[i as usize]).unwrap(); + + prev_hash = hash; + } + + // Insert a fork at the second block. + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + // 1 -> 2 -> 3 + let fork_hash_root = + insert_block(&backend, 2, blocks[1], None, H256::random(), vec![2.into()], None) + .unwrap(); + let fork_hash_3 = insert_block( + &backend, + 3, + fork_hash_root, + None, + H256::random(), + vec![3.into(), 11.into()], + None, + ) + .unwrap(); + + // Do not prune the fork hash. + backend.pin_block(&fork_hash_3).unwrap(); + + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[4])).unwrap(); + op.mark_head(BlockId::hash(blocks[4])).unwrap(); + backend.commit_operation(op).unwrap(); + + for i in 1..5 { + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[4])).unwrap(); + op.mark_finalized(BlockId::hash(blocks[i]), None).unwrap(); + backend.commit_operation(op).unwrap(); + } + + let bc = backend.blockchain(); + // Block 0, 1, 2, 3 are pinned and pruning is delayed, + // while block 4 is never delayed for pruning as it is finalized. + assert_eq!(Some(vec![0.into()]), bc.body(BlockId::hash(blocks[0])).unwrap()); + assert_eq!(Some(vec![1.into()]), bc.body(BlockId::hash(blocks[1])).unwrap()); + assert_eq!(Some(vec![2.into()]), bc.body(BlockId::hash(blocks[2])).unwrap()); + assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap()); + assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap()); + // Check the fork hashes. + assert_eq!(None, bc.body(BlockId::hash(fork_hash_root)).unwrap()); + assert_eq!(Some(vec![3.into(), 11.into()]), bc.body(BlockId::hash(fork_hash_3)).unwrap()); + + // Unpin all blocks, except the forked one. + for block in &blocks { + backend.unpin_block(&block).unwrap(); + } + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 -> 5 + // 1 -> ..-> 3 + let hash = + insert_block(&backend, 5, prev_hash, None, Default::default(), vec![5.into()], None) + .unwrap(); + blocks.push(hash); + + // Mark block 5 as finalized. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[5])).unwrap(); + op.mark_finalized(BlockId::hash(blocks[5]), None).unwrap(); + backend.commit_operation(op).unwrap(); + + assert!(bc.body(BlockId::hash(blocks[0])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[1])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[2])).unwrap().is_none()); + assert!(bc.body(BlockId::hash(blocks[3])).unwrap().is_none()); + // Block 4 was unpinned before pruning, it must also get pruned. + assert!(bc.body(BlockId::hash(blocks[4])).unwrap().is_none()); + assert_eq!(Some(vec![5.into()]), bc.body(BlockId::hash(blocks[5])).unwrap()); + // Fork 3 was kept around. + assert_eq!(Some(vec![3.into(), 11.into()]), bc.body(BlockId::hash(fork_hash_3)).unwrap()); + + backend.unpin_block(&fork_hash_3).unwrap(); + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 + // 1 -> ..-> 3 + let hash = + insert_block(&backend, 6, blocks[5], None, Default::default(), vec![6.into()], None) + .unwrap(); + blocks.push(hash); + + // Mark block 6 as finalized. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::hash(blocks[6])).unwrap(); + op.mark_finalized(BlockId::hash(blocks[6]), None).unwrap(); + backend.commit_operation(op).unwrap(); + + // Block 6 must be the only one around. + assert!(bc.body(BlockId::hash(fork_hash_3)).unwrap().is_none()); + assert_eq!(Some(vec![6.into()]), bc.body(BlockId::hash(blocks[6])).unwrap()); + } } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 27561046c3481..5c756dfcabfff 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -413,6 +413,16 @@ where &self.finality_notification_sinks } + /// Pin the block to prevent pruning. + fn pin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + self.backend.pin_block(hash) + } + + /// Unpin the block to allow pruning. + fn unpin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + self.backend.unpin_block(hash) + } + /// Get a reference to the state at a given block. pub fn state_at(&self, block: &BlockId) -> sp_blockchain::Result { self.backend.state_at(*block) @@ -1949,6 +1959,16 @@ where self.body(id) } + /// Pin the block to prevent pruning. + fn pin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + self.pin_block(hash) + } + + /// Unpin the block to allow pruning. + fn unpin_block(&self, hash: &Block::Hash) -> sp_blockchain::Result<()> { + self.unpin_block(hash) + } + fn block(&self, id: &BlockId) -> sp_blockchain::Result>> { Ok(match (self.header(id)?, self.body(id)?, self.justifications(id)?) { (Some(header), Some(extrinsics), justifications) =>