From be40dfdf4b814b5ff27a019f83c2e7148718165d Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 26 Dec 2019 13:23:38 +0300 Subject: [PATCH 01/17] kvdb general io stats --- kvdb/src/io_stats.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++ kvdb/src/lib.rs | 16 +++++++- 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 kvdb/src/io_stats.rs diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs new file mode 100644 index 000000000..472e906d8 --- /dev/null +++ b/kvdb/src/io_stats.rs @@ -0,0 +1,87 @@ +// Copyright 2015-2019 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Generic statistics for key-value databases + +/// Statistic for the `span` period +pub struct IoStats { + /// Number of transaction. + pub transactions: u64, + /// Number of read operations. + pub reads: u64, + /// Number of reads resulted in a read from cache. + pub cache_reads: u64, + /// Number of write operations. + pub writes: u64, + /// Start of the statistic period. + pub start: std::time::Instant, + /// Total duration of the statistic period. + pub span: std::time::Duration, +} + +impl IoStats { + /// Empty statistic report. + pub fn empty() -> Self { + Self { + transactions: 0, + reads: 0, + cache_reads: 0, + writes: 0, + start: std::time::Instant::now(), + span: std::time::Duration::default(), + } + } + + /// Average batch (transaction) size (writes per transaction) + pub fn avg_batch_size(&self) -> f64 { + if self.writes == 0 { return 0.0 } + self.transactions as f64 / self.writes as f64 + } + + /// Read operations per second. + pub fn reads_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { return 0.0 } + + self.reads as f64 / self.span.as_secs_f64() + } + + /// Write operations per second. + pub fn writes_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { return 0.0 } + + self.writes as f64 / self.span.as_secs_f64() + } + + /// Total number of operations per second. + pub fn ops_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { return 0.0 } + + (self.writes as f64 + self.reads as f64) / self.span.as_secs_f64() + } + + /// Transactions per second. + pub fn transactions_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { return 0.0 } + + (self.transactions as f64) / self.span.as_secs_f64() + } + + pub fn cache_hit_ration(&self) -> f64 { + if self.reads == 0 { return 0.0 } + + self.cache_reads as f64 / self.reads as f64 + } + } \ No newline at end of file diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 708871982..4da6191ed 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Key-Value store abstraction with `RocksDB` backend. +//! Key-Value store abstraction. use bytes::Bytes; use smallvec::SmallVec; @@ -22,6 +22,8 @@ use std::io; use std::path::Path; use std::sync::Arc; +mod io_stats; + /// Required length of prefixes. pub const PREFIX_LEN: usize = 12; @@ -30,6 +32,8 @@ pub type DBValue = Vec; /// Database keys. pub type DBKey = SmallVec<[u8; 32]>; +pub use io_stats::IoStats; + /// Write transaction. Batches a sequence of put/delete operations for efficiency. #[derive(Default, Clone, PartialEq)] pub struct DBTransaction { @@ -143,6 +147,16 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { /// Attempt to replace this database with a new one located at the given path. fn restore(&self, new_db: &str) -> io::Result<()>; + + /// Query statistics. + /// + /// Not all kvdb implementations are able or expected to implement this, so by + /// default, empty statistics is returned. Also, not all kvdb implementation + /// can return every statistic or configured to do so (some statistics gathering + /// may impede the performance and might be off by default). + fn io_stats(&self) -> IoStats { + IoStats::empty() + } } /// Generic key-value database handler. This trait contains one function `open`. From 7556a87e8b4cb6af2ab004c59941e26c47c96416 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 26 Dec 2019 13:24:06 +0300 Subject: [PATCH 02/17] rocksdb io stats gathering --- kvdb-rocksdb/src/lib.rs | 93 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 2d666d139..76e3f94d0 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -17,6 +17,7 @@ mod iter; use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use parity_util_mem::MallocSizeOf; use parking_lot::{Mutex, MutexGuard, RwLock}; @@ -256,6 +257,52 @@ impl DBAndColumns { } } +struct DbStats { + reads: AtomicU64, + writes: AtomicU64, + transactions: AtomicU64, + started: Mutex, +} + +struct TakenDbStats { + reads: u64, + writes: u64, + transactions: u64, + start: std::time::Instant, +} + +impl DbStats { + fn new() -> Self { + Self { reads: 0.into(), writes: 0.into(), transactions: 0.into(), started: std::time::Instant::now().into(), } + } + + fn add_reads(&self, val: u64) { + self.reads.fetch_add(val, AtomicOrdering::Relaxed); + } + + fn add_writes(&self, val: u64) { + self.writes.fetch_add(val, AtomicOrdering::Relaxed); + } + + fn add_transactions(&self, val: u64) { + self.transactions.fetch_add(val, AtomicOrdering::Relaxed); + } + + fn take(&self) -> TakenDbStats { + let mut started_lock = self.started.lock(); + let stats = TakenDbStats { + reads: self.reads.swap(0, AtomicOrdering::Relaxed), + writes: self.writes.swap(0, AtomicOrdering::Relaxed), + transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), + start: *started_lock, + }; + + *started_lock = std::time::Instant::now(); + + stats + } +} + /// Key-Value database. #[derive(MallocSizeOf)] pub struct Database { @@ -271,6 +318,7 @@ pub struct Database { block_opts: BlockBasedOptions, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock>>, + stats: DbStats, // Values currently being flushed. Cleared when `flush` completes. flushing: RwLock>>, // Prevents concurrent flushes. @@ -403,6 +451,7 @@ impl Database { read_opts, write_opts, block_opts, + stats: DbStats::new(), }) } @@ -474,6 +523,10 @@ impl Database { Some(ref cfs) => { let mut batch = WriteBatch::default(); let ops = tr.ops; + + self.stats.add_writes(ops.len() as u64); + self.stats.add_transactions(1); + for op in ops { // remove any buffered operation for this key self.overlay.write()[op.col() as usize].remove(op.key()); @@ -494,6 +547,7 @@ impl Database { /// Get value by key. pub fn get(&self, col: u32, key: &[u8]) -> io::Result> { + self.stats.add_reads(1); match *self.db.read() { Some(ref cfs) => { let overlay = &self.overlay.read()[col as usize]; @@ -704,6 +758,20 @@ impl KeyValueDB for Database { fn restore(&self, new_db: &str) -> io::Result<()> { Database::restore(self, new_db) } + + fn io_stats(&self) -> kvdb::IoStats { + let taken_stats = self.stats.take(); + + let mut stats = kvdb::IoStats::empty(); + + stats.reads = taken_stats.reads; + stats.writes = taken_stats.writes; + stats.transactions = taken_stats.transactions; + stats.start = taken_stats.start; + stats.span = taken_stats.start.elapsed(); + + stats + } } impl Drop for Database { @@ -916,6 +984,31 @@ mod tests { assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count"); } + #[test] + fn stats() { + let tempdir = TempDir::new("").unwrap(); + let config = DatabaseConfig::with_columns(3); + let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); + + let key1 = b"kkk"; + let mut batch = db.transaction(); + batch.put(0, key1, key1); + batch.put(1, key1, key1); + batch.put(2, key1, key1); + + for _ in 0..10 { db.get(0, key1).unwrap(); } + + db.write(batch).unwrap(); + + let io_stats = db.io_stats(); + assert_eq!(io_stats.transactions, 1); + assert_eq!(io_stats.writes, 3); + assert_eq!(io_stats.reads, 10); + + let new_io_stats = db.io_stats(); + assert_eq!(new_io_stats.transactions, 0); + } + #[test] fn test_iter_by_prefix() { let tempdir = TempDir::new("").unwrap(); From 147cdb020b9d69c4580fa3eda4f7996f6568b246 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 26 Dec 2019 14:16:19 +0300 Subject: [PATCH 03/17] add bytes stats and allow to keep gathered data --- kvdb-rocksdb/src/lib.rs | 77 +++++++++++++++++++++++++++++++++++------ kvdb/src/io_stats.rs | 9 +++++ kvdb/src/lib.rs | 5 ++- 3 files changed, 80 insertions(+), 11 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 76e3f94d0..1e50be1ee 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -260,6 +260,8 @@ impl DBAndColumns { struct DbStats { reads: AtomicU64, writes: AtomicU64, + write_bytes: AtomicU64, + read_bytes: AtomicU64, transactions: AtomicU64, started: Mutex, } @@ -267,23 +269,40 @@ struct DbStats { struct TakenDbStats { reads: u64, writes: u64, + write_bytes: u64, + read_bytes: u64, transactions: u64, start: std::time::Instant, } impl DbStats { fn new() -> Self { - Self { reads: 0.into(), writes: 0.into(), transactions: 0.into(), started: std::time::Instant::now().into(), } + Self { + reads: 0.into(), + read_bytes: 0.into(), + writes: 0.into(), + write_bytes: 0.into(), + transactions: 0.into(), + started: std::time::Instant::now().into(), + } } fn add_reads(&self, val: u64) { self.reads.fetch_add(val, AtomicOrdering::Relaxed); } + fn add_read_bytes(&self, val: u64) { + self.read_bytes.fetch_add(val, AtomicOrdering::Relaxed); + } + fn add_writes(&self, val: u64) { self.writes.fetch_add(val, AtomicOrdering::Relaxed); } + fn add_write_bytes(&self, val: u64) { + self.write_bytes.fetch_add(val, AtomicOrdering::Relaxed); + } + fn add_transactions(&self, val: u64) { self.transactions.fetch_add(val, AtomicOrdering::Relaxed); } @@ -293,6 +312,8 @@ impl DbStats { let stats = TakenDbStats { reads: self.reads.swap(0, AtomicOrdering::Relaxed), writes: self.writes.swap(0, AtomicOrdering::Relaxed), + write_bytes: self.write_bytes.swap(0, AtomicOrdering::Relaxed), + read_bytes: self.read_bytes.swap(0, AtomicOrdering::Relaxed), transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), start: *started_lock, }; @@ -301,6 +322,19 @@ impl DbStats { stats } + + fn peek(&self) -> TakenDbStats { + let started_lock = self.started.lock(); + + TakenDbStats { + reads: self.reads.load(AtomicOrdering::Relaxed), + writes: self.writes.load(AtomicOrdering::Relaxed), + write_bytes: self.write_bytes.load(AtomicOrdering::Relaxed), + read_bytes: self.read_bytes.load(AtomicOrdering::Relaxed), + transactions: self.transactions.load(AtomicOrdering::Relaxed), + start: *started_lock, + } + } } /// Key-Value database. @@ -533,6 +567,13 @@ impl Database { let cf = cfs.cf(op.col() as usize); + self.stats.add_write_bytes( + match op { + DBOp::Insert { col: _, ref key, ref value } => key.len() + value.len(), + DBOp::Delete { col: _, ref key } => key.len(), + } as u64 + ); + match op { DBOp::Insert { col: _, key, value } => batch.put_cf(cf, &key, &value).map_err(other_io_err)?, DBOp::Delete { col: _, key } => batch.delete_cf(cf, &key).map_err(other_io_err)?, @@ -559,11 +600,21 @@ impl Database { match flushing.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), Some(&KeyState::Delete) => Ok(None), - None => cfs - .db - .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) - .map(|r| r.map(|v| v.to_vec())) - .map_err(other_io_err), + None => { + let aquired_val = cfs + .db + .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) + .map(|r| r.map(|v| v.to_vec())) + .map_err(other_io_err); + + match aquired_val { + Ok(Some(ref v)) => self.stats.add_read_bytes((key.len() + v.len()) as u64), + Ok(None) => self.stats.add_read_bytes(key.len() as u64), + _ => {} + }; + + aquired_val + } } } } @@ -759,8 +810,8 @@ impl KeyValueDB for Database { Database::restore(self, new_db) } - fn io_stats(&self) -> kvdb::IoStats { - let taken_stats = self.stats.take(); + fn io_stats(&self, keep: bool) -> kvdb::IoStats { + let taken_stats = if keep { self.stats.peek() } else { self.stats.take() }; let mut stats = kvdb::IoStats::empty(); @@ -769,6 +820,8 @@ impl KeyValueDB for Database { stats.transactions = taken_stats.transactions; stats.start = taken_stats.start; stats.span = taken_stats.start.elapsed(); + stats.write_bytes = taken_stats.write_bytes; + stats.read_bytes = taken_stats.read_bytes; stats } @@ -1000,12 +1053,16 @@ mod tests { db.write(batch).unwrap(); - let io_stats = db.io_stats(); + let io_stats = db.io_stats(false); assert_eq!(io_stats.transactions, 1); assert_eq!(io_stats.writes, 3); + assert_eq!(io_stats.write_bytes, 18); assert_eq!(io_stats.reads, 10); + assert_eq!(io_stats.read_bytes, 30); - let new_io_stats = db.io_stats(); + let new_io_stats = db.io_stats(true); + // Since we choosed not to keep previous statistic period, + // this is expected to be totally empty. assert_eq!(new_io_stats.transactions, 0); } diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index 472e906d8..c7181fcbd 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -26,6 +26,12 @@ pub struct IoStats { pub cache_reads: u64, /// Number of write operations. pub writes: u64, + /// Number of bytes read + pub read_bytes: u64, + /// Number of bytes read from cache + pub cache_read_bytes: u64, + /// Number of bytes write + pub write_bytes: u64, /// Start of the statistic period. pub start: std::time::Instant, /// Total duration of the statistic period. @@ -40,6 +46,9 @@ impl IoStats { reads: 0, cache_reads: 0, writes: 0, + read_bytes: 0, + cache_read_bytes: 0, + write_bytes: 0, start: std::time::Instant::now(), span: std::time::Duration::default(), } diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 4da6191ed..73685f132 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -154,7 +154,10 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { /// default, empty statistics is returned. Also, not all kvdb implementation /// can return every statistic or configured to do so (some statistics gathering /// may impede the performance and might be off by default). - fn io_stats(&self) -> IoStats { + /// + /// `keep` argument indicates if database should erase current accumulated statistics + /// and start new gathering period at current time. + fn io_stats(&self, _keep: bool) -> IoStats { IoStats::empty() } } From 05e470e2dc10fac94d8e2cabd926763bef44fcf5 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 26 Dec 2019 15:31:29 +0300 Subject: [PATCH 04/17] cargo fmt --- kvdb-rocksdb/src/lib.rs | 14 +++++++------- kvdb/src/io_stats.rs | 26 +++++++++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 1e50be1ee..c2890b467 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -567,12 +567,10 @@ impl Database { let cf = cfs.cf(op.col() as usize); - self.stats.add_write_bytes( - match op { - DBOp::Insert { col: _, ref key, ref value } => key.len() + value.len(), - DBOp::Delete { col: _, ref key } => key.len(), - } as u64 - ); + self.stats.add_write_bytes(match op { + DBOp::Insert { col: _, ref key, ref value } => key.len() + value.len(), + DBOp::Delete { col: _, ref key } => key.len(), + } as u64); match op { DBOp::Insert { col: _, key, value } => batch.put_cf(cf, &key, &value).map_err(other_io_err)?, @@ -1049,7 +1047,9 @@ mod tests { batch.put(1, key1, key1); batch.put(2, key1, key1); - for _ in 0..10 { db.get(0, key1).unwrap(); } + for _ in 0..10 { + db.get(0, key1).unwrap(); + } db.write(batch).unwrap(); diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index c7181fcbd..9babf05e2 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -56,41 +56,53 @@ impl IoStats { /// Average batch (transaction) size (writes per transaction) pub fn avg_batch_size(&self) -> f64 { - if self.writes == 0 { return 0.0 } + if self.writes == 0 { + return 0.0; + } self.transactions as f64 / self.writes as f64 } /// Read operations per second. pub fn reads_per_sec(&self) -> f64 { - if self.span.as_secs_f64() == 0.0 { return 0.0 } + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } self.reads as f64 / self.span.as_secs_f64() } /// Write operations per second. pub fn writes_per_sec(&self) -> f64 { - if self.span.as_secs_f64() == 0.0 { return 0.0 } + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } self.writes as f64 / self.span.as_secs_f64() } /// Total number of operations per second. pub fn ops_per_sec(&self) -> f64 { - if self.span.as_secs_f64() == 0.0 { return 0.0 } + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } (self.writes as f64 + self.reads as f64) / self.span.as_secs_f64() } /// Transactions per second. pub fn transactions_per_sec(&self) -> f64 { - if self.span.as_secs_f64() == 0.0 { return 0.0 } + if self.span.as_secs_f64() == 0.0 { + return 0.0; + } (self.transactions as f64) / self.span.as_secs_f64() } pub fn cache_hit_ration(&self) -> f64 { - if self.reads == 0 { return 0.0 } + if self.reads == 0 { + return 0.0; + } self.cache_reads as f64 / self.reads as f64 } - } \ No newline at end of file +} From 2c0a6da030a98dae577561ddcee7369984a63fce Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 30 Dec 2019 19:35:04 +0300 Subject: [PATCH 05/17] address review --- kvdb-rocksdb/src/lib.rs | 87 ++++++++++++++++++++++------------------- kvdb/src/io_stats.rs | 23 +++++++---- 2 files changed, 62 insertions(+), 48 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c2890b467..4cbe5d479 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -260,62 +260,62 @@ impl DBAndColumns { struct DbStats { reads: AtomicU64, writes: AtomicU64, - write_bytes: AtomicU64, - read_bytes: AtomicU64, + bytes_written: AtomicU64, + bytes_read: AtomicU64, transactions: AtomicU64, - started: Mutex, + started: RwLock, } struct TakenDbStats { reads: u64, writes: u64, - write_bytes: u64, - read_bytes: u64, + bytes_written: u64, + bytes_read: u64, transactions: u64, - start: std::time::Instant, + started: std::time::Instant, } impl DbStats { fn new() -> Self { Self { reads: 0.into(), - read_bytes: 0.into(), + bytes_read: 0.into(), writes: 0.into(), - write_bytes: 0.into(), + bytes_written: 0.into(), transactions: 0.into(), started: std::time::Instant::now().into(), } } - fn add_reads(&self, val: u64) { + fn tally_reads(&self, val: u64) { self.reads.fetch_add(val, AtomicOrdering::Relaxed); } - fn add_read_bytes(&self, val: u64) { - self.read_bytes.fetch_add(val, AtomicOrdering::Relaxed); + fn tally_bytes_read(&self, val: u64) { + self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed); } - fn add_writes(&self, val: u64) { + fn tally_writes(&self, val: u64) { self.writes.fetch_add(val, AtomicOrdering::Relaxed); } - fn add_write_bytes(&self, val: u64) { - self.write_bytes.fetch_add(val, AtomicOrdering::Relaxed); + fn tally_bytes_written(&self, val: u64) { + self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed); } - fn add_transactions(&self, val: u64) { + fn tally_transactions(&self, val: u64) { self.transactions.fetch_add(val, AtomicOrdering::Relaxed); } fn take(&self) -> TakenDbStats { - let mut started_lock = self.started.lock(); + let mut started_lock = self.started.write(); let stats = TakenDbStats { reads: self.reads.swap(0, AtomicOrdering::Relaxed), writes: self.writes.swap(0, AtomicOrdering::Relaxed), - write_bytes: self.write_bytes.swap(0, AtomicOrdering::Relaxed), - read_bytes: self.read_bytes.swap(0, AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed), transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), - start: *started_lock, + started: *started_lock, }; *started_lock = std::time::Instant::now(); @@ -324,15 +324,15 @@ impl DbStats { } fn peek(&self) -> TakenDbStats { - let started_lock = self.started.lock(); + let started_lock = self.started.read(); TakenDbStats { reads: self.reads.load(AtomicOrdering::Relaxed), writes: self.writes.load(AtomicOrdering::Relaxed), - write_bytes: self.write_bytes.load(AtomicOrdering::Relaxed), - read_bytes: self.read_bytes.load(AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed), transactions: self.transactions.load(AtomicOrdering::Relaxed), - start: *started_lock, + started: *started_lock, } } } @@ -558,8 +558,10 @@ impl Database { let mut batch = WriteBatch::default(); let ops = tr.ops; - self.stats.add_writes(ops.len() as u64); - self.stats.add_transactions(1); + self.stats.tally_writes(ops.len() as u64); + self.stats.tally_transactions(1); + + let mut stats_total_bytes = 0; for op in ops { // remove any buffered operation for this key @@ -567,16 +569,19 @@ impl Database { let cf = cfs.cf(op.col() as usize); - self.stats.add_write_bytes(match op { - DBOp::Insert { col: _, ref key, ref value } => key.len() + value.len(), - DBOp::Delete { col: _, ref key } => key.len(), - } as u64); - match op { - DBOp::Insert { col: _, key, value } => batch.put_cf(cf, &key, &value).map_err(other_io_err)?, - DBOp::Delete { col: _, key } => batch.delete_cf(cf, &key).map_err(other_io_err)?, + DBOp::Insert { col: _, key, value } => { + stats_total_bytes += key.len() + value.len(); + batch.put_cf(cf, &key, &value).map_err(other_io_err)? + }, + DBOp::Delete { col: _, key } => { + // We count deletes as writes. + stats_total_bytes += key.len(); + batch.delete_cf(cf, &key).map_err(other_io_err)? + } }; } + self.stats.tally_bytes_written(stats_total_bytes as u64); check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) } @@ -586,7 +591,7 @@ impl Database { /// Get value by key. pub fn get(&self, col: u32, key: &[u8]) -> io::Result> { - self.stats.add_reads(1); + self.stats.tally_reads(1); match *self.db.read() { Some(ref cfs) => { let overlay = &self.overlay.read()[col as usize]; @@ -606,8 +611,8 @@ impl Database { .map_err(other_io_err); match aquired_val { - Ok(Some(ref v)) => self.stats.add_read_bytes((key.len() + v.len()) as u64), - Ok(None) => self.stats.add_read_bytes(key.len() as u64), + Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), + Ok(None) => self.stats.tally_bytes_read(key.len() as u64), _ => {} }; @@ -816,10 +821,10 @@ impl KeyValueDB for Database { stats.reads = taken_stats.reads; stats.writes = taken_stats.writes; stats.transactions = taken_stats.transactions; - stats.start = taken_stats.start; - stats.span = taken_stats.start.elapsed(); - stats.write_bytes = taken_stats.write_bytes; - stats.read_bytes = taken_stats.read_bytes; + stats.started = taken_stats.started; + stats.span = taken_stats.started.elapsed(); + stats.bytes_written = taken_stats.bytes_written; + stats.bytes_read = taken_stats.bytes_read; stats } @@ -1056,9 +1061,9 @@ mod tests { let io_stats = db.io_stats(false); assert_eq!(io_stats.transactions, 1); assert_eq!(io_stats.writes, 3); - assert_eq!(io_stats.write_bytes, 18); + assert_eq!(io_stats.bytes_written, 18); assert_eq!(io_stats.reads, 10); - assert_eq!(io_stats.read_bytes, 30); + assert_eq!(io_stats.bytes_read, 30); let new_io_stats = db.io_stats(true); // Since we choosed not to keep previous statistic period, diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index 9babf05e2..4bea619e2 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -27,13 +27,13 @@ pub struct IoStats { /// Number of write operations. pub writes: u64, /// Number of bytes read - pub read_bytes: u64, + pub bytes_read: u64, /// Number of bytes read from cache pub cache_read_bytes: u64, /// Number of bytes write - pub write_bytes: u64, + pub bytes_written: u64, /// Start of the statistic period. - pub start: std::time::Instant, + pub started: std::time::Instant, /// Total duration of the statistic period. pub span: std::time::Duration, } @@ -46,10 +46,10 @@ impl IoStats { reads: 0, cache_reads: 0, writes: 0, - read_bytes: 0, + bytes_read: 0, cache_read_bytes: 0, - write_bytes: 0, - start: std::time::Instant::now(), + bytes_written: 0, + started: std::time::Instant::now(), span: std::time::Duration::default(), } } @@ -98,7 +98,16 @@ impl IoStats { (self.transactions as f64) / self.span.as_secs_f64() } - pub fn cache_hit_ration(&self) -> f64 { + pub fn avg_transaction_size(&self) -> f64 { + if self.transactions == 0 { + return 0.0; + } + + self.bytes_written as f64 / self.transactions as f64 + + } + + pub fn cache_hit_ratio(&self) -> f64 { if self.reads == 0 { return 0.0; } From 30b934db5801567a08218dc52fb08af64a3ce119 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 30 Dec 2019 21:07:06 +0300 Subject: [PATCH 06/17] cargo fmt --- kvdb-rocksdb/src/lib.rs | 2 +- kvdb/src/io_stats.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 4cbe5d479..dd056eb0c 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -573,7 +573,7 @@ impl Database { DBOp::Insert { col: _, key, value } => { stats_total_bytes += key.len() + value.len(); batch.put_cf(cf, &key, &value).map_err(other_io_err)? - }, + } DBOp::Delete { col: _, key } => { // We count deletes as writes. stats_total_bytes += key.len(); diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index 4bea619e2..7c78d09dd 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -104,7 +104,6 @@ impl IoStats { } self.bytes_written as f64 / self.transactions as f64 - } pub fn cache_hit_ratio(&self) -> f64 { From a564d9356ce9d5a369a9f199ff4656606d53bf9b Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 31 Dec 2019 18:58:49 +0300 Subject: [PATCH 07/17] tally_read inside --- kvdb-rocksdb/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index dd056eb0c..27478af30 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -591,9 +591,9 @@ impl Database { /// Get value by key. pub fn get(&self, col: u32, key: &[u8]) -> io::Result> { - self.stats.tally_reads(1); match *self.db.read() { Some(ref cfs) => { + self.stats.tally_reads(1); let overlay = &self.overlay.read()[col as usize]; match overlay.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), From 963bce350a78e572e8612e5ab660220bc16bce43 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 31 Dec 2019 19:02:18 +0300 Subject: [PATCH 08/17] add test for delete --- kvdb-rocksdb/src/lib.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 27478af30..b6bd99aec 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -1069,6 +1069,19 @@ mod tests { // Since we choosed not to keep previous statistic period, // this is expected to be totally empty. assert_eq!(new_io_stats.transactions, 0); + + let mut batch = db.transaction(); + batch.delete(0, key1); + batch.delete(1, key1); + batch.delete(2, key1); + + // transaction is not commited yet + assert_eq!(db.io_stats(false).writes, 0); + + db.write(batch).unwrap(); + // now it is, and delete is counted as write + assert_eq!(db.io_stats(false).writes, 3); + } #[test] From 66451e1cd6d18862ad9d9b1cab00012a2bcef7bd Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 31 Dec 2019 22:02:48 +0300 Subject: [PATCH 09/17] cargo fmt --- kvdb-rocksdb/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index b6bd99aec..9d0ccaa89 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -1081,7 +1081,6 @@ mod tests { db.write(batch).unwrap(); // now it is, and delete is counted as write assert_eq!(db.io_stats(false).writes, 3); - } #[test] From 3d67aeb5e786d2a466a58fae5a42856a3af40320 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 12:37:53 +0300 Subject: [PATCH 10/17] refactor completely --- kvdb-rocksdb/src/lib.rs | 105 ++++----------------------- kvdb-rocksdb/src/stats.rs | 144 ++++++++++++++++++++++++++++++++++++++ kvdb/src/io_stats.rs | 8 +++ kvdb/src/lib.rs | 4 +- 4 files changed, 169 insertions(+), 92 deletions(-) create mode 100644 kvdb-rocksdb/src/stats.rs diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 9d0ccaa89..c8c6dcad4 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -15,9 +15,9 @@ // along with Parity. If not, see . mod iter; +mod stats; use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; -use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; use parity_util_mem::MallocSizeOf; use parking_lot::{Mutex, MutexGuard, RwLock}; @@ -257,86 +257,6 @@ impl DBAndColumns { } } -struct DbStats { - reads: AtomicU64, - writes: AtomicU64, - bytes_written: AtomicU64, - bytes_read: AtomicU64, - transactions: AtomicU64, - started: RwLock, -} - -struct TakenDbStats { - reads: u64, - writes: u64, - bytes_written: u64, - bytes_read: u64, - transactions: u64, - started: std::time::Instant, -} - -impl DbStats { - fn new() -> Self { - Self { - reads: 0.into(), - bytes_read: 0.into(), - writes: 0.into(), - bytes_written: 0.into(), - transactions: 0.into(), - started: std::time::Instant::now().into(), - } - } - - fn tally_reads(&self, val: u64) { - self.reads.fetch_add(val, AtomicOrdering::Relaxed); - } - - fn tally_bytes_read(&self, val: u64) { - self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed); - } - - fn tally_writes(&self, val: u64) { - self.writes.fetch_add(val, AtomicOrdering::Relaxed); - } - - fn tally_bytes_written(&self, val: u64) { - self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed); - } - - fn tally_transactions(&self, val: u64) { - self.transactions.fetch_add(val, AtomicOrdering::Relaxed); - } - - fn take(&self) -> TakenDbStats { - let mut started_lock = self.started.write(); - let stats = TakenDbStats { - reads: self.reads.swap(0, AtomicOrdering::Relaxed), - writes: self.writes.swap(0, AtomicOrdering::Relaxed), - bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed), - bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed), - transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), - started: *started_lock, - }; - - *started_lock = std::time::Instant::now(); - - stats - } - - fn peek(&self) -> TakenDbStats { - let started_lock = self.started.read(); - - TakenDbStats { - reads: self.reads.load(AtomicOrdering::Relaxed), - writes: self.writes.load(AtomicOrdering::Relaxed), - bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed), - bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed), - transactions: self.transactions.load(AtomicOrdering::Relaxed), - started: *started_lock, - } - } -} - /// Key-Value database. #[derive(MallocSizeOf)] pub struct Database { @@ -352,7 +272,8 @@ pub struct Database { block_opts: BlockBasedOptions, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock>>, - stats: DbStats, + #[ignore_malloc_size_of = "insignificant"] + stats: stats::RunningDbStats, // Values currently being flushed. Cleared when `flush` completes. flushing: RwLock>>, // Prevents concurrent flushes. @@ -485,7 +406,7 @@ impl Database { read_opts, write_opts, block_opts, - stats: DbStats::new(), + stats: stats::RunningDbStats::new(), }) } @@ -813,18 +734,22 @@ impl KeyValueDB for Database { Database::restore(self, new_db) } - fn io_stats(&self, keep: bool) -> kvdb::IoStats { - let taken_stats = if keep { self.stats.peek() } else { self.stats.take() }; + fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats { + let taken_stats = match kind { + kvdb::IoStatsKind::Overall => self.stats.overall(), + kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(), + }; let mut stats = kvdb::IoStats::empty(); - stats.reads = taken_stats.reads; - stats.writes = taken_stats.writes; - stats.transactions = taken_stats.transactions; + stats.reads = taken_stats.raw.reads; + stats.writes = taken_stats.raw.writes; + stats.transactions = taken_stats.raw.transactions; + stats.bytes_written = taken_stats.raw.bytes_written; + stats.bytes_read = taken_stats.raw.bytes_read; + stats.started = taken_stats.started; stats.span = taken_stats.started.elapsed(); - stats.bytes_written = taken_stats.bytes_written; - stats.bytes_read = taken_stats.bytes_read; stats } diff --git a/kvdb-rocksdb/src/stats.rs b/kvdb-rocksdb/src/stats.rs new file mode 100644 index 000000000..882892b54 --- /dev/null +++ b/kvdb-rocksdb/src/stats.rs @@ -0,0 +1,144 @@ +// Copyright 2015-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use parking_lot::RwLock; +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; +use std::time::Instant; + +pub struct RawDbStats { + pub reads: u64, + pub writes: u64, + pub bytes_written: u64, + pub bytes_read: u64, + pub transactions: u64, +} + +impl RawDbStats { + fn combine(&self, other: &RawDbStats) -> Self { + RawDbStats { + reads: self.reads + other.reads, + writes: self.writes + other.writes, + bytes_written: self.bytes_written + other.bytes_written, + bytes_read: self.bytes_read + other.bytes_written, + transactions: self.transactions + other.transactions, + } + } +} + +struct OverallDbStats { + stats: RawDbStats, + last_taken: Instant, + started: Instant, +} + +impl OverallDbStats { + fn new() -> Self { + OverallDbStats { + stats: RawDbStats { reads: 0, writes: 0, bytes_written: 0, bytes_read: 0, transactions: 0 }, + last_taken: Instant::now(), + started: Instant::now(), + } + } +} + +pub struct RunningDbStats { + reads: AtomicU64, + writes: AtomicU64, + bytes_written: AtomicU64, + bytes_read: AtomicU64, + transactions: AtomicU64, + overall: RwLock, +} + +pub struct TakenDbStats { + pub raw: RawDbStats, + pub started: std::time::Instant, +} + +impl RunningDbStats { + pub fn new() -> Self { + Self { + reads: 0.into(), + bytes_read: 0.into(), + writes: 0.into(), + bytes_written: 0.into(), + transactions: 0.into(), + overall: OverallDbStats::new().into(), + } + } + + pub fn tally_reads(&self, val: u64) { + self.reads.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_bytes_read(&self, val: u64) { + self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_writes(&self, val: u64) { + self.writes.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_bytes_written(&self, val: u64) { + self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed); + } + + pub fn tally_transactions(&self, val: u64) { + self.transactions.fetch_add(val, AtomicOrdering::Relaxed); + } + + fn take_current(&self) -> RawDbStats { + RawDbStats { + reads: self.reads.swap(0, AtomicOrdering::Relaxed), + writes: self.writes.swap(0, AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed), + transactions: self.transactions.swap(0, AtomicOrdering::Relaxed), + } + } + + fn peek_current(&self) -> RawDbStats { + RawDbStats { + reads: self.reads.load(AtomicOrdering::Relaxed), + writes: self.writes.load(AtomicOrdering::Relaxed), + bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed), + bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed), + transactions: self.transactions.load(AtomicOrdering::Relaxed), + } + } + + pub fn since_previous(&self) -> TakenDbStats { + let mut overall_lock = self.overall.write(); + + let current = self.take_current(); + + overall_lock.stats = overall_lock.stats.combine(¤t); + + let stats = TakenDbStats { raw: current, started: overall_lock.last_taken }; + + overall_lock.last_taken = std::time::Instant::now(); + + stats + } + + pub fn overall(&self) -> TakenDbStats { + let overall_lock = self.overall.read(); + + let current = self.peek_current(); + + TakenDbStats { raw: overall_lock.stats.combine(¤t), started: overall_lock.started } + } +} diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index 7c78d09dd..f9f1d3a6a 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -16,6 +16,14 @@ //! Generic statistics for key-value databases +/// Statistic kind to query. +pub enum Kind { + /// Overall statistics since start. + Overall, + /// Statistics since previous query. + SincePrevious, +} + /// Statistic for the `span` period pub struct IoStats { /// Number of transaction. diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 73685f132..e35ff98e1 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -32,7 +32,7 @@ pub type DBValue = Vec; /// Database keys. pub type DBKey = SmallVec<[u8; 32]>; -pub use io_stats::IoStats; +pub use io_stats::{IoStats, Kind as IoStatsKind}; /// Write transaction. Batches a sequence of put/delete operations for efficiency. #[derive(Default, Clone, PartialEq)] @@ -157,7 +157,7 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { /// /// `keep` argument indicates if database should erase current accumulated statistics /// and start new gathering period at current time. - fn io_stats(&self, _keep: bool) -> IoStats { + fn io_stats(&self, _kind: IoStatsKind) -> IoStats { IoStats::empty() } } From 891b16a0aa965e1c324b5940d82f9ba0152608c5 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 12:41:04 +0300 Subject: [PATCH 11/17] alter tests --- kvdb-rocksdb/src/lib.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c8c6dcad4..a41798ecc 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -967,6 +967,8 @@ mod tests { #[test] fn stats() { + use kvdb::IoStatsKind; + let tempdir = TempDir::new("").unwrap(); let config = DatabaseConfig::with_columns(3); let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); @@ -983,29 +985,33 @@ mod tests { db.write(batch).unwrap(); - let io_stats = db.io_stats(false); + let io_stats = db.io_stats(IoStatsKind::SincePrevious); assert_eq!(io_stats.transactions, 1); assert_eq!(io_stats.writes, 3); assert_eq!(io_stats.bytes_written, 18); assert_eq!(io_stats.reads, 10); assert_eq!(io_stats.bytes_read, 30); - let new_io_stats = db.io_stats(true); - // Since we choosed not to keep previous statistic period, + let new_io_stats = db.io_stats(IoStatsKind::SincePrevious); + // Since we taken previous statistic period, // this is expected to be totally empty. assert_eq!(new_io_stats.transactions, 0); + // but the overall should be there + let new_io_stats = db.io_stats(IoStatsKind::Overall); + assert_eq!(new_io_stats.bytes_written, 18); + let mut batch = db.transaction(); batch.delete(0, key1); batch.delete(1, key1); batch.delete(2, key1); // transaction is not commited yet - assert_eq!(db.io_stats(false).writes, 0); + assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 0); db.write(batch).unwrap(); // now it is, and delete is counted as write - assert_eq!(db.io_stats(false).writes, 3); + assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 3); } #[test] From 3a0d3c50b5891454043c9fc768e541ad3e17f26c Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 1 Jan 2020 14:58:27 +0300 Subject: [PATCH 12/17] Update kvdb-rocksdb/src/stats.rs Co-Authored-By: Andronik Ordian --- kvdb-rocksdb/src/stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/stats.rs b/kvdb-rocksdb/src/stats.rs index 882892b54..7167bb119 100644 --- a/kvdb-rocksdb/src/stats.rs +++ b/kvdb-rocksdb/src/stats.rs @@ -129,7 +129,7 @@ impl RunningDbStats { let stats = TakenDbStats { raw: current, started: overall_lock.last_taken }; - overall_lock.last_taken = std::time::Instant::now(); + overall_lock.last_taken = Instant::now(); stats } From a81d487cd96adefeb6fdb3eba348a98f676cf232 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 1 Jan 2020 14:58:34 +0300 Subject: [PATCH 13/17] Update kvdb-rocksdb/src/stats.rs Co-Authored-By: Andronik Ordian --- kvdb-rocksdb/src/stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/stats.rs b/kvdb-rocksdb/src/stats.rs index 7167bb119..039dd3a88 100644 --- a/kvdb-rocksdb/src/stats.rs +++ b/kvdb-rocksdb/src/stats.rs @@ -65,7 +65,7 @@ pub struct RunningDbStats { pub struct TakenDbStats { pub raw: RawDbStats, - pub started: std::time::Instant, + pub started: Instant, } impl RunningDbStats { From 76c28af24a49d434c4c6c3c3f904c4deca830afc Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 15:07:46 +0300 Subject: [PATCH 14/17] remove argument description since it is now documented enum --- kvdb/src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index e35ff98e1..2bcb1b39c 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -154,9 +154,6 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf { /// default, empty statistics is returned. Also, not all kvdb implementation /// can return every statistic or configured to do so (some statistics gathering /// may impede the performance and might be off by default). - /// - /// `keep` argument indicates if database should erase current accumulated statistics - /// and start new gathering period at current time. fn io_stats(&self, _kind: IoStatsKind) -> IoStats { IoStats::empty() } From dff6ba5c5eb5fcb8ea5aaca01e37a1b231947007 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 19:42:09 +0300 Subject: [PATCH 15/17] add stats to write_buffered --- kvdb-rocksdb/src/lib.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index a41798ecc..bfa4ccf2c 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -432,20 +432,32 @@ impl Database { match *self.db.read() { Some(ref cfs) => { let mut batch = WriteBatch::default(); + let mut ops: usize = 0; + let mut bytes: usize = 0; mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { for (c, column) in self.flushing.read().iter().enumerate() { + ops += column.len(); for (key, state) in column.iter() { let cf = cfs.cf(c); match *state { - KeyState::Delete => batch.delete_cf(cf, key).map_err(other_io_err)?, - KeyState::Insert(ref value) => batch.put_cf(cf, key, value).map_err(other_io_err)?, + KeyState::Delete => { + bytes += key.len(); + batch.delete_cf(cf, key).map_err(other_io_err)? + }, + KeyState::Insert(ref value) => { + bytes += key.len() + value.len(); + batch.put_cf(cf, key, value).map_err(other_io_err)? + } }; } } } check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?; + self.stats.tally_transactions(1); + self.stats.tally_writes(ops as u64); + self.stats.tally_bytes_written(bytes as u64); for column in self.flushing.write().iter_mut() { column.clear(); From 289b2c1aca87a9db632c7900f43858738b5991fc Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 20:20:39 +0300 Subject: [PATCH 16/17] enhance io api --- kvdb/src/io_stats.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index f9f1d3a6a..3645b2f36 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -25,6 +25,7 @@ pub enum Kind { } /// Statistic for the `span` period +#[derive(Debug, Clone)] pub struct IoStats { /// Number of transaction. pub transactions: u64, @@ -79,6 +80,14 @@ impl IoStats { self.reads as f64 / self.span.as_secs_f64() } + pub fn byte_reads_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0 + } + + self.bytes_read as f64 / self.span.as_secs_f64() + } + /// Write operations per second. pub fn writes_per_sec(&self) -> f64 { if self.span.as_secs_f64() == 0.0 { @@ -88,6 +97,15 @@ impl IoStats { self.writes as f64 / self.span.as_secs_f64() } + pub fn byte_writes_per_sec(&self) -> f64 { + if self.span.as_secs_f64() == 0.0 { + return 0.0 + } + + self.bytes_written as f64 / self.span.as_secs_f64() + } + + /// Total number of operations per second. pub fn ops_per_sec(&self) -> f64 { if self.span.as_secs_f64() == 0.0 { From 226158c551c1b5bf65dbdece83343647a681afa8 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Wed, 1 Jan 2020 20:21:13 +0300 Subject: [PATCH 17/17] cargo fmt --- kvdb-rocksdb/src/lib.rs | 2 +- kvdb/src/io_stats.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index bfa4ccf2c..cb09432dc 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -444,7 +444,7 @@ impl Database { KeyState::Delete => { bytes += key.len(); batch.delete_cf(cf, key).map_err(other_io_err)? - }, + } KeyState::Insert(ref value) => { bytes += key.len() + value.len(); batch.put_cf(cf, key, value).map_err(other_io_err)? diff --git a/kvdb/src/io_stats.rs b/kvdb/src/io_stats.rs index 3645b2f36..d0de5ce36 100644 --- a/kvdb/src/io_stats.rs +++ b/kvdb/src/io_stats.rs @@ -82,7 +82,7 @@ impl IoStats { pub fn byte_reads_per_sec(&self) -> f64 { if self.span.as_secs_f64() == 0.0 { - return 0.0 + return 0.0; } self.bytes_read as f64 / self.span.as_secs_f64() @@ -99,13 +99,12 @@ impl IoStats { pub fn byte_writes_per_sec(&self) -> f64 { if self.span.as_secs_f64() == 0.0 { - return 0.0 + return 0.0; } self.bytes_written as f64 / self.span.as_secs_f64() } - /// Total number of operations per second. pub fn ops_per_sec(&self) -> f64 { if self.span.as_secs_f64() == 0.0 {