diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 2d666d139..cb09432dc 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . mod iter; +mod stats; use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; @@ -271,6 +272,8 @@ pub struct Database { block_opts: BlockBasedOptions, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock>>, + #[ignore_malloc_size_of = "insignificant"] + stats: stats::RunningDbStats, // Values currently being flushed. Cleared when `flush` completes. flushing: RwLock>>, // Prevents concurrent flushes. @@ -403,6 +406,7 @@ impl Database { read_opts, write_opts, block_opts, + stats: stats::RunningDbStats::new(), }) } @@ -428,20 +432,32 @@ impl Database { match *self.db.read() { Some(ref cfs) => { let mut batch = WriteBatch::default(); + let mut ops: usize = 0; + let mut bytes: usize = 0; mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { for (c, column) in self.flushing.read().iter().enumerate() { + ops += column.len(); for (key, state) in column.iter() { let cf = cfs.cf(c); match *state { - KeyState::Delete => batch.delete_cf(cf, key).map_err(other_io_err)?, - KeyState::Insert(ref value) => batch.put_cf(cf, key, value).map_err(other_io_err)?, + KeyState::Delete => { + bytes += key.len(); + batch.delete_cf(cf, key).map_err(other_io_err)? + } + KeyState::Insert(ref value) => { + bytes += key.len() + value.len(); + batch.put_cf(cf, key, value).map_err(other_io_err)? + } }; } } } check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?; + self.stats.tally_transactions(1); + self.stats.tally_writes(ops as u64); + self.stats.tally_bytes_written(bytes as u64); for column in self.flushing.write().iter_mut() { column.clear(); @@ -474,6 +490,12 @@ impl Database { Some(ref cfs) => { let mut batch = WriteBatch::default(); let ops = tr.ops; + + self.stats.tally_writes(ops.len() as u64); + self.stats.tally_transactions(1); + + let mut stats_total_bytes = 0; + for op in ops { // remove any buffered operation for this key self.overlay.write()[op.col() as usize].remove(op.key()); @@ -481,10 +503,18 @@ impl Database { let cf = cfs.cf(op.col() as usize); match op { - DBOp::Insert { col: _, key, value } => batch.put_cf(cf, &key, &value).map_err(other_io_err)?, - DBOp::Delete { col: _, key } => batch.delete_cf(cf, &key).map_err(other_io_err)?, + DBOp::Insert { col: _, key, value } => { + stats_total_bytes += key.len() + value.len(); + batch.put_cf(cf, &key, &value).map_err(other_io_err)? + } + DBOp::Delete { col: _, key } => { + // We count deletes as writes. + stats_total_bytes += key.len(); + batch.delete_cf(cf, &key).map_err(other_io_err)? + } }; } + self.stats.tally_bytes_written(stats_total_bytes as u64); check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) } @@ -496,6 +526,7 @@ impl Database { pub fn get(&self, col: u32, key: &[u8]) -> io::Result> { match *self.db.read() { Some(ref cfs) => { + self.stats.tally_reads(1); let overlay = &self.overlay.read()[col as usize]; match overlay.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), @@ -505,11 +536,21 @@ impl Database { match flushing.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), Some(&KeyState::Delete) => Ok(None), - None => cfs - .db - .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) - .map(|r| r.map(|v| v.to_vec())) - .map_err(other_io_err), + None => { + let aquired_val = cfs + .db + .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) + .map(|r| r.map(|v| v.to_vec())) + .map_err(other_io_err); + + match aquired_val { + Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), + Ok(None) => self.stats.tally_bytes_read(key.len() as u64), + _ => {} + }; + + aquired_val + } } } } @@ -704,6 +745,26 @@ impl KeyValueDB for Database { fn restore(&self, new_db: &str) -> io::Result<()> { Database::restore(self, new_db) } + + fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats { + let taken_stats = match kind { + kvdb::IoStatsKind::Overall => self.stats.overall(), + kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(), + }; + + let mut stats = kvdb::IoStats::empty(); + + stats.reads = taken_stats.raw.reads; + stats.writes = taken_stats.raw.writes; + stats.transactions = taken_stats.raw.transactions; + stats.bytes_written = taken_stats.raw.bytes_written; + stats.bytes_read = taken_stats.raw.bytes_read; + + stats.started = taken_stats.started; + stats.span = taken_stats.started.elapsed(); + + stats + } } impl Drop for Database { @@ -916,6 +977,55 @@ mod tests { assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count"); } + #[test] + fn stats() { + use kvdb::IoStatsKind; + + let tempdir = TempDir::new("").unwrap(); + let config = DatabaseConfig::with_columns(3); + let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); + + let key1 = b"kkk"; + let mut batch = db.transaction(); + batch.put(0, key1, key1); + batch.put(1, key1, key1); + batch.put(2, key1, key1); + + for _ in 0..10 { + db.get(0, key1).unwrap(); + } + + db.write(batch).unwrap(); + + let io_stats = db.io_stats(IoStatsKind::SincePrevious); + assert_eq!(io_stats.transactions, 1); + assert_eq!(io_stats.writes, 3); + assert_eq!(io_stats.bytes_written, 18); + assert_eq!(io_stats.reads, 10); + assert_eq!(io_stats.bytes_read, 30); + + let new_io_stats = db.io_stats(IoStatsKind::SincePrevious); + // Since we taken previous statistic period, + // this is expected to be totally empty. + assert_eq!(new_io_stats.transactions, 0); + + // but the overall should be there + let new_io_stats = db.io_stats(IoStatsKind::Overall); + assert_eq!(new_io_stats.bytes_written, 18); + + let mut batch = db.transaction(); + batch.delete(0, key1); + batch.delete(1, key1); + batch.delete(2, key1); + + // transaction is not commited yet + assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 0); + + db.write(batch).unwrap(); + // now it is, and delete is counted as write + assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 3); + } + #[test] fn test_iter_by_prefix() { let tempdir = TempDir::new("").unwrap(); diff --git a/kvdb-rocksdb/src/stats.rs b/kvdb-rocksdb/src/stats.rs new file mode 100644 index 000000000..039dd3a88 --- /dev/null +++ b/kvdb-rocksdb/src/stats.rs @@ -0,0 +1,144 @@ +// Copyright 2015-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use parking_lot::RwLock; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::time::Instant; + +pub struct RawDbStats { + pub reads: u64, + pub writes: u64, + pub bytes_written: u64, + pub bytes_read: u64, + pub transactions: u64, +} + +impl RawDbStats { + fn combine(&self, other: &RawDbStats) -> Self { + RawDbStats { + reads: self.reads + other.reads, + writes: self.writes + other.writes, + bytes_written: self.bytes_written + other.bytes_written, + bytes_read: self.bytes_read + other.bytes_written, + transactions: self.transactions + other.transactions, + } + } +} + +struct OverallDbStats { + stats: RawDbStats, + last_taken: Instant, + started: Instant, +} + +impl OverallDbStats { + fn new() -> Self { + OverallDbStats { + stats: RawDbStats { reads: 0, writes: 0, bytes_written: 0, bytes_read: 0, transactions: 0 }, + last_taken: Instant::now(), + started: Instant::now(), + } + } +} + +pub struct RunningDbStats { + reads: AtomicU64, + writes: AtomicU64, + bytes_written: AtomicU64, + bytes_read: AtomicU64, + transactions: AtomicU64, + overall: RwLock, +} + +pub struct TakenDbStats { + pub raw: RawDbStats, + pub started: Instant, +} + +impl RunningDbStats { + pub fn new() -> Self { + Self { + reads: 0.into(), + bytes_read: 0.into(), + writes: 0.into(), + bytes_written: 0.into(), + transactions: 0.into(), + overall: OverallDbStats::new().into(), + } + } + + pub fn tally_reads(&self, val: u64) { + self.reads.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_bytes_read(&self, val: u64) { + self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_writes(&self, val: u64) { + self.writes.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_bytes_written(&self, val: u64) { + self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_transactions(&self, val: u64) { + self.transactions.fetch_add(val, AtomicOrdering::Relaxed); + } + + fn take_current(&self) -> RawDbStats { + RawDbStats { + reads: self.reads.swap(0, AtomicOrdering::Relaxed), + writes: self.writes.swap(0, AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed), + transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), + } + } + + fn peek_current(&self) -> RawDbStats { + RawDbStats { + reads: self.reads.load(AtomicOrdering::Relaxed), + writes: self.writes.load(AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed), + transactions: self.transactions.load(AtomicOrdering::Relaxed), + } + } + + pub fn since_previous(&self) -> TakenDbStats { + let mut overall_lock = self.overall.write(); + + let current = self.take_current(); + + overall_lock.stats = overall_lock.stats.combine(¤t); + + let stats = TakenDbStats { raw: current, started: overall_lock.last_taken }; + + overall_lock.last_taken = Instant::now(); + + stats + } + + pub fn overall(&self) -> TakenDbStats { + let overall_lock = self.overall.read(); + + let current = self.peek_current(); + + TakenDbStats { raw: overall_lock.stats.combine(¤t), started: overall_lock.started } + } +} diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs new file mode 100644 index 000000000..d0de5ce36 --- /dev/null +++ b/kvdb/src/io_stats.rs @@ -0,0 +1,141 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Generic statistics for key-value databases + +/// Statistic kind to query. +pub enum Kind { + /// Overall statistics since start. + Overall, + /// Statistics since previous query. + SincePrevious, +} + +/// Statistic for the `span` period +#[derive(Debug, Clone)] +pub struct IoStats { + /// Number of transaction. + pub transactions: u64, + /// Number of read operations. + pub reads: u64, + /// Number of reads resulted in a read from cache. + pub cache_reads: u64, + /// Number of write operations. + pub writes: u64, + /// Number of bytes read + pub bytes_read: u64, + /// Number of bytes read from cache + pub cache_read_bytes: u64, + /// Number of bytes write + pub bytes_written: u64, + /// Start of the statistic period. + pub started: std::time::Instant, + /// Total duration of the statistic period. + pub span: std::time::Duration, +} + +impl IoStats { + /// Empty statistic report. + pub fn empty() -> Self { + Self { + transactions: 0, + reads: 0, + cache_reads: 0, + writes: 0, + bytes_read: 0, + cache_read_bytes: 0, + bytes_written: 0, + started: std::time::Instant::now(), + span: std::time::Duration::default(), + } + } + + /// Average batch (transaction) size (writes per transaction) + pub fn avg_batch_size(&self) -> f64 { + if self.writes == 0 { + return 0.0; + } + self.transactions as f64 / self.writes as f64 + } + + /// Read operations per second. + pub fn reads_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + self.reads as f64 / self.span.as_secs_f64() + } + + pub fn byte_reads_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + self.bytes_read as f64 / self.span.as_secs_f64() + } + + /// Write operations per second. + pub fn writes_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + self.writes as f64 / self.span.as_secs_f64() + } + + pub fn byte_writes_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + self.bytes_written as f64 / self.span.as_secs_f64() + } + + /// Total number of operations per second. + pub fn ops_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + (self.writes as f64 + self.reads as f64) / self.span.as_secs_f64() + } + + /// Transactions per second. + pub fn transactions_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } + + (self.transactions as f64) / self.span.as_secs_f64() + } + + pub fn avg_transaction_size(&self) -> f64 { + if self.transactions == 0 { + return 0.0; + } + + self.bytes_written as f64 / self.transactions as f64 + } + + pub fn cache_hit_ratio(&self) -> f64 { + if self.reads == 0 { + return 0.0; + } + + self.cache_reads as f64 / self.reads as f64 + } +} diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 708871982..2bcb1b39c 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Key-Value store abstraction with `RocksDB` backend. +//! Key-Value store abstraction. use bytes::Bytes; use smallvec::SmallVec; @@ -22,6 +22,8 @@ use std::io; use std::path::Path; use std::sync::Arc; +mod io_stats; + /// Required length of prefixes. pub const PREFIX_LEN: usize = 12; @@ -30,6 +32,8 @@ pub type DBValue = Vec; /// Database keys. pub type DBKey = SmallVec<[u8; 32]>; +pub use io_stats::{IoStats, Kind as IoStatsKind}; + /// Write transaction. Batches a sequence of put/delete operations for efficiency. #[derive(Default, Clone, PartialEq)] pub struct DBTransaction { @@ -143,6 +147,16 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { /// Attempt to replace this database with a new one located at the given path. fn restore(&self, new_db: &str) -> io::Result<()>; + + /// Query statistics. + /// + /// Not all kvdb implementations are able or expected to implement this, so by + /// default, empty statistics is returned. Also, not all kvdb implementation + /// can return every statistic or configured to do so (some statistics gathering + /// may impede the performance and might be off by default). + fn io_stats(&self, _kind: IoStatsKind) -> IoStats { + IoStats::empty() + } } /// Generic key-value database handler. This trait contains one function `open`.