From 215a28d690fc35fcb67346624a1cb4bf49c1cd7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 4 Dec 2018 11:50:31 +0100 Subject: [PATCH 01/37] Switch from `parity-rocksdb` to upstream `rust-rocksdb` --- kvdb-rocksdb/Cargo.toml | 2 +- kvdb-rocksdb/src/lib.rs | 193 +++++++++++++++++++++++----------------- 2 files changed, 110 insertions(+), 85 deletions(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index cda3cc24b..92b6d20e0 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -parity-rocksdb = "0.5.1" +rocksdb = "0.12.4" [dev-dependencies] tempdir = "0.3.7" diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 7f137953c..e7f7d4446 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -20,9 +20,10 @@ use std::{ }; use parking_lot::{Mutex, MutexGuard, RwLock}; -use parity_rocksdb::{ - DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator, - Options, BlockBasedOptions, Direction, Cache, Column, ReadOptions +use rocksdb::{ + DB, WriteBatch, WriteOptions, IteratorMode, DBIterator, + Options, BlockBasedOptions, Direction, ReadOptions, ColumnFamily, + Error }; use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; @@ -193,7 +194,7 @@ impl Default for DatabaseConfig { // pub struct DatabaseIterator<'a> { iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, - _marker: PhantomData<&'a Database>, + _marker: PhantomData<&'a ()>, } impl<'a> Iterator for DatabaseIterator<'a> { @@ -206,38 +207,50 @@ impl<'a> Iterator for DatabaseIterator<'a> { struct DBAndColumns { db: DB, - cfs: Vec, + cfs: Vec>, } // get column family configuration from database config. fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result { - let mut opts = Options::new(); - - opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?; + let mut opts = Options::default(); opts.set_block_based_table_factory(block_opts); - opts.set_parsed_options( - &format!("block_based_table_factory={{{};{}}}", - "cache_index_and_filter_blocks=true", - "pin_l0_filter_and_index_blocks_in_cache=true")).map_err(other_io_err)?; - - opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32); + opts.optimize_level_style_compaction(config.memory_budget_per_col()); opts.set_target_file_size_base(config.compaction.initial_file_size); - opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?; - Ok(opts) } +/// Utility structure that makes the given type implement `Send + Sync`. +/// YOU NEED TO BE SURE WHAT YOU ARE DOING! +struct MakeSendSync(T); + +unsafe impl Send for MakeSendSync {} +unsafe impl Sync for MakeSendSync {} + +impl ::std::ops::Deref for MakeSendSync { + type Target = T; + + fn deref(&self) -> &T { + &self.0 + } +} + +impl From for MakeSendSync { + fn from(data: T) -> MakeSendSync { + MakeSendSync(data) + } +} + /// Key-Value database. pub struct Database { db: RwLock>, config: DatabaseConfig, - write_opts: WriteOptions, - read_opts: ReadOptions, - block_opts: BlockBasedOptions, path: String, + write_opts: MakeSendSync, + read_opts: MakeSendSync, + block_opts: MakeSendSync, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock, KeyState>>>, // Values currently being flushed. Cleared when `flush` completes. @@ -248,9 +261,12 @@ pub struct Database { } #[inline] -fn check_for_corruption>(path: P, res: result::Result) -> io::Result { +fn check_for_corruption>( + path: P, + res: result::Result +) -> io::Result { if let Err(ref s) = res { - if s.starts_with("Corruption:") { + if is_corrupted(s) { warn!("DB corrupted: {}. Repair will be triggered on next restart", s); let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME)); } @@ -259,8 +275,26 @@ fn check_for_corruption>(path: P, res: result::Result bool { - s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") +fn is_corrupted(err: &Error) -> bool { + err.as_ref().starts_with("Corruption:") + || err.as_ref().starts_with("Invalid argument: You have to open all column families") +} + +/// Generate the options for RocksDB, based on the given `DatabaseConfig`. +fn generate_options(config: &DatabaseConfig) -> Options { + let mut opts = Options::default(); + + //TODO: rate_limiter_bytes_per_sec={} was removed + + opts.set_use_fsync(false); + opts.create_if_missing(true); + opts.set_max_open_files(config.max_open_files); + opts.set_bytes_per_sync(1048576); + //TODO: keep_log_file_num=1 was removed + opts.set_write_buffer_size(config.memory_budget_per_col() / 2); + opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); + + opts } impl Database { @@ -273,35 +307,18 @@ impl Database { /// Open database file. Creates if it does not exist. pub fn open(config: &DatabaseConfig, path: &str) -> io::Result { - let mut opts = Options::new(); - - if let Some(rate_limit) = config.compaction.write_rate_limit { - opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?; - } - opts.set_use_fsync(false); - opts.create_if_missing(true); - opts.set_max_open_files(config.max_open_files); - opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?; - opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?; - opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2); - opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); - - let mut block_opts = BlockBasedOptions::new(); - - { - block_opts.set_block_size(config.compaction.block_size); - // Set cache size as recommended by - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size - let cache_size = config.memory_budget() / 3; - let cache = Cache::new(cache_size); - block_opts.set_cache(cache); - } + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_block_size(config.compaction.block_size); + // Set cache size as recommended by + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size + let cache_size = config.memory_budget() / 3; + block_opts.set_lru_cache(cache_size); // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); if db_corrupted.exists() { warn!("DB has been previously marked as corrupted, attempting repair"); - DB::repair(&opts, path).map_err(other_io_err)?; + DB::repair(generate_options(config), path).map_err(other_io_err)?; fs::remove_file(db_corrupted)?; } @@ -316,13 +333,14 @@ impl Database { } let write_opts = WriteOptions::new(); - let mut read_opts = ReadOptions::new(); - read_opts.set_verify_checksums(false); + let read_opts = ReadOptions::default(); + //TODO: removed read_opts.set_verify_checksums(false); - let mut cfs: Vec = Vec::new(); + let opts = generate_options(config); + let mut cfs: Vec = Vec::new(); let db = match config.columns { Some(_) => { - match DB::open_cf(&opts, path, &cfnames, &cf_options) { + match DB::open_cf(&opts, path, &cfnames) { Ok(db) => { cfs = cfnames.iter().map(|n| db.cf_handle(n) .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); @@ -330,7 +348,7 @@ impl Database { } Err(_) => { // retry and create CFs - match DB::open_cf(&opts, path, &[], &[]) { + match DB::open_cf(&opts, path, &[]) { Ok(mut db) => { cfs = cfnames.iter() .enumerate() @@ -351,33 +369,32 @@ impl Database { Ok(db) => db, Err(ref s) if is_corrupted(s) => { warn!("DB corrupted: {}, attempting repair", s); - DB::repair(&opts, path).map_err(other_io_err)?; - - match cfnames.is_empty() { - true => DB::open(&opts, path).map_err(other_io_err)?, - false => { - let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?; - cfs = cfnames.iter().map(|n| db.cf_handle(n) - .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); - db - }, + DB::repair(generate_options(config), path).map_err(other_io_err)?; + + if cfnames.is_empty() { + DB::open(&opts, path).map_err(other_io_err)? + } else { + let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?; + cfs = cfnames.iter().map(|n| db.cf_handle(n) + .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); + db } - }, + } Err(s) => { return Err(other_io_err(s)) } }; let num_cols = cfs.len(); Ok(Database { - db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })), + db: RwLock::new(Some(DBAndColumns{ db, cfs: cfs.into_iter().map(Into::into).collect() })), config: config.clone(), - write_opts: write_opts, overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), - read_opts: read_opts, - block_opts: block_opts, + read_opts: read_opts.into(), + write_opts: write_opts.into(), + block_opts: block_opts.into(), }) } @@ -412,7 +429,7 @@ impl Database { fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { for (c, column) in self.flushing.read().iter().enumerate() { @@ -420,14 +437,14 @@ impl Database { match *state { KeyState::Delete => { if c > 0 { - batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?; + batch.delete_cf(*cfs[c - 1], key).map_err(other_io_err)?; } else { batch.delete(key).map_err(other_io_err)?; } }, KeyState::Insert(ref value) => { if c > 0 { - batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?; + batch.put_cf(*cfs[c - 1], key, value).map_err(other_io_err)?; } else { batch.put(key, value).map_err(other_io_err)?; } @@ -437,9 +454,7 @@ impl Database { } } - check_for_corruption( - &self.path, - db.write_opt(batch, &self.write_opts))?; + check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))?; for column in self.flushing.write().iter_mut() { column.clear(); @@ -470,7 +485,7 @@ impl Database { pub fn write(&self, tr: DBTransaction) -> io::Result<()> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + let mut batch = WriteBatch::default(); let ops = tr.ops; for op in ops { // remove any buffered operation for this key @@ -479,11 +494,11 @@ impl Database { match op { DBOp::Insert { col, key, value } => match col { None => batch.put(&key, &value).map_err(other_io_err)?, - Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?, + Some(c) => batch.put_cf(*cfs[c as usize], &key, &value).map_err(other_io_err)?, }, DBOp::Delete { col, key } => match col { None => batch.delete(&key).map_err(other_io_err)?, - Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?, + Some(c) => batch.delete_cf(*cfs[c as usize], &key).map_err(other_io_err)?, } } } @@ -509,8 +524,13 @@ impl Database { Some(&KeyState::Delete) => Ok(None), None => { col.map_or_else( - || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), - |c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v)))) + || db + .get_opt(key, &self.read_opts) + .map(|r| r.map(|v| DBValue::from_slice(&v))), + |c| db + .get_cf_opt(*cfs[c as usize], key, &self.read_opts) + .map(|r| r.map(|v| DBValue::from_slice(&v))) + ) .map_err(other_io_err) }, } @@ -541,14 +561,19 @@ impl Database { let mut overlay_data = overlay.iter() .filter_map(|(k, v)| match *v { KeyState::Insert(ref value) => - Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())), + Some( + ( + k.clone().into_vec().into_boxed_slice(), + value.clone().into_vec().into_boxed_slice() + ) + ), KeyState::Delete => None, }).collect::>(); overlay_data.sort(); let iter = col.map_or_else( - || db.iterator_opt(IteratorMode::Start, &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) + || db.iterator(IteratorMode::Start), + |c| db.iterator_cf(*cfs[c as usize], IteratorMode::Start) .expect("iterator params are valid; qed") ); @@ -564,8 +589,8 @@ impl Database { fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { - let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts) + let iter = col.map_or_else(|| db.iterator(IteratorMode::From(prefix, Direction::Forward)), + |c| db.iterator_cf(*cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)) .expect("iterator params are valid; qed")); Some(DatabaseIterator { @@ -646,7 +671,7 @@ impl Database { Some(DBAndColumns { ref mut db, ref mut cfs }) => { let col = cfs.len() as u32; let name = format!("col{}", col); - cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?); + cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?.into()); Ok(()) }, None => Ok(()), From 2c0468f5be88644beb488450282e8e39972460ff Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Mon, 28 Oct 2019 18:27:50 +0100 Subject: [PATCH 02/37] wip --- kvdb-rocksdb/Cargo.toml | 3 ++- kvdb-rocksdb/src/lib.rs | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 92b6d20e0..ce72d5cbe 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,8 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -rocksdb = "0.12.4" +# rocksdb = "0.12.4" +rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" } [dev-dependencies] tempdir = "0.3.7" diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index e7f7d4446..52114781b 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -193,8 +193,7 @@ impl Default for DatabaseConfig { // inner DB (to prevent closing via restoration) may be re-evaluated in the future. // pub struct DatabaseIterator<'a> { - iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, - _marker: PhantomData<&'a ()>, + iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator<'a>>, } impl<'a> Iterator for DatabaseIterator<'a> { @@ -579,7 +578,6 @@ impl Database { Some(DatabaseIterator { iter: interleave_ordered(overlay_data, iter), - _marker: PhantomData, }) }, None => None, @@ -595,7 +593,6 @@ impl Database { Some(DatabaseIterator { iter: interleave_ordered(Vec::new(), iter), - _marker: PhantomData, }) }, None => None, From d445cdef90306b37fd6c1d31e90807e62f80cec3 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 1 Nov 2019 15:46:48 +0100 Subject: [PATCH 03/37] wip --- kvdb-rocksdb/Cargo.toml | 1 + kvdb-rocksdb/src/lib.rs | 147 ++++++++++++++++++---------------------- 2 files changed, 68 insertions(+), 80 deletions(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index ce72d5cbe..1364e4ec0 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -18,6 +18,7 @@ parking_lot = "0.9.0" regex = "1.3.1" # rocksdb = "0.12.4" rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" } +owning_ref = "0.4.0" [dev-dependencies] tempdir = "0.3.7" diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 52114781b..1d6194d00 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -16,14 +16,14 @@ use std::{ cmp, fs, io, mem, result, error, - collections::HashMap, marker::PhantomData, path::Path + collections::HashMap, path::Path, }; use parking_lot::{Mutex, MutexGuard, RwLock}; use rocksdb::{ DB, WriteBatch, WriteOptions, IteratorMode, DBIterator, Options, BlockBasedOptions, Direction, ReadOptions, ColumnFamily, - Error + Error, }; use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; @@ -47,7 +47,7 @@ fn other_io_err(e: E) -> io::Error where E: Into Iterator for DatabaseIterator<'a> { struct DBAndColumns { db: DB, - cfs: Vec>, + column_names: Vec, +} + +impl DBAndColumns { + fn get_cf(&self, i: usize) -> &ColumnFamily { + self.db + .cf_handle(&self.column_names[i]) + .expect("the specified column name is correct; qed") + } } // get column family configuration from database config. @@ -221,35 +229,14 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re Ok(opts) } -/// Utility structure that makes the given type implement `Send + Sync`. -/// YOU NEED TO BE SURE WHAT YOU ARE DOING! -struct MakeSendSync(T); - -unsafe impl Send for MakeSendSync {} -unsafe impl Sync for MakeSendSync {} - -impl ::std::ops::Deref for MakeSendSync { - type Target = T; - - fn deref(&self) -> &T { - &self.0 - } -} - -impl From for MakeSendSync { - fn from(data: T) -> MakeSendSync { - MakeSendSync(data) - } -} - /// Key-Value database. pub struct Database { db: RwLock>, config: DatabaseConfig, path: String, - write_opts: MakeSendSync, - read_opts: MakeSendSync, - block_opts: MakeSendSync, + write_opts: WriteOptions, + read_opts: ReadOptions, + block_opts: BlockBasedOptions, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock, KeyState>>>, // Values currently being flushed. Cleared when `flush` completes. @@ -288,10 +275,10 @@ fn generate_options(config: &DatabaseConfig) -> Options { opts.set_use_fsync(false); opts.create_if_missing(true); opts.set_max_open_files(config.max_open_files); - opts.set_bytes_per_sync(1048576); - //TODO: keep_log_file_num=1 was removed + opts.set_bytes_per_sync(1 * MB as u64); + opts.set_keep_log_file_num(1); opts.set_write_buffer_size(config.memory_budget_per_col() / 2); - opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); + opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2)); opts } @@ -324,36 +311,37 @@ impl Database { let columns = config.columns.unwrap_or(0) as usize; let mut cf_options = Vec::with_capacity(columns); - let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); - let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect(); + let column_names: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); + let cfnames: Vec<&str> = column_names.iter().map(|n| n as &str).collect(); for _ in 0 .. config.columns.unwrap_or(0) { cf_options.push(col_config(&config, &block_opts)?); } let write_opts = WriteOptions::new(); - let read_opts = ReadOptions::default(); + let mut read_opts = ReadOptions::default(); + read_opts.set_prefix_same_as_start(true); //TODO: removed read_opts.set_verify_checksums(false); let opts = generate_options(config); - let mut cfs: Vec = Vec::new(); let db = match config.columns { Some(_) => { match DB::open_cf(&opts, path, &cfnames) { Ok(db) => { - cfs = cfnames.iter().map(|n| db.cf_handle(n) - .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); + for name in &cfnames { + let _ = db.cf_handle(name) + .expect("rocksdb opens a cf_handle for each cfname; qed"); + } Ok(db) } Err(_) => { // retry and create CFs - match DB::open_cf(&opts, path, &[]) { + match DB::open_cf(&opts, path, &[] as &[&str]) { Ok(mut db) => { - cfs = cfnames.iter() - .enumerate() - .map(|(i, n)| db.create_cf(n, &cf_options[i])) - .collect::<::std::result::Result<_, _>>() - .map_err(other_io_err)?; + for (i, name) in cfnames.iter().enumerate() { + let _ = db.create_cf(name, &cf_options[i]) + .map_err(other_io_err)?; + } Ok(db) }, err => err, @@ -374,8 +362,10 @@ impl Database { DB::open(&opts, path).map_err(other_io_err)? } else { let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?; - cfs = cfnames.iter().map(|n| db.cf_handle(n) - .expect("rocksdb opens a cf_handle for each cfname; qed")).collect(); + for name in cfnames { + let _ = db.cf_handle(name) + .expect("rocksdb opens a cf_handle for each cfname; qed"); + } db } } @@ -383,12 +373,12 @@ impl Database { return Err(other_io_err(s)) } }; - let num_cols = cfs.len(); + let num_cols = column_names.len(); Ok(Database { - db: RwLock::new(Some(DBAndColumns{ db, cfs: cfs.into_iter().map(Into::into).collect() })), + db: RwLock::new(Some(DBAndColumns { db, column_names })), config: config.clone(), - overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), - flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + overlay: RwLock::new((0..=num_cols).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..=num_cols).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), read_opts: read_opts.into(), @@ -427,7 +417,7 @@ impl Database { /// Commit buffered changes to database. Must be called under `flush_lock` fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { + Some(ref cfs) => { let mut batch = WriteBatch::default(); mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { @@ -436,14 +426,16 @@ impl Database { match *state { KeyState::Delete => { if c > 0 { - batch.delete_cf(*cfs[c - 1], key).map_err(other_io_err)?; + let cf = cfs.get_cf(c - 1); + batch.delete_cf(cf, key).map_err(other_io_err)?; } else { batch.delete(key).map_err(other_io_err)?; } }, KeyState::Insert(ref value) => { if c > 0 { - batch.put_cf(*cfs[c - 1], key, value).map_err(other_io_err)?; + let cf = cfs.get_cf(c - 1); + batch.put_cf(cf, key, value).map_err(other_io_err)?; } else { batch.put(key, value).map_err(other_io_err)?; } @@ -453,7 +445,7 @@ impl Database { } } - check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))?; + check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?; for column in self.flushing.write().iter_mut() { column.clear(); @@ -483,7 +475,7 @@ impl Database { /// Commit transaction to database. pub fn write(&self, tr: DBTransaction) -> io::Result<()> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { + Some(ref cfs) => { let mut batch = WriteBatch::default(); let ops = tr.ops; for op in ops { @@ -493,16 +485,16 @@ impl Database { match op { DBOp::Insert { col, key, value } => match col { None => batch.put(&key, &value).map_err(other_io_err)?, - Some(c) => batch.put_cf(*cfs[c as usize], &key, &value).map_err(other_io_err)?, + Some(c) => batch.put_cf(cfs.get_cf(c as usize), &key, &value).map_err(other_io_err)?, }, DBOp::Delete { col, key } => match col { None => batch.delete(&key).map_err(other_io_err)?, - Some(c) => batch.delete_cf(*cfs[c as usize], &key).map_err(other_io_err)?, + Some(c) => batch.delete_cf(cfs.get_cf(c as usize), &key).map_err(other_io_err)?, } } } - check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts)) + check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) }, None => Err(other_io_err("Database is closed")), } @@ -511,7 +503,7 @@ impl Database { /// Get value by key. pub fn get(&self, col: Option, key: &[u8]) -> io::Result> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { + Some(ref cfs) => { let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; match overlay.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), @@ -523,11 +515,11 @@ impl Database { Some(&KeyState::Delete) => Ok(None), None => { col.map_or_else( - || db + || cfs.db .get_opt(key, &self.read_opts) .map(|r| r.map(|v| DBValue::from_slice(&v))), - |c| db - .get_cf_opt(*cfs[c as usize], key, &self.read_opts) + |c| cfs.db + .get_cf_opt(cfs.get_cf(c as usize), key, &self.read_opts) .map(|r| r.map(|v| DBValue::from_slice(&v))) ) .map_err(other_io_err) @@ -544,18 +536,14 @@ impl Database { // TODO: support prefix seek for unflushed data pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { self.iter_from_prefix(col, prefix).and_then(|mut iter| { - match iter.next() { - // TODO: use prefix_same_as_start read option (not available in C API currently) - Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None }, - _ => None - } + iter.next().map(|(_, v)| v) }) } /// Get database iterator for flushed data. pub fn iter(&self, col: Option) -> Option> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { + Some(ref cfs) => { let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; let mut overlay_data = overlay.iter() .filter_map(|(k, v)| match *v { @@ -571,8 +559,8 @@ impl Database { overlay_data.sort(); let iter = col.map_or_else( - || db.iterator(IteratorMode::Start), - |c| db.iterator_cf(*cfs[c as usize], IteratorMode::Start) + || cfs.db.iterator(IteratorMode::Start), + |c| cfs.db.iterator_cf(cfs.get_cf(c as usize), IteratorMode::Start) .expect("iterator params are valid; qed") ); @@ -586,9 +574,9 @@ impl Database { fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let iter = col.map_or_else(|| db.iterator(IteratorMode::From(prefix, Direction::Forward)), - |c| db.iterator_cf(*cfs[c as usize], IteratorMode::From(prefix, Direction::Forward)) + Some(ref cfs) => { + let iter = col.map_or_else(|| cfs.db.iterator(IteratorMode::From(prefix, Direction::Forward)), + |c| cfs.db.iterator_cf(cfs.get_cf(c as usize), IteratorMode::From(prefix, Direction::Forward)) .expect("iterator params are valid; qed")); Some(DatabaseIterator { @@ -642,7 +630,7 @@ impl Database { /// The number of non-default column families. pub fn num_columns(&self) -> u32 { self.db.read().as_ref() - .and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } ) + .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) } ) .map(|n| n as u32) .unwrap_or(0) } @@ -650,10 +638,8 @@ impl Database { /// Drop a column family. pub fn drop_column(&self) -> io::Result<()> { match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - if let Some(col) = cfs.pop() { - let name = format!("col{}", cfs.len()); - drop(col); + Some(DBAndColumns { ref mut db, ref mut column_names }) => { + if let Some(name) = column_names.pop() { db.drop_cf(&name).map_err(other_io_err)?; } Ok(()) @@ -665,10 +651,11 @@ impl Database { /// Add a column family. pub fn add_column(&self) -> io::Result<()> { match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - let col = cfs.len() as u32; + Some(DBAndColumns { ref mut db, ref mut column_names }) => { + let col = column_names.len() as u32; let name = format!("col{}", col); - cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?.into()); + let _ = db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?; + column_names.push(name); Ok(()) }, None => Ok(()), From 47a6e22b83a136ff57981bcf75419535ee3cdde5 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Sun, 10 Nov 2019 15:24:01 +0100 Subject: [PATCH 04/37] kvdb-rocksdb: working iterator --- kvdb-rocksdb/src/iter.rs | 121 +++++++++++++++++++++++++ kvdb-rocksdb/src/lib.rs | 191 ++++++++++++++++----------------------- 2 files changed, 201 insertions(+), 111 deletions(-) create mode 100644 kvdb-rocksdb/src/iter.rs diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs new file mode 100644 index 000000000..164bb03eb --- /dev/null +++ b/kvdb-rocksdb/src/iter.rs @@ -0,0 +1,121 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use crate::DBAndColumns; +use owning_ref::{OwningHandle, StableAddress}; +use parking_lot::RwLockReadGuard; +use rocksdb::{DBIterator, Direction, IteratorMode}; +use std::ops::{Deref, DerefMut}; + +pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); + +pub struct ReadGuardedIterator<'a, I, T> { + inner: OwningHandle>>, DerefWrapper>>, +} + +// We can't implement `StableAddress` for a `RwLockReadGuard` +// directly due to orphan rules. +#[repr(transparent)] +struct UnsafeStableAddress(T); + +impl Deref for UnsafeStableAddress { + type Target = T::Target; + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +// RwLockReadGuard dereferences to a stable address; qed +unsafe impl StableAddress for UnsafeStableAddress {} + +struct DerefWrapper(T); + +impl Deref for DerefWrapper { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DerefWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> { + type Item = I::Item; + + fn next(&mut self) -> Option { + self.inner.deref_mut().as_mut().and_then(|iter| iter.next()) + } +} + +pub trait IterationHandler { + type Iterator: Iterator; + + fn iter(&self, col: Option) -> Self::Iterator; + fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator; +} + +impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T> +where + &'a T: IterationHandler, +{ + pub fn new(read_lock: RwLockReadGuard<'a, Option>, col: Option) -> Self { + Self { + inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(|db| db.iter(col))) + }), + } + } + + pub fn new_from_prefix(read_lock: RwLockReadGuard<'a, Option>, col: Option, prefix: &[u8]) -> Self { + Self { + inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(|db| db.iter_from_prefix(col, prefix))) + }), + } + } +} + +impl<'a> IterationHandler for &'a DBAndColumns { + type Iterator = DBIterator<'a>; + + fn iter(&self, col: Option) -> Self::Iterator { + col.map_or_else( + || self.db.iterator(IteratorMode::Start), + |c| { + self.db + .iterator_cf(self.get_cf(c as usize), IteratorMode::Start) + .expect("iterator params are valid; qed") + }, + ) + } + + fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator { + col.map_or_else( + || self.db.iterator(IteratorMode::From(prefix, Direction::Forward)), + |c| { + self.db + .iterator_cf(self.get_cf(c as usize), IteratorMode::From(prefix, Direction::Forward)) + .expect("iterator params are valid; qed") + }, + ) + } +} diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index d525f3c4e..ee36ccd12 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -14,21 +14,17 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::{ - cmp, fs, io, mem, result, error, - collections::HashMap, path::Path, -}; - -use rocksdb::{ - DB, WriteBatch, WriteOptions, IteratorMode, DBIterator, - Options, BlockBasedOptions, Direction, ReadOptions, ColumnFamily, - Error, -}; -use parking_lot::{Mutex, MutexGuard, RwLock, MappedRwLockReadGuard}; - -use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; +mod iter; + +use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; + +use parking_lot::{Mutex, MutexGuard, RwLock}; +use rocksdb::{BlockBasedOptions, ColumnFamily, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB}; + +use crate::iter::KeyValuePair; use elastic_array::ElasticArray32; use fs_swap::{swap, swap_nonatomic}; +use interleaved_ordered::interleave_ordered; use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB}; use log::{debug, warn}; @@ -193,35 +189,23 @@ impl Default for DatabaseConfig { } } -/// Database iterator (for flushed data only). -/// Will hold a lock until the iterator is dropped -/// preventing the database from being closed. -pub struct DatabaseIterator<'a> { - iter: InterleaveOrdered< - std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, - ReadGuardedIterator<'a, DBIterator<'a>>, - >, -} - -impl<'a> Iterator for DatabaseIterator<'a> { - type Item = (Box<[u8]>, Box<[u8]>); +// /// Database iterator (for flushed data only). +// /// Will hold a lock until the iterator is dropped +// /// preventing the database from being closed. +// pub struct DatabaseIterator<'a> { +// iter: InterleaveOrdered< +// std::vec::IntoIter, +// ReadGuardedIterator<'a, DBIterator<'a>>, +// >, +// } - fn next(&mut self) -> Option { - self.iter.next() - } -} - -pub struct ReadGuardedIterator<'a, I> { - inner: MappedRwLockReadGuard<'a, I>, -} +// impl<'a> Iterator for DatabaseIterator<'a> { +// type Item = (Box<[u8]>, Box<[u8]>); -impl<'a, I: Iterator> Iterator for ReadGuardedIterator<'a, I> { - type Item = I::Item; - - fn next(&mut self) -> Option { - self.inner.deref_mut().next() - } -} +// fn next(&mut self) -> Option { +// self.iter.next() +// } +// } struct DBAndColumns { db: DB, @@ -230,9 +214,7 @@ struct DBAndColumns { impl DBAndColumns { fn get_cf(&self, i: usize) -> &ColumnFamily { - self.db - .cf_handle(&self.column_names[i]) - .expect("the specified column name is correct; qed") + self.db.cf_handle(&self.column_names[i]).expect("the specified column name is correct; qed") } } @@ -272,10 +254,7 @@ pub struct Database { } #[inline] -fn check_for_corruption>( - path: P, - res: result::Result -) -> io::Result { +fn check_for_corruption>(path: P, res: result::Result) -> io::Result { if let Err(ref s) = res { if is_corrupted(s) { warn!("DB corrupted: {}. Repair will be triggered on next restart", s); @@ -354,8 +333,7 @@ impl Database { match DB::open_cf(&opts, path, &cfnames) { Ok(db) => { for name in &cfnames { - let _ = db.cf_handle(name) - .expect("rocksdb opens a cf_handle for each cfname; qed"); + let _ = db.cf_handle(name).expect("rocksdb opens a cf_handle for each cfname; qed"); } Ok(db) } @@ -364,8 +342,7 @@ impl Database { match DB::open_cf(&opts, path, &[] as &[&str]) { Ok(mut db) => { for (i, name) in cfnames.iter().enumerate() { - let _ = db.create_cf(name, &cf_options[i]) - .map_err(other_io_err)?; + let _ = db.create_cf(name, &cf_options[i]).map_err(other_io_err)?; } Ok(db) } @@ -388,8 +365,7 @@ impl Database { } else { let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?; for name in cfnames { - let _ = db.cf_handle(name) - .expect("rocksdb opens a cf_handle for each cfname; qed"); + let _ = db.cf_handle(name).expect("rocksdb opens a cf_handle for each cfname; qed"); } db } @@ -513,12 +489,12 @@ impl Database { DBOp::Delete { col, key } => match col { None => batch.delete(&key).map_err(other_io_err)?, Some(c) => batch.delete_cf(cfs.get_cf(c as usize), &key).map_err(other_io_err)?, - } + }, } } check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) - }, + } None => Err(other_io_err("Database is closed")), } } @@ -536,17 +512,16 @@ impl Database { match flushing.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), Some(&KeyState::Delete) => Ok(None), - None => { - col.map_or_else( - || cfs.db - .get_opt(key, &self.read_opts) - .map(|r| r.map(|v| DBValue::from_slice(&v))), - |c| cfs.db + None => col + .map_or_else( + || cfs.db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), + |c| { + cfs.db .get_cf_opt(cfs.get_cf(c as usize), key, &self.read_opts) .map(|r| r.map(|v| DBValue::from_slice(&v))) - ) - .map_err(other_io_err) - }, + }, + ) + .map_err(other_io_err), } } } @@ -558,58 +533,50 @@ impl Database { /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. // TODO: support prefix seek for unflushed data pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - self.iter_from_prefix(col, prefix).and_then(|mut iter| { - iter.next().map(|(_, v)| v) - }) + self.iter_from_prefix(col, prefix).next().map(|(_, v)| v) } /// Get database iterator for flushed data. - pub fn iter(&self, col: Option) -> Option> { - match *self.db.read() { - Some(ref cfs) => { - let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; + pub fn iter<'a>(&'a self, col: Option) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let optional = if read_lock.is_some() { + let c = Self::to_overlay_column(col); + let overlay_data = { + let overlay = &self.overlay.read()[c]; let mut overlay_data = overlay .iter() .filter_map(|(k, v)| match *v { - KeyState::Insert(ref value) => - Some( - ( - k.clone().into_vec().into_boxed_slice(), - value.clone().into_vec().into_boxed_slice() - ) - ), + KeyState::Insert(ref value) => { + Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())) + } KeyState::Delete => None, }) .collect::>(); overlay_data.sort(); + overlay_data + }; - let iter = col.map_or_else( - || cfs.db.iterator(IteratorMode::Start), - |c| cfs.db.iterator_cf(cfs.get_cf(c as usize), IteratorMode::Start) - .expect("iterator params are valid; qed") - ); - - Some(DatabaseIterator { - iter: interleave_ordered(overlay_data, iter), - }) - }, - None => None, - } + let guarded = iter::ReadGuardedIterator::new(read_lock, col); + Some(interleave_ordered(overlay_data, guarded)) + } else { + None + }; + optional.into_iter().flat_map(identity) } - - fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - match *self.db.read() { - Some(ref cfs) => { - let iter = col.map_or_else(|| cfs.db.iterator(IteratorMode::From(prefix, Direction::Forward)), - |c| cfs.db.iterator_cf(cfs.get_cf(c as usize), IteratorMode::From(prefix, Direction::Forward)) - .expect("iterator params are valid; qed")); - - Some(DatabaseIterator { - iter: interleave_ordered(Vec::new(), iter), - }) - }, - None => None, - } + /// Get database iterator from prefix for flushed data. + fn iter_from_prefix<'a>( + &'a self, + col: Option, + prefix: &[u8], + ) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let optional = if read_lock.is_some() { + let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix); + Some(interleave_ordered(Vec::new(), guarded)) + } else { + None + }; + optional.into_iter().flat_map(identity) } /// Close the database @@ -657,8 +624,10 @@ impl Database { /// The number of non-default column families. pub fn num_columns(&self) -> u32 { - self.db.read().as_ref() - .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) } ) + self.db + .read() + .as_ref() + .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) }) .map(|n| n as u32) .unwrap_or(0) } @@ -714,18 +683,18 @@ impl KeyValueDB for Database { Database::flush(self) } - fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a> { + fn iter<'a>(&'a self, col: Option) -> Box + 'a> { let unboxed = Database::iter(self, col); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) + Box::new(unboxed.into_iter()) } fn iter_from_prefix<'a>( &'a self, col: Option, prefix: &'a [u8], - ) -> Box, Box<[u8]>)> + 'a> { + ) -> Box + 'a> { let unboxed = Database::iter_from_prefix(self, col, prefix); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) + Box::new(unboxed.into_iter()) } fn restore(&self, new_db: &str) -> io::Result<()> { @@ -761,7 +730,7 @@ mod tests { assert_eq!(&*db.get(None, key1.as_bytes()).unwrap().unwrap(), b"cat"); - let contents: Vec<_> = db.iter(None).into_iter().flat_map(|inner| inner).collect(); + let contents: Vec<_> = db.iter(None).into_iter().collect(); assert_eq!(contents.len(), 2); assert_eq!(&*contents[0].0, key1.as_bytes()); assert_eq!(&*contents[0].1, b"cat"); From 81bce7c9b1e46117ddf5e5d36951dd4e4d13dd07 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Sun, 10 Nov 2019 15:25:40 +0100 Subject: [PATCH 05/37] kvdb-rocksdb: cleanup --- kvdb-rocksdb/src/lib.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index ee36ccd12..2b501319a 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -192,20 +192,6 @@ impl Default for DatabaseConfig { // /// Database iterator (for flushed data only). // /// Will hold a lock until the iterator is dropped // /// preventing the database from being closed. -// pub struct DatabaseIterator<'a> { -// iter: InterleaveOrdered< -// std::vec::IntoIter, -// ReadGuardedIterator<'a, DBIterator<'a>>, -// >, -// } - -// impl<'a> Iterator for DatabaseIterator<'a> { -// type Item = (Box<[u8]>, Box<[u8]>); - -// fn next(&mut self) -> Option { -// self.iter.next() -// } -// } struct DBAndColumns { db: DB, @@ -537,6 +523,8 @@ impl Database { } /// Get database iterator for flushed data. + /// Will hold a lock until the iterator is dropped + /// preventing the database from being closed. pub fn iter<'a>(&'a self, col: Option) -> impl Iterator + 'a { let read_lock = self.db.read(); let optional = if read_lock.is_some() { @@ -564,6 +552,8 @@ impl Database { optional.into_iter().flat_map(identity) } /// Get database iterator from prefix for flushed data. + /// Will hold a lock until the iterator is dropped + /// preventing the database from being closed. fn iter_from_prefix<'a>( &'a self, col: Option, From 71b2af0fdeca49d4208c05956ec96c5735886642 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Sun, 10 Nov 2019 15:43:54 +0100 Subject: [PATCH 06/37] kvdb-rocksdb: more cleanup --- kvdb-rocksdb/src/lib.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 2b501319a..81297bdae 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -46,7 +46,7 @@ where const KB: usize = 1024; const MB: usize = 1024 * KB; -const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 1024; +const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128; enum KeyState { Insert(DBValue), @@ -189,10 +189,6 @@ impl Default for DatabaseConfig { } } -// /// Database iterator (for flushed data only). -// /// Will hold a lock until the iterator is dropped -// /// preventing the database from being closed. - struct DBAndColumns { db: DB, column_names: Vec, From e5cd5cb2064a3d8037c0c5c73c20b8cff8c888e6 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 12 Nov 2019 10:29:15 +0100 Subject: [PATCH 07/37] kvdb-rocksdb: use options from updated upstream --- kvdb-rocksdb/src/lib.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 81297bdae..e4f41e2a3 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -205,14 +205,7 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re let mut opts = Options::default(); opts.set_block_based_table_factory(block_opts); - opts.optimize_level_style_compaction(config.memory_budget_per_col()); - // opts.set_parsed_options(&format!( - // "block_based_table_factory={{{};{}}}", - // "cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true" - // )) - // .map_err(other_io_err)?; - opts.set_target_file_size_base(config.compaction.initial_file_size); Ok(opts) @@ -285,7 +278,9 @@ impl Database { // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size let cache_size = config.memory_budget() / 3; block_opts.set_lru_cache(cache_size); - + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); if db_corrupted.exists() { @@ -307,7 +302,7 @@ impl Database { let write_opts = WriteOptions::new(); let mut read_opts = ReadOptions::default(); read_opts.set_prefix_same_as_start(true); - //TODO: removed read_opts.set_verify_checksums(false); + read_opts.set_verify_checksums(false); let opts = generate_options(config); let db = match config.columns { @@ -340,7 +335,7 @@ impl Database { Ok(db) => db, Err(ref s) if is_corrupted(s) => { warn!("DB corrupted: {}, attempting repair", s); - DB::repair(generate_options(config), path).map_err(other_io_err)?; + DB::repair(&opts, path).map_err(other_io_err)?; if cfnames.is_empty() { DB::open(&opts, path).map_err(other_io_err)? From c7640d15a5bda3141ba06ab732e6988ef62ba051 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 12 Nov 2019 10:33:12 +0100 Subject: [PATCH 08/37] kvdb-rocksdb: set bloom filter as recommended by tuning guide --- kvdb-rocksdb/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index e4f41e2a3..c36c72640 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -280,6 +280,7 @@ impl Database { block_opts.set_lru_cache(cache_size); block_opts.set_cache_index_and_filter_blocks(true); block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_bloom_filter(10, true); // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); From fb208ad854c52201edaa236ec0717bb4fc5ea51b Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 12 Nov 2019 10:47:26 +0100 Subject: [PATCH 09/37] kvdb-rocksdb: fix build --- kvdb-rocksdb/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c36c72640..02c1eb5a2 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -282,11 +282,13 @@ impl Database { block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); block_opts.set_bloom_filter(10, true); + let opts = generate_options(config); + // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); if db_corrupted.exists() { warn!("DB has been previously marked as corrupted, attempting repair"); - DB::repair(generate_options(config), path).map_err(other_io_err)?; + DB::repair(&opts, path).map_err(other_io_err)?; fs::remove_file(db_corrupted)?; } @@ -305,7 +307,6 @@ impl Database { read_opts.set_prefix_same_as_start(true); read_opts.set_verify_checksums(false); - let opts = generate_options(config); let db = match config.columns { Some(_) => { match DB::open_cf(&opts, path, &cfnames) { From 8a2442a010fe2768986c812f6004e642612102cb Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 12 Nov 2019 10:53:04 +0100 Subject: [PATCH 10/37] kvdb-rocksdb: set_level_compaction_dynamic_level_bytes --- kvdb-rocksdb/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 02c1eb5a2..4a66a57e6 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -204,6 +204,7 @@ impl DBAndColumns { fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result { let mut opts = Options::default(); + opts.set_level_compaction_dynamic_level_bytes(true); opts.set_block_based_table_factory(block_opts); opts.optimize_level_style_compaction(config.memory_budget_per_col()); opts.set_target_file_size_base(config.compaction.initial_file_size); From fa16fb5c0717965e9b550e5632a210f22616b1be Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 14 Nov 2019 16:55:22 +0100 Subject: [PATCH 11/37] kvdb-rocksdb: switch to just published version --- kvdb-rocksdb/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 1364e4ec0..315203f3a 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,8 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -# rocksdb = "0.12.4" -rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" } +rocksdb = "0.13" owning_ref = "0.4.0" [dev-dependencies] From 3b31e4fd562141458a0dca1c09abc5c99c1cf9ca Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 14 Nov 2019 17:21:09 +0100 Subject: [PATCH 12/37] kvdb-rocksdb: preserve the old compression_per_level setting --- kvdb-rocksdb/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 4a66a57e6..a7594d97f 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -208,6 +208,7 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re opts.set_block_based_table_factory(block_opts); opts.optimize_level_style_compaction(config.memory_budget_per_col()); opts.set_target_file_size_base(config.compaction.initial_file_size); + opts.set_compression_per_level(&[]); Ok(opts) } From f595233f9a434c9c99eac412af31f883f1408614 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 14 Nov 2019 17:21:24 +0100 Subject: [PATCH 13/37] kvdb-rocksdb: add some iter module docs --- kvdb-rocksdb/src/iter.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index 164bb03eb..aaef6dae0 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -14,6 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +//! This module contains an implementation of a RocksDB iterator +//! wrapped inside a `RwLock`. Since `RwLock` "owns" the inner data, +//! we're using `owning_ref` to work around the borrowing rules of Rust. + use crate::DBAndColumns; use owning_ref::{OwningHandle, StableAddress}; use parking_lot::RwLockReadGuard; From df8d61422ec0e92faaba8e91f0f9d685daa7af6f Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 19 Nov 2019 10:24:51 +0100 Subject: [PATCH 14/37] kvdb-rocksdb: remove path on kvdb dependency temporarily --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 315203f3a..37e1bf403 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -11,7 +11,7 @@ edition = "2018" elastic-array = "0.10.2" fs-swap = "0.2.4" interleaved-ordered = "0.1.1" -kvdb = { version = "0.1", path = "../kvdb" } +kvdb = { version = "0.1" } log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" From a479485f4a4d4f3f68c814785a857e1a08b29b1d Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 19 Nov 2019 14:49:27 +0100 Subject: [PATCH 15/37] kvdb-rocksdb: use only lz4 and snappy features --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 37e1bf403..a0fcdbe5e 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -rocksdb = "0.13" +rocksdb = { version = "0.13", features = ["lz4", "snappy"], default-features = false } owning_ref = "0.4.0" [dev-dependencies] From 3153cca6c4f1fa3310ec2fbf352de4a5a64c90dc Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 22 Nov 2019 10:57:13 +0100 Subject: [PATCH 16/37] kvdb-rocksdb: support zstd compression as well --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index a0fcdbe5e..5fcbbd2c3 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -rocksdb = { version = "0.13", features = ["lz4", "snappy"], default-features = false } +rocksdb = { version = "0.13", features = ["lz4", "snappy", "zstd"], default-features = false } owning_ref = "0.4.0" [dev-dependencies] From ed372f480c65c7e808aa7606524c17ce7d42b8d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 22 Nov 2019 12:00:57 +0100 Subject: [PATCH 17/37] Also add `kvdb` as path dependency --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 5fcbbd2c3..c85e496a3 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -11,7 +11,7 @@ edition = "2018" elastic-array = "0.10.2" fs-swap = "0.2.4" interleaved-ordered = "0.1.1" -kvdb = { version = "0.1" } +kvdb = { path = "../kvdb", version = "0.1" } log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" From 28de430c2eb7792abcb110d1738fcb25dac66970 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 22 Nov 2019 12:38:25 +0100 Subject: [PATCH 18/37] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Bastian Köcher --- kvdb-rocksdb/src/iter.rs | 4 ++-- kvdb-rocksdb/src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index aaef6dae0..329018097 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -33,7 +33,7 @@ pub struct ReadGuardedIterator<'a, I, T> { // We can't implement `StableAddress` for a `RwLockReadGuard` // directly due to orphan rules. #[repr(transparent)] -struct UnsafeStableAddress(T); +struct UnsafeStableAddress<'a, T>(RwLockReadGuard<'a, T>); impl Deref for UnsafeStableAddress { type Target = T::Target; @@ -43,7 +43,7 @@ impl Deref for UnsafeStableAddress { } // RwLockReadGuard dereferences to a stable address; qed -unsafe impl StableAddress for UnsafeStableAddress {} +unsafe impl StableAddress for UnsafeStableAddress {} struct DerefWrapper(T); diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index a7594d97f..aecbd6e97 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -201,7 +201,7 @@ impl DBAndColumns { } // get column family configuration from database config. -fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result { +fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> Options { let mut opts = Options::default(); opts.set_level_compaction_dynamic_level_bytes(true); From d9787591e36110f67766846e7db4d7f1139d423d Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 22 Nov 2019 12:43:19 +0100 Subject: [PATCH 19/37] kvdb-rocksdb: fix build --- kvdb-rocksdb/src/iter.rs | 8 ++++---- kvdb-rocksdb/src/lib.rs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index 329018097..e36a7f95b 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -27,7 +27,7 @@ use std::ops::{Deref, DerefMut}; pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); pub struct ReadGuardedIterator<'a, I, T> { - inner: OwningHandle>>, DerefWrapper>>, + inner: OwningHandle>, DerefWrapper>>, } // We can't implement `StableAddress` for a `RwLockReadGuard` @@ -35,15 +35,15 @@ pub struct ReadGuardedIterator<'a, I, T> { #[repr(transparent)] struct UnsafeStableAddress<'a, T>(RwLockReadGuard<'a, T>); -impl Deref for UnsafeStableAddress { - type Target = T::Target; +impl<'a, T> Deref for UnsafeStableAddress<'a, T> { + type Target = T; fn deref(&self) -> &Self::Target { self.0.deref() } } // RwLockReadGuard dereferences to a stable address; qed -unsafe impl StableAddress for UnsafeStableAddress {} +unsafe impl<'a, T> StableAddress for UnsafeStableAddress<'a, T> {} struct DerefWrapper(T); diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index aecbd6e97..bb5e25bdf 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -210,7 +210,7 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> Option opts.set_target_file_size_base(config.compaction.initial_file_size); opts.set_compression_per_level(&[]); - Ok(opts) + opts } /// Key-Value database. @@ -301,7 +301,7 @@ impl Database { let cfnames: Vec<&str> = column_names.iter().map(|n| n as &str).collect(); for _ in 0..config.columns.unwrap_or(0) { - cf_options.push(col_config(&config, &block_opts)?); + cf_options.push(col_config(&config, &block_opts)); } let write_opts = WriteOptions::new(); @@ -636,7 +636,7 @@ impl Database { Some(DBAndColumns { ref mut db, ref mut column_names }) => { let col = column_names.len() as u32; let name = format!("col{}", col); - let _ = db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?; + let _ = db.create_cf(&name, &col_config(&self.config, &self.block_opts)).map_err(other_io_err)?; column_names.push(name); Ok(()) } From 13b029c173cc44aa9ef4025cafef97844461e572 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 22 Nov 2019 14:20:07 +0100 Subject: [PATCH 20/37] kvdb-rocksdb: use open_cf_descriptors --- kvdb-rocksdb/src/lib.rs | 91 +++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index ce28eb640..1d4065235 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -19,7 +19,9 @@ mod iter; use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; use parking_lot::{Mutex, MutexGuard, RwLock}; -use rocksdb::{BlockBasedOptions, ColumnFamily, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB}; +use rocksdb::{ + BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB, +}; use crate::iter::KeyValuePair; use elastic_array::ElasticArray32; @@ -112,11 +114,7 @@ impl CompactionProfile { let hdd_check_file = db_path .to_str() .and_then(|path_str| Command::new("df").arg(path_str).output().ok()) - .and_then(|df_res| if df_res.status.success() { - Some(df_res.stdout) - } else { - None - }) + .and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None }) .and_then(rotational_from_df_output); // Read out the file and match compaction profile. if let Some(hdd_check) = hdd_check_file { @@ -151,10 +149,7 @@ impl CompactionProfile { /// Slow HDD compaction profile pub fn hdd() -> CompactionProfile { - CompactionProfile { - initial_file_size: 256 * MB as u64, - block_size: 64 * KB, - } + CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB } } } @@ -208,7 +203,7 @@ impl DatabaseConfig { opts.optimize_level_style_compaction(memory_budget_per_col); opts.set_target_file_size_base(self.compaction.initial_file_size); opts.set_compression_per_level(&[]); - + opts } } @@ -306,7 +301,7 @@ fn generate_block_based_options(config: &DatabaseConfig) -> BlockBasedOptions { block_opts.set_cache_index_and_filter_blocks(true); block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); block_opts.set_bloom_filter(10, true); - + block_opts } @@ -332,43 +327,37 @@ impl Database { fs::remove_file(db_corrupted)?; } - let mut cf_options = Vec::with_capacity(columns as usize); let column_names: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); - let cfnames: Vec<&str> = column_names.iter().map(|n| n as &str).collect(); - for i in 0..columns { - cf_options.push(config.column_config(&block_opts, Some(i))); - } - - let write_opts = WriteOptions::new(); + let write_opts = WriteOptions::default(); let mut read_opts = ReadOptions::default(); read_opts.set_prefix_same_as_start(true); read_opts.set_verify_checksums(false); - let db = match config.columns { - Some(_) => { - match DB::open_cf(&opts, path, &cfnames) { - Ok(db) => { - for name in &cfnames { - let _ = db.cf_handle(name).expect("rocksdb opens a cf_handle for each cfname; qed"); - } - Ok(db) - } - Err(_) => { - // retry and create CFs - match DB::open_cf(&opts, path, &[] as &[&str]) { - Ok(mut db) => { - for (i, name) in cfnames.iter().enumerate() { - let _ = db.create_cf(name, &cf_options[i]).map_err(other_io_err)?; - } - Ok(db) + let db = if config.columns.is_some() { + let cf_descriptors: Vec<_> = (0..columns) + .map(|i| { + ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, Some(i))) + }) + .collect(); + + match DB::open_cf_descriptors(&opts, path, cf_descriptors) { + Err(_) => { + // retry and create CFs + match DB::open_cf(&opts, path, &[] as &[&str]) { + Ok(mut db) => { + for (i, name) in column_names.iter().enumerate() { + let _ = db.create_cf(name, &config.column_config(&block_opts, Some(i as u32))).map_err(other_io_err)?; } - err => err, + Ok(db) } + err => err, } } + ok => ok, } - None => DB::open(&opts, path), + } else { + DB::open(&opts, path) }; let db = match db { @@ -377,22 +366,28 @@ impl Database { warn!("DB corrupted: {}, attempting repair", s); DB::repair(&opts, path).map_err(other_io_err)?; - if cfnames.is_empty() { - DB::open(&opts, path).map_err(other_io_err)? + if config.columns.is_some() { + let cf_descriptors: Vec<_> = (0..columns) + .map(|i| { + ColumnFamilyDescriptor::new( + &column_names[i as usize], + config.column_config(&block_opts, Some(i)), + ) + }) + .collect(); + + DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)? } else { - // TODO: open_cf_descriptors - let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?; - db + DB::open(&opts, path).map_err(other_io_err)? } } Err(s) => return Err(other_io_err(s)), }; - let num_cols = column_names.len(); Ok(Database { db: RwLock::new(Some(DBAndColumns { db, column_names })), config: config.clone(), - overlay: RwLock::new((0..=num_cols).map(|_| HashMap::new()).collect()), - flushing: RwLock::new((0..=num_cols).map(|_| HashMap::new()).collect()), + overlay: RwLock::new((0..=columns).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..=columns).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), read_opts: read_opts.into(), @@ -842,11 +837,11 @@ mod tests { let config = DatabaseConfig::default(); let config_5 = DatabaseConfig::with_columns(Some(5)); - let tempdir = TempDir::new("").unwrap(); + let tempdir = TempDir::new("drop_columns").unwrap(); // open 5, remove all. { - let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap(); + let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).expect("open with 5 columns"); assert_eq!(db.num_columns(), 5); for i in (0..5).rev() { From c88186f81bb2a32a9a0e5a43b1abda15f69c5827 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Fri, 22 Nov 2019 14:28:12 +0100 Subject: [PATCH 21/37] kvdb-rocksdb: remove redundant .into() --- kvdb-rocksdb/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 1d4065235..edde7ae8f 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -390,9 +390,9 @@ impl Database { flushing: RwLock::new((0..=columns).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), - read_opts: read_opts.into(), - write_opts: write_opts.into(), - block_opts: block_opts.into(), + read_opts, + write_opts, + block_opts, }) } From 03a2ba08f47f4af4219280e660a1ea92cb8896bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 22 Nov 2019 18:26:53 +0100 Subject: [PATCH 22/37] Disable `zstd` again --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index c85e496a3..043b50028 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -rocksdb = { version = "0.13", features = ["lz4", "snappy", "zstd"], default-features = false } +rocksdb = { version = "0.13", features = [ "lz4", "snappy" ], default-features = false } owning_ref = "0.4.0" [dev-dependencies] From 407c604d44c36e915b5651f185660c8ad3680232 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 26 Nov 2019 15:39:48 +0100 Subject: [PATCH 23/37] kvdb-rocksdb: set block_size to 64 KB again --- kvdb-rocksdb/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index edde7ae8f..7c6eda4a9 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -144,7 +144,7 @@ impl CompactionProfile { /// Default profile suitable for SSD storage pub fn ssd() -> CompactionProfile { - CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 8 * MB } + CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 64 * KB } } /// Slow HDD compaction profile From 1739d8a43336121df0e62cfd6d9bed9ecd32cb9e Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 26 Nov 2019 15:40:19 +0100 Subject: [PATCH 24/37] kvdb-rocksdb: cargo fmt --- kvdb-rocksdb/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 7c6eda4a9..eb73920f1 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -347,7 +347,9 @@ impl Database { match DB::open_cf(&opts, path, &[] as &[&str]) { Ok(mut db) => { for (i, name) in column_names.iter().enumerate() { - let _ = db.create_cf(name, &config.column_config(&block_opts, Some(i as u32))).map_err(other_io_err)?; + let _ = db + .create_cf(name, &config.column_config(&block_opts, Some(i as u32))) + .map_err(other_io_err)?; } Ok(db) } From 8c658baa5b90e102de89893bb7a50efa1dfeabae Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 26 Nov 2019 15:40:19 +0100 Subject: [PATCH 25/37] kvdb-rocksdb: set back block_size to 16 KB --- kvdb-rocksdb/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index eb73920f1..c37a8adb2 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -144,7 +144,7 @@ impl CompactionProfile { /// Default profile suitable for SSD storage pub fn ssd() -> CompactionProfile { - CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 64 * KB } + CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB } } /// Slow HDD compaction profile From 87d0aecd3969837987599111fc6a4a19a0a69f61 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 26 Nov 2019 16:54:44 +0100 Subject: [PATCH 26/37] moar cargo fmt --- ethbloom/src/lib.rs | 2 +- rlp/src/rlpin.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ethbloom/src/lib.rs b/ethbloom/src/lib.rs index d21f62022..9ef11e3ee 100644 --- a/ethbloom/src/lib.rs +++ b/ethbloom/src/lib.rs @@ -93,7 +93,7 @@ impl<'a> From> for Hash<'a> { keccak256.update(raw); keccak256.finalize(&mut out); Hash::Owned(out) - }, + } Input::Hash(hash) => Hash::Ref(hash), } } diff --git a/rlp/src/rlpin.rs b/rlp/src/rlpin.rs index 5dd3730b6..f1c488626 100644 --- a/rlp/src/rlpin.rs +++ b/rlp/src/rlpin.rs @@ -201,10 +201,9 @@ impl<'a> Rlp<'a> { /// raw data slice. /// /// Returns an error if this Rlp is not a list or if the index is out of range. - pub fn at_with_offset<'view>(&'view self, index: usize) - -> Result<(Rlp<'a>, usize), DecoderError> - where - 'a: 'view, + pub fn at_with_offset<'view>(&'view self, index: usize) -> Result<(Rlp<'a>, usize), DecoderError> + where + 'a: 'view, { if !self.is_list() { return Err(DecoderError::RlpExpectedToBeList); From 3a071aff381d1ae61a706965e45f15dfbba180ed Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 26 Nov 2019 22:06:36 +0100 Subject: [PATCH 27/37] Add tests for budget calculation --- kvdb-rocksdb/src/lib.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c37a8adb2..c2259bed4 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -189,13 +189,13 @@ impl DatabaseConfig { } /// Returns the memory budget of the specified column in bytes. - fn memory_budget_per_col(&self, col: Option) -> MiB { + fn memory_budget_for_col(&self, col: Option) -> MiB { self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB } // Get column family configuration with the given block based options. fn column_config(&self, block_opts: &BlockBasedOptions, col: Option) -> Options { - let memory_budget_per_col = self.memory_budget_per_col(col); + let memory_budget_per_col = self.memory_budget_for_col(col); let mut opts = Options::default(); opts.set_level_compaction_dynamic_level_bytes(true); @@ -875,4 +875,36 @@ mod tests { assert_eq!(db.get(None, b"foo").unwrap().unwrap().as_ref(), b"baz"); } + + #[test] + fn default_memory_budget() { + let c = DatabaseConfig::default(); + assert_eq!(c.columns, None); + assert_eq!( + c.memory_budget(), + DB_DEFAULT_MEMORY_BUDGET_MB * MB, + "total memory budget is default" + ); + assert_eq!( + c.memory_budget_for_col(Some(0)), + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, + "total memory budget for column 0 is the default" + ); + assert_eq!( + c.memory_budget_for_col(Some(999)), + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, + "total memory budget for any column is the default" + ); + } + + #[test] + fn memory_budget() { + let mut c = DatabaseConfig::with_columns(Some(3)); + c.memory_budget = [ + (0, 10), + (1, 15), + (2, 20), + ].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget"); + } } From cd343cc527cb2ad3f2230e021b5e1a5f4059667b Mon Sep 17 00:00:00 2001 From: David Palm Date: Wed, 27 Nov 2019 12:17:38 +0100 Subject: [PATCH 28/37] Add test to check the rocksdb settings --- kvdb-rocksdb/src/lib.rs | 94 +++++++++++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 14 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c2259bed4..3788f4576 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -70,7 +70,7 @@ enum KeyState { #[derive(Clone, Copy, PartialEq, Debug)] pub struct CompactionProfile { /// L0-L1 target file size - /// The mimimum size should be calculated in accordance with the + /// The minimum size should be calculated in accordance with the /// number of levels and the expected size of the database. pub initial_file_size: u64, /// block size @@ -195,12 +195,12 @@ impl DatabaseConfig { // Get column family configuration with the given block based options. fn column_config(&self, block_opts: &BlockBasedOptions, col: Option) -> Options { - let memory_budget_per_col = self.memory_budget_for_col(col); + let column_mem_budget = self.memory_budget_for_col(col); let mut opts = Options::default(); opts.set_level_compaction_dynamic_level_bytes(true); opts.set_block_based_table_factory(block_opts); - opts.optimize_level_style_compaction(memory_budget_per_col); + opts.optimize_level_style_compaction(column_mem_budget); opts.set_target_file_size_base(self.compaction.initial_file_size); opts.set_compression_per_level(&[]); @@ -298,7 +298,10 @@ fn generate_block_based_options(config: &DatabaseConfig) -> BlockBasedOptions { // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size let cache_size = config.memory_budget() / 3; block_opts.set_lru_cache(cache_size); + // "index and filter blocks will be stored in block cache, together with all other data blocks." + // See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks block_opts.set_cache_index_and_filter_blocks(true); + // Don't evict L0 filter/index blocks from the cache block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); block_opts.set_bloom_filter(10, true); @@ -730,6 +733,7 @@ impl Drop for Database { mod tests { use super::*; use ethereum_types::H256; + use std::io::Read; use std::str::FromStr; use tempdir::TempDir; @@ -880,11 +884,7 @@ mod tests { fn default_memory_budget() { let c = DatabaseConfig::default(); assert_eq!(c.columns, None); - assert_eq!( - c.memory_budget(), - DB_DEFAULT_MEMORY_BUDGET_MB * MB, - "total memory budget is default" - ); + assert_eq!(c.memory_budget(), DB_DEFAULT_MEMORY_BUDGET_MB * MB, "total memory budget is default"); assert_eq!( c.memory_budget_for_col(Some(0)), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, @@ -900,11 +900,77 @@ mod tests { #[test] fn memory_budget() { let mut c = DatabaseConfig::with_columns(Some(3)); - c.memory_budget = [ - (0, 10), - (1, 15), - (2, 20), - ].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); - assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget"); + c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + assert_eq!( + c.memory_budget(), + 45 * MB + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, + "total budget is the sum of the column budget plus the default mem budget for the default column" + ); + } + + #[test] + fn rocksdb_settings() { + const NUM_COLS: usize = 2; + let mut cfg = DatabaseConfig::with_columns(Some(NUM_COLS as u32)); + cfg.max_open_files = 12345; + cfg.compaction.block_size = 323232; + cfg.compaction.initial_file_size = 102030; + cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + + let db_path = TempDir::new("config_test").expect("the OS can create tmp dirs"); + let _db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db"); + let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap())) + .expect("rocksdb creates a LOG file"); + let mut settings = String::new(); + rocksdb_log.read_to_string(&mut settings).unwrap(); + // Check column count + assert!(settings.contains("Options for column family [default]"), "no default col"); + assert!(settings.contains("Options for column family [col0]"), "no col0"); + assert!(settings.contains("Options for column family [col1]"), "no col1"); + + // Check max_open_files + assert!(settings.contains("max_open_files: 12345")); + + // Check block size + assert!(settings.contains(" block_size: 323232")); + + // LRU cache (default column) + assert!(settings.contains("block_cache_options:\n capacity : 8388608")); + // LRU cache for non-default columns is ⅓ of memory budget (including default column) + let lru_size = (330 * MB + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB) / 3; + let needle = format!("block_cache_options:\n capacity : {}", lru_size); + let lru = settings.match_indices(&needle).collect::>().len(); + assert_eq!(lru, NUM_COLS); + + // Index/filters share cache + let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::>().len(); + assert_eq!(include_indexes, NUM_COLS); + // Pin index/filters on L0 + let pins = settings.matches("pin_l0_filter_and_index_blocks_in_cache: 1").collect::>().len(); + assert_eq!(pins, NUM_COLS); + + // Check target file size, aka initial file size + let l0_sizes = settings.matches("target_file_size_base: 102030").collect::>().len(); + assert_eq!(l0_sizes, NUM_COLS); + // The default column uses the default of 64Mb regardless of the setting. + assert!(settings.contains("target_file_size_base: 67108864")); + + // Check compression settings + let snappy_compression = settings.matches("Options.compression: Snappy").collect::>().len(); + // All columns use Snappy + assert_eq!(snappy_compression, NUM_COLS + 1); + // …even for L7 + let snappy_bottommost = settings.matches("Options.bottommost_compression: Disabled").collect::>().len(); + assert_eq!(snappy_bottommost, NUM_COLS + 1); + + // 7 levels + let levels = settings.matches("Options.num_levels: 7").collect::>().len(); + assert_eq!(levels, NUM_COLS + 1); + + // Don't fsync every store + assert!(settings.contains("Options.use_fsync: 0")); + + // We're using the old format + assert!(settings.contains("format_version: 2")); } } From 7eeaf7b499be306403402087df2758558f29841a Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 27 Nov 2019 14:07:18 +0100 Subject: [PATCH 29/37] kvdb-rocksdb: do not account for default column memory budget --- kvdb-rocksdb/src/lib.rs | 52 +++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 3788f4576..50f6aab85 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -180,21 +180,23 @@ impl DatabaseConfig { /// Returns the total memory budget in bytes. pub fn memory_budget(&self) -> MiB { - if self.memory_budget.is_empty() && self.columns.is_none() { - return DB_DEFAULT_MEMORY_BUDGET_MB * MB; + match self.columns { + None => self.memory_budget.get(&None).unwrap_or(&DB_DEFAULT_MEMORY_BUDGET_MB) * MB, + Some(columns) => { + (0..columns) + .map(|i| self.memory_budget.get(&Some(i)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) + .sum() + } } - (0..=self.columns.unwrap_or(0)) - .map(|i| self.memory_budget.get(&i.checked_sub(1)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) - .sum() } /// Returns the memory budget of the specified column in bytes. - fn memory_budget_for_col(&self, col: Option) -> MiB { - self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB + fn memory_budget_for_col(&self, col: u32) -> MiB { + self.memory_budget.get(&Some(col)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB } // Get column family configuration with the given block based options. - fn column_config(&self, block_opts: &BlockBasedOptions, col: Option) -> Options { + fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options { let column_mem_budget = self.memory_budget_for_col(col); let mut opts = Options::default(); @@ -322,6 +324,10 @@ impl Database { let block_opts = generate_block_based_options(config); let columns = config.columns.unwrap_or(0); + if config.columns.is_some() && config.memory_budget.contains_key(&None) { + warn!("Memory budget for the default column (None) is ignored if columns.is_some()"); + } + // attempt database repair if it has been previously marked as corrupted let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); if db_corrupted.exists() { @@ -340,7 +346,7 @@ impl Database { let db = if config.columns.is_some() { let cf_descriptors: Vec<_> = (0..columns) .map(|i| { - ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, Some(i))) + ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)) }) .collect(); @@ -351,7 +357,7 @@ impl Database { Ok(mut db) => { for (i, name) in column_names.iter().enumerate() { let _ = db - .create_cf(name, &config.column_config(&block_opts, Some(i as u32))) + .create_cf(name, &config.column_config(&block_opts, i as u32)) .map_err(other_io_err)?; } Ok(db) @@ -376,7 +382,7 @@ impl Database { .map(|i| { ColumnFamilyDescriptor::new( &column_names[i as usize], - config.column_config(&block_opts, Some(i)), + config.column_config(&block_opts, i), ) }) .collect(); @@ -670,7 +676,7 @@ impl Database { Some(DBAndColumns { ref mut db, ref mut column_names }) => { let col = column_names.len() as u32; let name = format!("col{}", col); - let col_config = self.config.column_config(&self.block_opts, Some(col as u32)); + let col_config = self.config.column_config(&self.block_opts, col as u32); let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?; column_names.push(name); Ok(()) @@ -886,12 +892,12 @@ mod tests { assert_eq!(c.columns, None); assert_eq!(c.memory_budget(), DB_DEFAULT_MEMORY_BUDGET_MB * MB, "total memory budget is default"); assert_eq!( - c.memory_budget_for_col(Some(0)), + c.memory_budget_for_col(0), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget for column 0 is the default" ); assert_eq!( - c.memory_budget_for_col(Some(999)), + c.memory_budget_for_col(999), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget for any column is the default" ); @@ -900,19 +906,19 @@ mod tests { #[test] fn memory_budget() { let mut c = DatabaseConfig::with_columns(Some(3)); - c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); - assert_eq!( - c.memory_budget(), - 45 * MB + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, - "total budget is the sum of the column budget plus the default mem budget for the default column" - ); + c.memory_budget = [ + (0, 10), + (1, 15), + (2, 20), + ].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget"); } #[test] fn rocksdb_settings() { const NUM_COLS: usize = 2; let mut cfg = DatabaseConfig::with_columns(Some(NUM_COLS as u32)); - cfg.max_open_files = 12345; + cfg.max_open_files = 999; // is capped OS fd limit (typically 1024) cfg.compaction.block_size = 323232; cfg.compaction.initial_file_size = 102030; cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); @@ -929,7 +935,7 @@ mod tests { assert!(settings.contains("Options for column family [col1]"), "no col1"); // Check max_open_files - assert!(settings.contains("max_open_files: 12345")); + assert!(settings.contains("max_open_files: 999")); // Check block size assert!(settings.contains(" block_size: 323232")); @@ -937,7 +943,7 @@ mod tests { // LRU cache (default column) assert!(settings.contains("block_cache_options:\n capacity : 8388608")); // LRU cache for non-default columns is ⅓ of memory budget (including default column) - let lru_size = (330 * MB + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB) / 3; + let lru_size = (330 * MB) / 3; let needle = format!("block_cache_options:\n capacity : {}", lru_size); let lru = settings.match_indices(&needle).collect::>().len(); assert_eq!(lru, NUM_COLS); From bd5abf18e6415c4975f1a2f56c1aea7bfaa68879 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 27 Nov 2019 14:56:05 +0100 Subject: [PATCH 30/37] kvdb-rocksdb: please the CI --- kvdb-rocksdb/src/lib.rs | 27 ++++++++------------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 50f6aab85..bb4290623 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -182,11 +182,9 @@ impl DatabaseConfig { pub fn memory_budget(&self) -> MiB { match self.columns { None => self.memory_budget.get(&None).unwrap_or(&DB_DEFAULT_MEMORY_BUDGET_MB) * MB, - Some(columns) => { - (0..columns) - .map(|i| self.memory_budget.get(&Some(i)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) - .sum() - } + Some(columns) => (0..columns) + .map(|i| self.memory_budget.get(&Some(i)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) + .sum(), } } @@ -345,9 +343,7 @@ impl Database { let db = if config.columns.is_some() { let cf_descriptors: Vec<_> = (0..columns) - .map(|i| { - ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)) - }) + .map(|i| ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i))) .collect(); match DB::open_cf_descriptors(&opts, path, cf_descriptors) { @@ -380,10 +376,7 @@ impl Database { if config.columns.is_some() { let cf_descriptors: Vec<_> = (0..columns) .map(|i| { - ColumnFamilyDescriptor::new( - &column_names[i as usize], - config.column_config(&block_opts, i), - ) + ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)) }) .collect(); @@ -906,11 +899,7 @@ mod tests { #[test] fn memory_budget() { let mut c = DatabaseConfig::with_columns(Some(3)); - c.memory_budget = [ - (0, 10), - (1, 15), - (2, 20), - ].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget"); } @@ -918,7 +907,7 @@ mod tests { fn rocksdb_settings() { const NUM_COLS: usize = 2; let mut cfg = DatabaseConfig::with_columns(Some(NUM_COLS as u32)); - cfg.max_open_files = 999; // is capped OS fd limit (typically 1024) + cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024) cfg.compaction.block_size = 323232; cfg.compaction.initial_file_size = 102030; cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); @@ -935,7 +924,7 @@ mod tests { assert!(settings.contains("Options for column family [col1]"), "no col1"); // Check max_open_files - assert!(settings.contains("max_open_files: 999")); + assert!(settings.contains("max_open_files: 123")); // Check block size assert!(settings.contains(" block_size: 323232")); From f23574fa75140ab26171cb7343be49c9c1d0904c Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 27 Nov 2019 16:17:36 +0100 Subject: [PATCH 31/37] kvdb-rocksdb: remove lz4 feature as it has no effect for now --- kvdb-rocksdb/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 043b50028..bbf2e42cb 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -16,7 +16,7 @@ log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -rocksdb = { version = "0.13", features = [ "lz4", "snappy" ], default-features = false } +rocksdb = { version = "0.13", features = ["snappy"], default-features = false } owning_ref = "0.4.0" [dev-dependencies] From 6a64e3688fa1f9524078c3a9038303bce6542f71 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 28 Nov 2019 08:36:00 +0100 Subject: [PATCH 32/37] Address review grumbles --- kvdb-rocksdb/src/iter.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index e36a7f95b..e61b726bb 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -24,8 +24,10 @@ use parking_lot::RwLockReadGuard; use rocksdb::{DBIterator, Direction, IteratorMode}; use std::ops::{Deref, DerefMut}; +/// A tuple holding key and value data, used as the iterator item type. pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); +/// Iterator with built-in synchronization. pub struct ReadGuardedIterator<'a, I, T> { inner: OwningHandle>, DerefWrapper>>, } @@ -68,10 +70,15 @@ impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> { } } +/// Instantiate iterators yielding `KeyValuePair`s. pub trait IterationHandler { type Iterator: Iterator; + /// Create an `Iterator` over the default DB column or over a `ColumnFamily` if a column number + /// is passed. fn iter(&self, col: Option) -> Self::Iterator; + /// Create an `Iterator` over the default DB column or over a `ColumnFamily` if a column number + /// is passed. The iterator starts from the first key having the provided `prefix`. fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator; } @@ -80,21 +87,21 @@ where &'a T: IterationHandler, { pub fn new(read_lock: RwLockReadGuard<'a, Option>, col: Option) -> Self { - Self { - inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { - let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; - DerefWrapper(rlock.as_ref().map(|db| db.iter(col))) - }), - } + Self { inner: Self::new_inner(read_lock, |db| db.iter(col)) } } pub fn new_from_prefix(read_lock: RwLockReadGuard<'a, Option>, col: Option, prefix: &[u8]) -> Self { - Self { - inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { - let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; - DerefWrapper(rlock.as_ref().map(|db| db.iter_from_prefix(col, prefix))) - }), - } + Self { inner: Self::new_inner(read_lock, |db| db.iter_from_prefix(col, prefix)) } + } + + fn new_inner( + rlock: RwLockReadGuard<'a, Option>, + f: impl FnOnce(&'a T) -> <&'a T as IterationHandler>::Iterator, + ) -> OwningHandle>, DerefWrapper::Iterator>>> { + OwningHandle::new_with_fn(UnsafeStableAddress(rlock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(f)) + }) } } From 02174388f381bfe51b9e175e0ab1c3712661d88a Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 28 Nov 2019 11:53:21 +0100 Subject: [PATCH 33/37] kvdb-rocksdb: add failing iter_from_prefix test --- kvdb-rocksdb/src/lib.rs | 51 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index bb4290623..cc81ce264 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -862,6 +862,57 @@ mod tests { } } + #[test] + fn test_iter_by_prefix() { + let tempdir = TempDir::new("").unwrap(); + let config = DatabaseConfig::default(); + let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); + + let key1 = b"0"; + let key2 = b"ab"; + let key3 = b"abc"; + let key4 = b"abcd"; + + let mut batch = db.transaction(); + batch.put(None, key1, key1); + batch.put(None, key2, key2); + batch.put(None, key3, key3); + batch.put(None, key4, key4); + db.write(batch).unwrap(); + + // empty prefix + let contents: Vec<_> = db.iter_from_prefix(None, b"").into_iter().collect(); + assert_eq!(contents.len(), 4); + assert_eq!(&*contents[0].0, key1); + assert_eq!(&*contents[1].0, key2); + assert_eq!(&*contents[2].0, key3); + assert_eq!(&*contents[3].0, key4); + + // prefix a + let contents: Vec<_> = db.iter_from_prefix(None, b"a").into_iter().collect(); + assert_eq!(contents.len(), 3); + assert_eq!(&*contents[0].0, key2); + assert_eq!(&*contents[1].0, key3); + assert_eq!(&*contents[2].0, key4); + + // prefix abc + let contents: Vec<_> = db.iter_from_prefix(None, b"abc").into_iter().collect(); + assert_eq!(contents.len(), 2); + assert_eq!(&*contents[0].0, key3); + assert_eq!(&*contents[1].0, key4); + + // prefix abcde + let contents: Vec<_> = db.iter_from_prefix(None, b"abcde").into_iter().collect(); + assert_eq!(contents.len(), 0); + + // prefix 0 + // let contents: Vec<_> = db.iter_from_prefix(None, b"0").into_iter().collect(); + // assert_eq!(contents.len(), 1); + // TODO: this fails: ^^ + + // assert_eq!(&*contents[0].0, key1); + } + #[test] fn write_clears_buffered_ops() { let tempdir = TempDir::new("").unwrap(); From e0d59e516af0bb203e87215ba804a37f0faa8cee Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 28 Nov 2019 12:52:42 +0100 Subject: [PATCH 34/37] kvdb-rocksdb: add a workaround for the rocksdb prefix bug --- kvdb-rocksdb/src/iter.rs | 10 +++------- kvdb-rocksdb/src/lib.rs | 14 ++++++-------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index e61b726bb..52934e1a8 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -21,7 +21,7 @@ use crate::DBAndColumns; use owning_ref::{OwningHandle, StableAddress}; use parking_lot::RwLockReadGuard; -use rocksdb::{DBIterator, Direction, IteratorMode}; +use rocksdb::{DBIterator, IteratorMode}; use std::ops::{Deref, DerefMut}; /// A tuple holding key and value data, used as the iterator item type. @@ -121,12 +121,8 @@ impl<'a> IterationHandler for &'a DBAndColumns { fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator { col.map_or_else( - || self.db.iterator(IteratorMode::From(prefix, Direction::Forward)), - |c| { - self.db - .iterator_cf(self.get_cf(c as usize), IteratorMode::From(prefix, Direction::Forward)) - .expect("iterator params are valid; qed") - }, + || self.db.prefix_iterator(prefix), + |c| self.db.prefix_iterator_cf(self.get_cf(c as usize), prefix).expect("iterator params are valid; qed"), ) } } diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index cc81ce264..dd16679f4 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -338,7 +338,6 @@ impl Database { let write_opts = WriteOptions::default(); let mut read_opts = ReadOptions::default(); - read_opts.set_prefix_same_as_start(true); read_opts.set_verify_checksums(false); let db = if config.columns.is_some() { @@ -585,7 +584,7 @@ impl Database { fn iter_from_prefix<'a>( &'a self, col: Option, - prefix: &[u8], + prefix: &'a [u8], ) -> impl Iterator + 'a { let read_lock = self.db.read(); let optional = if read_lock.is_some() { @@ -594,7 +593,8 @@ impl Database { } else { None }; - optional.into_iter().flat_map(identity) + // workaround for https://github.com/facebook/rocksdb/issues/2343 + optional.into_iter().flat_map(identity).filter(move |(k, _)| k.starts_with(prefix)) } /// Close the database @@ -906,11 +906,9 @@ mod tests { assert_eq!(contents.len(), 0); // prefix 0 - // let contents: Vec<_> = db.iter_from_prefix(None, b"0").into_iter().collect(); - // assert_eq!(contents.len(), 1); - // TODO: this fails: ^^ - - // assert_eq!(&*contents[0].0, key1); + let contents: Vec<_> = db.iter_from_prefix(None, b"0").into_iter().collect(); + assert_eq!(contents.len(), 1); + assert_eq!(&*contents[0].0, key1); } #[test] From c4b5413b3c1b22923d7a1b9f01b554b2e86839f4 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 28 Nov 2019 13:01:33 +0100 Subject: [PATCH 35/37] More prefix iter test --- kvdb-rocksdb/src/lib.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index cc81ce264..5222c1c07 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -741,22 +741,32 @@ mod tests { let db = Database::open(config, tempdir.path().to_str().unwrap()).unwrap(); let key1 = H256::from_str("02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); - let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key3 = H256::from_str("04c60000000b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key4 = H256::from_str("04c71111110b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key5 = H256::from_str("04c82222220b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let mut batch = db.transaction(); batch.put(None, key1.as_bytes(), b"cat"); batch.put(None, key2.as_bytes(), b"dog"); + batch.put(None, key3.as_bytes(), b"caterpillar"); + batch.put(None, key4.as_bytes(), b"beef"); + batch.put(None, key5.as_bytes(), b"fish"); db.write(batch).unwrap(); assert_eq!(&*db.get(None, key1.as_bytes()).unwrap().unwrap(), b"cat"); let contents: Vec<_> = db.iter(None).into_iter().collect(); - assert_eq!(contents.len(), 2); + assert_eq!(contents.len(), 5); assert_eq!(&*contents[0].0, key1.as_bytes()); assert_eq!(&*contents[0].1, b"cat"); assert_eq!(&*contents[1].0, key2.as_bytes()); assert_eq!(&*contents[1].1, b"dog"); + let mut prefix_iter = db.iter_from_prefix(None, &key3.as_bytes()[..2]); + assert_eq!(*prefix_iter.next().unwrap().1, b"caterpillar"[..]); + assert_eq!(*prefix_iter.next().unwrap().1, b"beef"[..]); + assert_eq!(*prefix_iter.next().unwrap().1, b"fish"[..]); + let mut batch = db.transaction(); batch.delete(None, key1.as_bytes()); db.write(batch).unwrap(); From b40b416ea5b0bfd0a44b4fd7d5a2980452f71326 Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 28 Nov 2019 13:39:15 +0100 Subject: [PATCH 36/37] Fix tests (hex, it's so hard) --- kvdb-rocksdb/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index f67c64310..f173c6f5b 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -741,9 +741,9 @@ mod tests { let db = Database::open(config, tempdir.path().to_str().unwrap()).unwrap(); let key1 = H256::from_str("02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); - let key3 = H256::from_str("04c60000000b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); - let key4 = H256::from_str("04c71111110b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); - let key5 = H256::from_str("04c82222220b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key3 = H256::from_str("04c00000000b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key4 = H256::from_str("04c01111110b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key5 = H256::from_str("04c02222220b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let mut batch = db.transaction(); batch.put(None, key1.as_bytes(), b"cat"); @@ -762,7 +762,7 @@ mod tests { assert_eq!(&*contents[1].0, key2.as_bytes()); assert_eq!(&*contents[1].1, b"dog"); - let mut prefix_iter = db.iter_from_prefix(None, &key3.as_bytes()[..2]); + let mut prefix_iter = db.iter_from_prefix(None, &[0x04,0xc0]); assert_eq!(*prefix_iter.next().unwrap().1, b"caterpillar"[..]); assert_eq!(*prefix_iter.next().unwrap().1, b"beef"[..]); assert_eq!(*prefix_iter.next().unwrap().1, b"fish"[..]); From 38876cf6a0bb33e2c7e2a897294fc67f2720037b Mon Sep 17 00:00:00 2001 From: David Palm Date: Thu, 28 Nov 2019 13:40:43 +0100 Subject: [PATCH 37/37] whitespace --- kvdb-rocksdb/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index f173c6f5b..3a0905273 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -762,7 +762,7 @@ mod tests { assert_eq!(&*contents[1].0, key2.as_bytes()); assert_eq!(&*contents[1].1, b"dog"); - let mut prefix_iter = db.iter_from_prefix(None, &[0x04,0xc0]); + let mut prefix_iter = db.iter_from_prefix(None, &[0x04, 0xc0]); assert_eq!(*prefix_iter.next().unwrap().1, b"caterpillar"[..]); assert_eq!(*prefix_iter.next().unwrap().1, b"beef"[..]); assert_eq!(*prefix_iter.next().unwrap().1, b"fish"[..]);