State-db I/o metrics #4562
Changes from all commits
client/db/src/lib.rs
```diff
@@ -33,6 +33,7 @@ mod children;
 mod cache;
 mod storage_cache;
 mod utils;
+mod stats;

 use std::sync::Arc;
 use std::path::PathBuf;
```
```diff
@@ -63,13 +64,14 @@ use sp_runtime::traits::{
 use sc_executor::RuntimeInfo;
 use sp_state_machine::{
 	DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, ChangesTrieBuildCache,
-	backend::Backend as StateBackend,
+	backend::Backend as StateBackend, UsageInfo as StateUsageInfo,
 };
 use crate::utils::{Meta, db_err, meta_keys, read_db, read_meta};
 use sc_client::leaves::{LeafSet, FinalizationDisplaced};
 use sc_state_db::StateDb;
 use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
 use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
+use crate::stats::StateUsageStats;
 use log::{trace, debug, warn};
 pub use sc_state_db::PruningMode;
```
```diff
@@ -895,7 +897,8 @@ pub struct Backend<Block: BlockT> {
 	shared_cache: SharedCache<Block>,
 	import_lock: RwLock<()>,
 	is_archive: bool,
-	io_stats: FrozenForDuration<kvdb::IoStats>,
+	io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
+	state_usage: StateUsageStats,
 }

 impl<Block: BlockT> Backend<Block> {
```
```diff
@@ -963,7 +966,8 @@ impl<Block: BlockT> Backend<Block> {
 			),
 			import_lock: Default::default(),
 			is_archive: is_archive_pruning,
-			io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1), kvdb::IoStats::empty()),
+			io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1), (kvdb::IoStats::empty(), StateUsageInfo::empty())),
+			state_usage: StateUsageStats::new(),
 		})
 	}
```
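For context, `FrozenForDuration` rate-limits how often the potentially expensive stats snapshot is recomputed; the constructor above freezes it for one second at a time. A minimal sketch of such a wrapper, assuming a mutex-guarded timestamped slot (the real Substrate type may differ in detail):

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Caches a value and only recomputes it once `duration` has elapsed.
struct FrozenForDuration<T: Clone> {
    duration: Duration,
    inner: Mutex<(Instant, T)>,
}

impl<T: Clone> FrozenForDuration<T> {
    fn new(duration: Duration, initial: T) -> Self {
        Self { duration, inner: Mutex::new((Instant::now(), initial)) }
    }

    /// Returns the cached value, or refreshes it via `f` if it has gone stale.
    fn take_or_else<F: FnOnce() -> T>(&self, f: F) -> T {
        let mut slot = self.inner.lock().unwrap();
        if slot.0.elapsed() > self.duration {
            let fresh = f();
            *slot = (Instant::now(), fresh.clone());
            fresh
        } else {
            slot.1.clone()
        }
    }
}
```

With this shape, `usage_info()` further down can call `take_or_else` on every invocation without querying RocksDB's statistics more than once per second.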
```diff
@@ -1257,13 +1261,23 @@ impl<Block: BlockT> Backend<Block> {
 		let finalized = if operation.commit_state {
 			let mut changeset: sc_state_db::ChangeSet<Vec<u8>> = sc_state_db::ChangeSet::default();
+			let mut ops: u64 = 0;
+			let mut bytes: u64 = 0;
 			for (key, (val, rc)) in operation.db_updates.drain() {
 				if rc > 0 {
+					ops += 1;
+					bytes += key.len() as u64 + val.len() as u64;
+
 					changeset.inserted.push((key, val.to_vec()));
 				} else if rc < 0 {
+					ops += 1;
+					bytes += key.len() as u64;
+
 					changeset.deleted.push(key);
 				}
 			}
+			self.state_usage.tally_writes(ops, bytes);

 			let number_u64 = number.saturated_into::<u64>();
 			let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
 				.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
```

Contributor (on the deletion branch, which tallies only the key length): We could store that in a different write counter.
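That suggestion could look something like the following sketch; the `WriteStats` type and its field names are hypothetical, not part of this PR:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical extension of `StateUsageStats`: count removals separately
/// from inserts, so that key-only delete records don't skew "bytes written".
#[derive(Default)]
struct WriteStats {
    inserts: AtomicU64,
    bytes_inserted: AtomicU64,
    removals: AtomicU64,
    bytes_removed: AtomicU64,
}

impl WriteStats {
    fn tally_insert(&self, key_val_bytes: u64) {
        self.inserts.fetch_add(1, Ordering::Relaxed);
        self.bytes_inserted.fetch_add(key_val_bytes, Ordering::Relaxed);
    }

    fn tally_removal(&self, key_bytes: u64) {
        self.removals.fetch_add(1, Ordering::Relaxed);
        self.bytes_removed.fetch_add(key_bytes, Ordering::Relaxed);
    }
}
```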
```diff
@@ -1495,6 +1509,9 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 	fn commit_operation(&self, operation: Self::BlockImportOperation)
 		-> ClientResult<()>
 	{
+		let usage = operation.old_state.usage_info();
+		self.state_usage.merge_sm(usage);
+
 		match self.try_commit_operation(operation) {
 			Ok(_) => {
 				self.storage.state_db.apply_pending();
```
```diff
@@ -1548,8 +1565,14 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 		Some(self.offchain_storage.clone())
 	}

 	fn usage_info(&self) -> Option<UsageInfo> {
-		let io_stats = self.io_stats.take_or_else(|| self.storage.db.io_stats(kvdb::IoStatsKind::SincePrevious));
+		let (io_stats, state_stats) = self.io_stats.take_or_else(||
+			(
+				self.storage.db.io_stats(kvdb::IoStatsKind::SincePrevious),
+				self.state_usage.take(),
+			)
+		);
 		let database_cache = parity_util_mem::malloc_size(&*self.storage.db);
 		let state_cache = (*&self.shared_cache).lock().used_storage_cache_size();
```
```diff
@@ -1565,6 +1588,8 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 				writes: io_stats.writes,
 				reads: io_stats.reads,
 				average_transaction_size: io_stats.avg_transaction_size() as u64,
+				state_reads: state_stats.reads.ops,
+				state_reads_cache: state_stats.cache_reads.ops,
 			},
 		})
 	}
```
client/db/src/stats.rs (new file)

@@ -0,0 +1,102 @@
```rust
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Database usage statistics.

use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};

/// Accumulated usage statistics for state queries.
pub struct StateUsageStats {
	started: std::time::Instant,
	reads: AtomicU64,
	bytes_read: AtomicU64,
	writes: AtomicU64,
	bytes_written: AtomicU64,
	reads_cache: AtomicU64,
	bytes_read_cache: AtomicU64,
}

impl StateUsageStats {
	/// New empty usage stats.
	pub fn new() -> Self {
		Self {
			started: std::time::Instant::now(),
			reads: 0.into(),
			bytes_read: 0.into(),
			writes: 0.into(),
			bytes_written: 0.into(),
			reads_cache: 0.into(),
			bytes_read_cache: 0.into(),
		}
	}

	/// Tally one read operation, of some length.
	pub fn tally_read(&self, data_bytes: u64, cache: bool) {
		self.reads.fetch_add(1, AtomicOrdering::Relaxed);
		self.bytes_read.fetch_add(data_bytes, AtomicOrdering::Relaxed);
		if cache {
			self.reads_cache.fetch_add(1, AtomicOrdering::Relaxed);
			self.bytes_read_cache.fetch_add(data_bytes, AtomicOrdering::Relaxed);
		}
	}

	/// Tally one key read.
	pub fn tally_key_read(&self, key: &[u8], val: Option<&Vec<u8>>, cache: bool) {
		self.tally_read(key.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache);
	}

	/// Tally one child key read, passing the value through so call sites
	/// can stay expression-oriented.
	pub fn tally_child_key_read(&self, key: &(Vec<u8>, Vec<u8>), val: Option<Vec<u8>>, cache: bool) -> Option<Vec<u8>> {
		self.tally_read(key.0.len() as u64 + key.1.len() as u64 + val.as_ref().map(|x| x.len() as u64).unwrap_or(0), cache);
		val
	}

	/// Tally some write operations, including their byte count.
	pub fn tally_writes(&self, ops: u64, data_bytes: u64) {
		self.writes.fetch_add(ops, AtomicOrdering::Relaxed);
		self.bytes_written.fetch_add(data_bytes, AtomicOrdering::Relaxed);
	}

	/// Merge usage info collected by the state machine.
	pub fn merge_sm(&self, info: sp_state_machine::UsageInfo) {
		self.reads.fetch_add(info.reads.ops, AtomicOrdering::Relaxed);
		self.bytes_read.fetch_add(info.reads.bytes, AtomicOrdering::Relaxed);
		self.writes.fetch_add(info.writes.ops, AtomicOrdering::Relaxed);
		self.bytes_written.fetch_add(info.writes.bytes, AtomicOrdering::Relaxed);
		self.reads_cache.fetch_add(info.cache_reads.ops, AtomicOrdering::Relaxed);
		self.bytes_read_cache.fetch_add(info.cache_reads.bytes, AtomicOrdering::Relaxed);
	}

	/// Return the accumulated usage info, resetting all counters to zero.
	pub fn take(&self) -> sp_state_machine::UsageInfo {
		use sp_state_machine::UsageUnit;

		fn unit(ops: &AtomicU64, bytes: &AtomicU64) -> UsageUnit {
			UsageUnit { ops: ops.swap(0, AtomicOrdering::Relaxed), bytes: bytes.swap(0, AtomicOrdering::Relaxed) }
		}

		sp_state_machine::UsageInfo {
			reads: unit(&self.reads, &self.bytes_read),
			writes: unit(&self.writes, &self.bytes_written),
			cache_reads: unit(&self.reads_cache, &self.bytes_read_cache),
			// TODO: Properly tracking the memory footprint here would require
			// imposing a `MallocSizeOf` bound on half of the codebase, so it
			// is an open question how best to do it.
			memory: 0,
			started: self.started,
			span: self.started.elapsed(),
		}
	}
}
```
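A hypothetical call sequence showing how these counters behave, in particular that `take()` drains them (field names follow the code above):

```rust
let stats = StateUsageStats::new();

// A cache-served read of key "abc" with a 3-byte value: counted in both
// the total and the cache-specific counters.
stats.tally_key_read(b"abc", Some(&vec![1, 2, 3]), true);

// A backend read (cache miss): counted only in the totals.
stats.tally_key_read(b"abc", Some(&vec![1, 2, 3]), false);

// Two write operations totalling 64 bytes.
stats.tally_writes(2, 64);

// `take()` swaps the counters to zero, so back-to-back calls differ:
let info = stats.take();
assert_eq!(info.reads.ops, 2);
assert_eq!(info.cache_reads.ops, 1);
assert_eq!(info.writes.ops, 2);
assert_eq!(stats.take().reads.ops, 0); // already reset
```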
client/db/src/storage_cache.rs
```diff
@@ -28,6 +28,7 @@ use sp_state_machine::{backend::Backend as StateBackend, TrieBackend};
 use log::trace;
 use sc_client_api::backend::{StorageCollection, ChildStorageCollection};
 use std::hash::Hash as StdHash;
+use crate::stats::StateUsageStats;

 const STATE_CACHE_BLOCKS: usize = 12;
```
```diff
@@ -296,6 +297,8 @@ pub struct CacheChanges<B: BlockT> {
 /// in `sync_cache` along with the change overlay.
 /// For non-canonical clones local cache and changes are dropped.
 pub struct CachingState<S: StateBackend<HasherFor<B>>, B: BlockT> {
+	/// Usage statistics.
+	usage: StateUsageStats,
 	/// Backing state.
 	state: S,
 	/// Cache data.
```
```diff
@@ -421,6 +424,7 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> CachingState<S, B> {
 	/// Create a new instance wrapping generic State and shared cache.
 	pub fn new(state: S, shared_cache: SharedCache<B>, parent_hash: Option<B::Hash>) -> Self {
 		CachingState {
+			usage: StateUsageStats::new(),
 			state,
 			cache: CacheChanges {
 				shared_cache,
```
```diff
@@ -495,18 +499,22 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
 		// Note that local cache makes that lru is not refreshed
 		if let Some(entry) = local_cache.storage.get(key).cloned() {
 			trace!("Found in local cache: {:?}", HexDisplay::from(&key));
+			self.usage.tally_key_read(key, entry.as_ref(), true);

 			return Ok(entry)
 		}
 		let mut cache = self.cache.shared_cache.lock();
 		if Self::is_allowed(Some(key), None, &self.cache.parent_hash, &cache.modifications) {
 			if let Some(entry) = cache.lru_storage.get(key).map(|a| a.clone()) {
 				trace!("Found in shared cache: {:?}", HexDisplay::from(&key));
+				self.usage.tally_key_read(key, entry.as_ref(), true);
 				return Ok(entry)
 			}
 		}
 		trace!("Cache miss: {:?}", HexDisplay::from(&key));
 		let value = self.state.storage(key)?;
 		RwLockUpgradableReadGuard::upgrade(local_cache).storage.insert(key.to_vec(), value.clone());
+		self.usage.tally_key_read(key, value.as_ref(), false);
 		Ok(value)
 	}
```
```diff
@@ -539,17 +547,25 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
 		let local_cache = self.cache.local_cache.upgradable_read();
 		if let Some(entry) = local_cache.child_storage.get(&key).cloned() {
 			trace!("Found in local cache: {:?}", key);
-			return Ok(entry)
+			return Ok(
+				self.usage.tally_child_key_read(&key, entry, true)
+			)
 		}
 		let mut cache = self.cache.shared_cache.lock();
 		if Self::is_allowed(None, Some(&key), &self.cache.parent_hash, &cache.modifications) {
 			if let Some(entry) = cache.lru_child_storage.get(&key).map(|a| a.clone()) {
 				trace!("Found in shared cache: {:?}", key);
-				return Ok(entry)
+				return Ok(
+					self.usage.tally_child_key_read(&key, entry, true)
+				)
 			}
 		}
 		trace!("Cache miss: {:?}", key);
 		let value = self.state.child_storage(storage_key, child_info, &key.1[..])?;

 		// just pass it through the usage counter
 		let value = self.usage.tally_child_key_read(&key, value, false);

 		RwLockUpgradableReadGuard::upgrade(local_cache).child_storage.insert(key, value.clone());
 		Ok(value)
 	}
```

Contributor: So we have something like `state_reads` − `state_reads_cache` = state reads that hit the database backend. We could register them in substrate/primitives/state-machine/src/ext.rs (line 157 in 65ad8e9).
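To make that arithmetic concrete, here is a stand-in consumer of the exported counters; the `IoInfo` struct is illustrative, and only the `state_reads`/`state_reads_cache` names come from the `usage_info` hunk above:

```rust
// Stand-in for the IO portion of the `UsageInfo` built in `usage_info()`.
struct IoInfo {
    state_reads: u64,       // all state reads (cache hits + misses)
    state_reads_cache: u64, // reads served from the state cache
}

// Reads that had to hit the database backend.
fn backend_reads(io: &IoInfo) -> u64 {
    io.state_reads - io.state_reads_cache
}

// Fraction of state reads answered by the cache.
fn cache_hit_rate(io: &IoInfo) -> f64 {
    if io.state_reads == 0 {
        return 0.0;
    }
    io.state_reads_cache as f64 / io.state_reads as f64
}
```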
```diff
@@ -646,6 +662,10 @@ impl<S: StateBackend<HasherFor<B>>, B: BlockT> StateBackend<HasherFor<B>> for Ca
 	fn as_trie_backend(&mut self) -> Option<&TrieBackend<Self::TrieBackendStorage, HasherFor<B>>> {
 		self.state.as_trie_backend()
 	}
+
+	fn usage_info(&self) -> sp_state_machine::UsageInfo {
+		self.usage.take()
+	}
 }

 #[cfg(test)]
```
Review comment: The field `db_updates` holds the calculated trie-node delta that will be written to RocksDB, so I think we should rename the stat: it gives the wrong idea that the collected read stats (key-value reads) are comparable with those writes (trie nodes).

We can also get write stats that correspond to our reads; those would be the `storage_updates` and `child_storage_updates` fields. IIRC those are sent to the storage cache for update and are complete in terms of delta.

But we could also hook a `write_cache` stat into the state-machine overlay to compare the actual post-block modifications (the `storage_updates` fields) against the updates the runtime actually called. That way we get an idea of the proportion of updates that land on the same key (at substrate/primitives/state-machine/src/overlayed_changes.rs, line 236 in 65ad8e9).
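A sketch of the overlay-side `write_cache` stat this comment proposes; every name here is illustrative, and the real overlayed-changes structure is considerably more involved:

```rust
use std::collections::HashMap;

// Toy overlay: records every runtime storage write, keeping only the
// last value per key as the committed delta.
#[derive(Default)]
struct Overlay {
    committed: HashMap<Vec<u8>, Vec<u8>>,
    runtime_writes: u64, // every call, including repeated writes to one key
}

impl Overlay {
    fn set_storage(&mut self, key: Vec<u8>, value: Vec<u8>) {
        self.runtime_writes += 1;
        self.committed.insert(key, value);
    }

    // Proportion of runtime writes that were overwritten before commit:
    // e.g. 10 writes over 7 distinct keys gives 1.0 - 7/10 = 0.3.
    fn duplicate_write_ratio(&self) -> f64 {
        if self.runtime_writes == 0 {
            return 0.0;
        }
        1.0 - self.committed.len() as f64 / self.runtime_writes as f64
    }
}
```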