diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index cda3cc24b..bbf2e42cb 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -11,12 +11,13 @@ edition = "2018" elastic-array = "0.10.2" fs-swap = "0.2.4" interleaved-ordered = "0.1.1" -kvdb = { version = "0.1", path = "../kvdb" } +kvdb = { path = "../kvdb", version = "0.1" } log = "0.4.8" num_cpus = "1.10.1" parking_lot = "0.9.0" regex = "1.3.1" -parity-rocksdb = "0.5.1" +rocksdb = { version = "0.13", features = ["snappy"], default-features = false } +owning_ref = "0.4.0" [dev-dependencies] tempdir = "0.3.7" diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs new file mode 100644 index 000000000..52934e1a8 --- /dev/null +++ b/kvdb-rocksdb/src/iter.rs @@ -0,0 +1,128 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! This module contains an implementation of a RocksDB iterator +//! wrapped inside a `RwLock`. Since `RwLock` "owns" the inner data, +//! we're using `owning_ref` to work around the borrowing rules of Rust. + +use crate::DBAndColumns; +use owning_ref::{OwningHandle, StableAddress}; +use parking_lot::RwLockReadGuard; +use rocksdb::{DBIterator, IteratorMode}; +use std::ops::{Deref, DerefMut}; + +/// A tuple holding key and value data, used as the iterator item type. +pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); + +/// Iterator with built-in synchronization. +pub struct ReadGuardedIterator<'a, I, T> { + inner: OwningHandle>, DerefWrapper>>, +} + +// We can't implement `StableAddress` for a `RwLockReadGuard` +// directly due to orphan rules. +#[repr(transparent)] +struct UnsafeStableAddress<'a, T>(RwLockReadGuard<'a, T>); + +impl<'a, T> Deref for UnsafeStableAddress<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +// RwLockReadGuard dereferences to a stable address; qed +unsafe impl<'a, T> StableAddress for UnsafeStableAddress<'a, T> {} + +struct DerefWrapper(T); + +impl Deref for DerefWrapper { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DerefWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> { + type Item = I::Item; + + fn next(&mut self) -> Option { + self.inner.deref_mut().as_mut().and_then(|iter| iter.next()) + } +} + +/// Instantiate iterators yielding `KeyValuePair`s. +pub trait IterationHandler { + type Iterator: Iterator; + + /// Create an `Iterator` over the default DB column or over a `ColumnFamily` if a column number + /// is passed. + fn iter(&self, col: Option) -> Self::Iterator; + /// Create an `Iterator` over the default DB column or over a `ColumnFamily` if a column number + /// is passed. The iterator starts from the first key having the provided `prefix`. + fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator; +} + +impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T> +where + &'a T: IterationHandler, +{ + pub fn new(read_lock: RwLockReadGuard<'a, Option>, col: Option) -> Self { + Self { inner: Self::new_inner(read_lock, |db| db.iter(col)) } + } + + pub fn new_from_prefix(read_lock: RwLockReadGuard<'a, Option>, col: Option, prefix: &[u8]) -> Self { + Self { inner: Self::new_inner(read_lock, |db| db.iter_from_prefix(col, prefix)) } + } + + fn new_inner( + rlock: RwLockReadGuard<'a, Option>, + f: impl FnOnce(&'a T) -> <&'a T as IterationHandler>::Iterator, + ) -> OwningHandle>, DerefWrapper::Iterator>>> { + OwningHandle::new_with_fn(UnsafeStableAddress(rlock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(f)) + }) + } +} + +impl<'a> IterationHandler for &'a DBAndColumns { + type Iterator = DBIterator<'a>; + + fn iter(&self, col: Option) -> Self::Iterator { + col.map_or_else( + || self.db.iterator(IteratorMode::Start), + |c| { + self.db + .iterator_cf(self.get_cf(c as usize), IteratorMode::Start) + .expect("iterator params are valid; qed") + }, + ) + } + + fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Self::Iterator { + col.map_or_else( + || self.db.prefix_iterator(prefix), + |c| self.db.prefix_iterator_cf(self.get_cf(c as usize), prefix).expect("iterator params are valid; qed"), + ) + } +} diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 75d2eb615..3a0905273 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -14,17 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::{cmp, collections::HashMap, error, fs, io, marker::PhantomData, mem, path::Path, result}; +mod iter; + +use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; -use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; -use parity_rocksdb::{ - BlockBasedOptions, Cache, Column, DBIterator, Direction, IteratorMode, Options, ReadOptions, Writable, WriteBatch, - WriteOptions, DB, -}; use parking_lot::{Mutex, MutexGuard, RwLock}; +use rocksdb::{ + BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB, +}; +use crate::iter::KeyValuePair; use elastic_array::ElasticArray32; use fs_swap::{swap, swap_nonatomic}; +use interleaved_ordered::interleave_ordered; use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB}; use log::{debug, warn}; @@ -68,7 +70,7 @@ enum KeyState { #[derive(Clone, Copy, PartialEq, Debug)] pub struct CompactionProfile { /// L0-L1 target file size - /// The mimimum size should be calculated in accordance with the + /// The minimum size should be calculated in accordance with the /// number of levels and the expected size of the database. pub initial_file_size: u64, /// block size @@ -142,7 +144,7 @@ impl CompactionProfile { /// Default profile suitable for SSD storage pub fn ssd() -> CompactionProfile { - CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 8 * MB } + CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB } } /// Slow HDD compaction profile @@ -178,40 +180,31 @@ impl DatabaseConfig { /// Returns the total memory budget in bytes. pub fn memory_budget(&self) -> MiB { - if self.memory_budget.is_empty() && self.columns.is_none() { - return DB_DEFAULT_MEMORY_BUDGET_MB * MB; + match self.columns { + None => self.memory_budget.get(&None).unwrap_or(&DB_DEFAULT_MEMORY_BUDGET_MB) * MB, + Some(columns) => (0..columns) + .map(|i| self.memory_budget.get(&Some(i)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) + .sum(), } - (0..=self.columns.unwrap_or(0)) - .map(|i| self.memory_budget.get(&i.checked_sub(1)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) - .sum() } /// Returns the memory budget of the specified column in bytes. - fn memory_budget_per_col(&self, col: Option) -> MiB { - self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB + fn memory_budget_for_col(&self, col: u32) -> MiB { + self.memory_budget.get(&Some(col)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB } // Get column family configuration with the given block based options. - fn column_config(&self, block_opts: &BlockBasedOptions, col: Option) -> io::Result { - let memory_budget_per_col = self.memory_budget_per_col(col); - let mut opts = Options::new(); - - opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?; + fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options { + let column_mem_budget = self.memory_budget_for_col(col); + let mut opts = Options::default(); + opts.set_level_compaction_dynamic_level_bytes(true); opts.set_block_based_table_factory(block_opts); - - opts.set_parsed_options(&format!( - "block_based_table_factory={{{};{}}}", - "cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true" - )) - .map_err(other_io_err)?; - - opts.optimize_level_style_compaction(memory_budget_per_col as i32); + opts.optimize_level_style_compaction(column_mem_budget); opts.set_target_file_size_base(self.compaction.initial_file_size); + opts.set_compression_per_level(&[]); - opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?; - - Ok(opts) + opts } } @@ -227,35 +220,25 @@ impl Default for DatabaseConfig { } } -/// Database iterator (for flushed data only) -// The compromise of holding only a virtual borrow vs. holding a lock on the -// inner DB (to prevent closing via restoration) may be re-evaluated in the future. -pub struct DatabaseIterator<'a> { - iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, - _marker: PhantomData<&'a Database>, +struct DBAndColumns { + db: DB, + column_names: Vec, } -impl<'a> Iterator for DatabaseIterator<'a> { - type Item = (Box<[u8]>, Box<[u8]>); - - fn next(&mut self) -> Option { - self.iter.next() +impl DBAndColumns { + fn get_cf(&self, i: usize) -> &ColumnFamily { + self.db.cf_handle(&self.column_names[i]).expect("the specified column name is correct; qed") } } -struct DBAndColumns { - db: DB, - cfs: Vec, -} - /// Key-Value database. pub struct Database { db: RwLock>, config: DatabaseConfig, + path: String, write_opts: WriteOptions, read_opts: ReadOptions, block_opts: BlockBasedOptions, - path: String, // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock, KeyState>>>, // Values currently being flushed. Cleared when `flush` completes. @@ -266,9 +249,9 @@ pub struct Database { } #[inline] -fn check_for_corruption>(path: P, res: result::Result) -> io::Result { +fn check_for_corruption>(path: P, res: result::Result) -> io::Result { if let Err(ref s) = res { - if s.starts_with("Corruption:") { + if is_corrupted(s) { warn!("DB corrupted: {}. Repair will be triggered on next restart", s); let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME)); } @@ -277,8 +260,52 @@ fn check_for_corruption>(path: P, res: result::Result bool { - s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") +fn is_corrupted(err: &Error) -> bool { + err.as_ref().starts_with("Corruption:") + || err.as_ref().starts_with("Invalid argument: You have to open all column families") +} + +/// Generate the options for RocksDB, based on the given `DatabaseConfig`. +fn generate_options(config: &DatabaseConfig) -> Options { + let mut opts = Options::default(); + let columns = config.columns.unwrap_or(0); + + if columns == 0 { + let budget = config.memory_budget() / 2; + opts.set_db_write_buffer_size(budget); + // from https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#memtable + // Memtable size is controlled by the option `write_buffer_size`. + // If you increase your memtable size, be sure to also increase your L1 size! + // L1 size is controlled by the option `max_bytes_for_level_base`. + opts.set_max_bytes_for_level_base(budget as u64); + } + + opts.set_use_fsync(false); + opts.create_if_missing(true); + opts.set_max_open_files(config.max_open_files); + opts.set_bytes_per_sync(1 * MB as u64); + opts.set_keep_log_file_num(1); + opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2)); + + opts +} + +/// Generate the block based options for RocksDB, based on the given `DatabaseConfig`. +fn generate_block_based_options(config: &DatabaseConfig) -> BlockBasedOptions { + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_block_size(config.compaction.block_size); + // Set cache size as recommended by + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size + let cache_size = config.memory_budget() / 3; + block_opts.set_lru_cache(cache_size); + // "index and filter blocks will be stored in block cache, together with all other data blocks." + // See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks + block_opts.set_cache_index_and_filter_blocks(true); + // Don't evict L0 filter/index blocks from the cache + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_bloom_filter(10, true); + + block_opts } impl Database { @@ -291,36 +318,12 @@ impl Database { /// Open database file. Creates if it does not exist. pub fn open(config: &DatabaseConfig, path: &str) -> io::Result { - let mut opts = Options::new(); - - opts.set_use_fsync(false); - opts.create_if_missing(true); - opts.set_max_open_files(config.max_open_files); - opts.set_parsed_options(&format!("keep_log_file_num={}", config.keep_log_file_num)).map_err(other_io_err)?; - opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?; - + let opts = generate_options(config); + let block_opts = generate_block_based_options(config); let columns = config.columns.unwrap_or(0); - if columns == 0 { - let budget = config.memory_budget() / 2; - opts.set_db_write_buffer_size(budget); - // from https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#memtable - // Memtable size is controlled by the option `write_buffer_size`. - // If you increase your memtable size, be sure to also increase your L1 size! - // L1 size is controlled by the option `max_bytes_for_level_base`. - opts.set_parsed_options(&format!("max_bytes_for_level_base={}", budget)).map_err(other_io_err)?; - } - opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2)); - - let mut block_opts = BlockBasedOptions::new(); - - { - block_opts.set_block_size(config.compaction.block_size); - // Set cache size as recommended by - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size - let cache_size = config.memory_budget() / 3; - let cache = Cache::new(cache_size); - block_opts.set_cache(cache); + if config.columns.is_some() && config.memory_budget.contains_key(&None) { + warn!("Memory budget for the default column (None) is ignored if columns.is_some()"); } // attempt database repair if it has been previously marked as corrupted @@ -331,47 +334,36 @@ impl Database { fs::remove_file(db_corrupted)?; } - let mut cf_options = Vec::with_capacity(columns as usize); - let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); - let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect(); - - for i in 0..columns { - cf_options.push(config.column_config(&block_opts, Some(i))?); - } + let column_names: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); - let write_opts = WriteOptions::new(); - let mut read_opts = ReadOptions::new(); + let write_opts = WriteOptions::default(); + let mut read_opts = ReadOptions::default(); read_opts.set_verify_checksums(false); - let mut cfs: Vec = Vec::new(); - let db = match config.columns { - Some(_) => { - match DB::open_cf(&opts, path, &cfnames, &cf_options) { - Ok(db) => { - cfs = cfnames - .iter() - .map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed")) - .collect(); - Ok(db) - } - Err(_) => { - // retry and create CFs - match DB::open_cf(&opts, path, &[], &[]) { - Ok(mut db) => { - cfs = cfnames - .iter() - .enumerate() - .map(|(i, n)| db.create_cf(n, &cf_options[i])) - .collect::<::std::result::Result<_, _>>() + let db = if config.columns.is_some() { + let cf_descriptors: Vec<_> = (0..columns) + .map(|i| ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i))) + .collect(); + + match DB::open_cf_descriptors(&opts, path, cf_descriptors) { + Err(_) => { + // retry and create CFs + match DB::open_cf(&opts, path, &[] as &[&str]) { + Ok(mut db) => { + for (i, name) in column_names.iter().enumerate() { + let _ = db + .create_cf(name, &config.column_config(&block_opts, i as u32)) .map_err(other_io_err)?; - Ok(db) } - err => err, + Ok(db) } + err => err, } } + ok => ok, } - None => DB::open(&opts, path), + } else { + DB::open(&opts, path) }; let db = match db { @@ -380,29 +372,29 @@ impl Database { warn!("DB corrupted: {}, attempting repair", s); DB::repair(&opts, path).map_err(other_io_err)?; - if cfnames.is_empty() { - DB::open(&opts, path).map_err(other_io_err)? - } else { - let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?; - cfs = cfnames - .iter() - .map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed")) + if config.columns.is_some() { + let cf_descriptors: Vec<_> = (0..columns) + .map(|i| { + ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)) + }) .collect(); - db + + DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)? + } else { + DB::open(&opts, path).map_err(other_io_err)? } } Err(s) => return Err(other_io_err(s)), }; - let num_cols = cfs.len(); Ok(Database { - db: RwLock::new(Some(DBAndColumns { db, cfs })), + db: RwLock::new(Some(DBAndColumns { db, column_names })), config: config.clone(), - write_opts, - overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), - flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + overlay: RwLock::new((0..=columns).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..=columns).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), read_opts, + write_opts, block_opts, }) } @@ -437,8 +429,8 @@ impl Database { /// Commit buffered changes to database. Must be called under `flush_lock` fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + Some(ref cfs) => { + let mut batch = WriteBatch::default(); mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); { for (c, column) in self.flushing.read().iter().enumerate() { @@ -446,14 +438,16 @@ impl Database { match *state { KeyState::Delete => { if c > 0 { - batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?; + let cf = cfs.get_cf(c - 1); + batch.delete_cf(cf, key).map_err(other_io_err)?; } else { batch.delete(key).map_err(other_io_err)?; } } KeyState::Insert(ref value) => { if c > 0 { - batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?; + let cf = cfs.get_cf(c - 1); + batch.put_cf(cf, key, value).map_err(other_io_err)?; } else { batch.put(key, value).map_err(other_io_err)?; } @@ -463,7 +457,7 @@ impl Database { } } - check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))?; + check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?; for column in self.flushing.write().iter_mut() { column.clear(); @@ -493,8 +487,8 @@ impl Database { /// Commit transaction to database. pub fn write(&self, tr: DBTransaction) -> io::Result<()> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); + Some(ref cfs) => { + let mut batch = WriteBatch::default(); let ops = tr.ops; for op in ops { // remove any buffered operation for this key @@ -503,16 +497,16 @@ impl Database { match op { DBOp::Insert { col, key, value } => match col { None => batch.put(&key, &value).map_err(other_io_err)?, - Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?, + Some(c) => batch.put_cf(cfs.get_cf(c as usize), &key, &value).map_err(other_io_err)?, }, DBOp::Delete { col, key } => match col { None => batch.delete(&key).map_err(other_io_err)?, - Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?, + Some(c) => batch.delete_cf(cfs.get_cf(c as usize), &key).map_err(other_io_err)?, }, } } - check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts)) + check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts)) } None => Err(other_io_err("Database is closed")), } @@ -521,7 +515,7 @@ impl Database { /// Get value by key. pub fn get(&self, col: Option, key: &[u8]) -> io::Result> { match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { + Some(ref cfs) => { let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; match overlay.get(key) { Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), @@ -533,9 +527,10 @@ impl Database { Some(&KeyState::Delete) => Ok(None), None => col .map_or_else( - || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), + || cfs.db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), |c| { - db.get_cf_opt(cfs[c as usize], key, &self.read_opts) + cfs.db + .get_cf_opt(cfs.get_cf(c as usize), key, &self.read_opts) .map(|r| r.map(|v| DBValue::from_slice(&v))) }, ) @@ -551,26 +546,18 @@ impl Database { /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. // TODO: support prefix seek for unflushed data pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - self.iter_from_prefix(col, prefix).and_then(|mut iter| { - match iter.next() { - // TODO: use prefix_same_as_start read option (not available in C API currently) - Some((k, v)) => { - if k[0..prefix.len()] == prefix[..] { - Some(v) - } else { - None - } - } - _ => None, - } - }) + self.iter_from_prefix(col, prefix).next().map(|(_, v)| v) } /// Get database iterator for flushed data. - pub fn iter(&self, col: Option) -> Option> { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; + /// Will hold a lock until the iterator is dropped + /// preventing the database from being closed. + pub fn iter<'a>(&'a self, col: Option) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let optional = if read_lock.is_some() { + let c = Self::to_overlay_column(col); + let overlay_data = { + let overlay = &self.overlay.read()[c]; let mut overlay_data = overlay .iter() .filter_map(|(k, v)| match *v { @@ -581,40 +568,33 @@ impl Database { }) .collect::>(); overlay_data.sort(); + overlay_data + }; - let iter = col.map_or_else( - || db.iterator_opt(IteratorMode::Start, &self.read_opts), - |c| { - db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) - .expect("iterator params are valid; qed") - }, - ); - - Some(DatabaseIterator { iter: interleave_ordered(overlay_data, iter), _marker: PhantomData }) - } - None => None, - } + let guarded = iter::ReadGuardedIterator::new(read_lock, col); + Some(interleave_ordered(overlay_data, guarded)) + } else { + None + }; + optional.into_iter().flat_map(identity) } - - fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let iter = col.map_or_else( - || db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts), - |c| { - db.iterator_cf_opt( - cfs[c as usize], - IteratorMode::From(prefix, Direction::Forward), - &self.read_opts, - ) - .expect("iterator params are valid; qed") - }, - ); - - Some(DatabaseIterator { iter: interleave_ordered(Vec::new(), iter), _marker: PhantomData }) - } - None => None, - } + /// Get database iterator from prefix for flushed data. + /// Will hold a lock until the iterator is dropped + /// preventing the database from being closed. + fn iter_from_prefix<'a>( + &'a self, + col: Option, + prefix: &'a [u8], + ) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let optional = if read_lock.is_some() { + let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix); + Some(interleave_ordered(Vec::new(), guarded)) + } else { + None + }; + // workaround for https://github.com/facebook/rocksdb/issues/2343 + optional.into_iter().flat_map(identity).filter(move |(k, _)| k.starts_with(prefix)) } /// Close the database @@ -665,7 +645,7 @@ impl Database { self.db .read() .as_ref() - .and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) }) + .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) }) .map(|n| n as u32) .unwrap_or(0) } @@ -673,9 +653,8 @@ impl Database { /// Drop a column family. pub fn drop_column(&self) -> io::Result<()> { match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - if let Some(_col) = cfs.pop() { - let name = format!("col{}", cfs.len()); + Some(DBAndColumns { ref mut db, ref mut column_names }) => { + if let Some(name) = column_names.pop() { db.drop_cf(&name).map_err(other_io_err)?; } Ok(()) @@ -687,11 +666,12 @@ impl Database { /// Add a column family. pub fn add_column(&self) -> io::Result<()> { match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - let col = cfs.len(); + Some(DBAndColumns { ref mut db, ref mut column_names }) => { + let col = column_names.len() as u32; let name = format!("col{}", col); - let col_config = self.config.column_config(&self.block_opts, Some(col as u32))?; - cfs.push(db.create_cf(&name, &col_config).map_err(other_io_err)?); + let col_config = self.config.column_config(&self.block_opts, col as u32); + let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?; + column_names.push(name); Ok(()) } None => Ok(()), @@ -722,18 +702,18 @@ impl KeyValueDB for Database { Database::flush(self) } - fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a> { + fn iter<'a>(&'a self, col: Option) -> Box + 'a> { let unboxed = Database::iter(self, col); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) + Box::new(unboxed.into_iter()) } fn iter_from_prefix<'a>( &'a self, col: Option, prefix: &'a [u8], - ) -> Box, Box<[u8]>)> + 'a> { + ) -> Box + 'a> { let unboxed = Database::iter_from_prefix(self, col, prefix); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) + Box::new(unboxed.into_iter()) } fn restore(&self, new_db: &str) -> io::Result<()> { @@ -752,6 +732,7 @@ impl Drop for Database { mod tests { use super::*; use ethereum_types::H256; + use std::io::Read; use std::str::FromStr; use tempdir::TempDir; @@ -760,22 +741,32 @@ mod tests { let db = Database::open(config, tempdir.path().to_str().unwrap()).unwrap(); let key1 = H256::from_str("02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let key2 = H256::from_str("03c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); - let key3 = H256::from_str("01c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key3 = H256::from_str("04c00000000b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key4 = H256::from_str("04c01111110b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); + let key5 = H256::from_str("04c02222220b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc").unwrap(); let mut batch = db.transaction(); batch.put(None, key1.as_bytes(), b"cat"); batch.put(None, key2.as_bytes(), b"dog"); + batch.put(None, key3.as_bytes(), b"caterpillar"); + batch.put(None, key4.as_bytes(), b"beef"); + batch.put(None, key5.as_bytes(), b"fish"); db.write(batch).unwrap(); assert_eq!(&*db.get(None, key1.as_bytes()).unwrap().unwrap(), b"cat"); - let contents: Vec<_> = db.iter(None).into_iter().flat_map(|inner| inner).collect(); - assert_eq!(contents.len(), 2); + let contents: Vec<_> = db.iter(None).into_iter().collect(); + assert_eq!(contents.len(), 5); assert_eq!(&*contents[0].0, key1.as_bytes()); assert_eq!(&*contents[0].1, b"cat"); assert_eq!(&*contents[1].0, key2.as_bytes()); assert_eq!(&*contents[1].1, b"dog"); + let mut prefix_iter = db.iter_from_prefix(None, &[0x04, 0xc0]); + assert_eq!(*prefix_iter.next().unwrap().1, b"caterpillar"[..]); + assert_eq!(*prefix_iter.next().unwrap().1, b"beef"[..]); + assert_eq!(*prefix_iter.next().unwrap().1, b"fish"[..]); + let mut batch = db.transaction(); batch.delete(None, key1.as_bytes()); db.write(batch).unwrap(); @@ -861,11 +852,11 @@ mod tests { let config = DatabaseConfig::default(); let config_5 = DatabaseConfig::with_columns(Some(5)); - let tempdir = TempDir::new("").unwrap(); + let tempdir = TempDir::new("drop_columns").unwrap(); // open 5, remove all. { - let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap(); + let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).expect("open with 5 columns"); assert_eq!(db.num_columns(), 5); for i in (0..5).rev() { @@ -881,6 +872,55 @@ mod tests { } } + #[test] + fn test_iter_by_prefix() { + let tempdir = TempDir::new("").unwrap(); + let config = DatabaseConfig::default(); + let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); + + let key1 = b"0"; + let key2 = b"ab"; + let key3 = b"abc"; + let key4 = b"abcd"; + + let mut batch = db.transaction(); + batch.put(None, key1, key1); + batch.put(None, key2, key2); + batch.put(None, key3, key3); + batch.put(None, key4, key4); + db.write(batch).unwrap(); + + // empty prefix + let contents: Vec<_> = db.iter_from_prefix(None, b"").into_iter().collect(); + assert_eq!(contents.len(), 4); + assert_eq!(&*contents[0].0, key1); + assert_eq!(&*contents[1].0, key2); + assert_eq!(&*contents[2].0, key3); + assert_eq!(&*contents[3].0, key4); + + // prefix a + let contents: Vec<_> = db.iter_from_prefix(None, b"a").into_iter().collect(); + assert_eq!(contents.len(), 3); + assert_eq!(&*contents[0].0, key2); + assert_eq!(&*contents[1].0, key3); + assert_eq!(&*contents[2].0, key4); + + // prefix abc + let contents: Vec<_> = db.iter_from_prefix(None, b"abc").into_iter().collect(); + assert_eq!(contents.len(), 2); + assert_eq!(&*contents[0].0, key3); + assert_eq!(&*contents[1].0, key4); + + // prefix abcde + let contents: Vec<_> = db.iter_from_prefix(None, b"abcde").into_iter().collect(); + assert_eq!(contents.len(), 0); + + // prefix 0 + let contents: Vec<_> = db.iter_from_prefix(None, b"0").into_iter().collect(); + assert_eq!(contents.len(), 1); + assert_eq!(&*contents[0].0, key1); + } + #[test] fn write_clears_buffered_ops() { let tempdir = TempDir::new("").unwrap(); @@ -897,4 +937,94 @@ mod tests { assert_eq!(db.get(None, b"foo").unwrap().unwrap().as_ref(), b"baz"); } + + #[test] + fn default_memory_budget() { + let c = DatabaseConfig::default(); + assert_eq!(c.columns, None); + assert_eq!(c.memory_budget(), DB_DEFAULT_MEMORY_BUDGET_MB * MB, "total memory budget is default"); + assert_eq!( + c.memory_budget_for_col(0), + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, + "total memory budget for column 0 is the default" + ); + assert_eq!( + c.memory_budget_for_col(999), + DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, + "total memory budget for any column is the default" + ); + } + + #[test] + fn memory_budget() { + let mut c = DatabaseConfig::with_columns(Some(3)); + c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget"); + } + + #[test] + fn rocksdb_settings() { + const NUM_COLS: usize = 2; + let mut cfg = DatabaseConfig::with_columns(Some(NUM_COLS as u32)); + cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024) + cfg.compaction.block_size = 323232; + cfg.compaction.initial_file_size = 102030; + cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().map(|(c, b)| (Some(c), b)).collect(); + + let db_path = TempDir::new("config_test").expect("the OS can create tmp dirs"); + let _db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db"); + let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap())) + .expect("rocksdb creates a LOG file"); + let mut settings = String::new(); + rocksdb_log.read_to_string(&mut settings).unwrap(); + // Check column count + assert!(settings.contains("Options for column family [default]"), "no default col"); + assert!(settings.contains("Options for column family [col0]"), "no col0"); + assert!(settings.contains("Options for column family [col1]"), "no col1"); + + // Check max_open_files + assert!(settings.contains("max_open_files: 123")); + + // Check block size + assert!(settings.contains(" block_size: 323232")); + + // LRU cache (default column) + assert!(settings.contains("block_cache_options:\n capacity : 8388608")); + // LRU cache for non-default columns is ⅓ of memory budget (including default column) + let lru_size = (330 * MB) / 3; + let needle = format!("block_cache_options:\n capacity : {}", lru_size); + let lru = settings.match_indices(&needle).collect::>().len(); + assert_eq!(lru, NUM_COLS); + + // Index/filters share cache + let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::>().len(); + assert_eq!(include_indexes, NUM_COLS); + // Pin index/filters on L0 + let pins = settings.matches("pin_l0_filter_and_index_blocks_in_cache: 1").collect::>().len(); + assert_eq!(pins, NUM_COLS); + + // Check target file size, aka initial file size + let l0_sizes = settings.matches("target_file_size_base: 102030").collect::>().len(); + assert_eq!(l0_sizes, NUM_COLS); + // The default column uses the default of 64Mb regardless of the setting. + assert!(settings.contains("target_file_size_base: 67108864")); + + // Check compression settings + let snappy_compression = settings.matches("Options.compression: Snappy").collect::>().len(); + // All columns use Snappy + assert_eq!(snappy_compression, NUM_COLS + 1); + // …even for L7 + let snappy_bottommost = settings.matches("Options.bottommost_compression: Disabled").collect::>().len(); + assert_eq!(snappy_bottommost, NUM_COLS + 1); + + // 7 levels + let levels = settings.matches("Options.num_levels: 7").collect::>().len(); + assert_eq!(levels, NUM_COLS + 1); + + // Don't fsync every store + assert!(settings.contains("Options.use_fsync: 0")); + + // We're using the old format + assert!(settings.contains("format_version: 2")); + } }