diff --git a/kvdb-memorydb/Cargo.toml b/kvdb-memorydb/Cargo.toml index 301e562bf..da599572b 100644 --- a/kvdb-memorydb/Cargo.toml +++ b/kvdb-memorydb/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "kvdb-memorydb" -version = "0.1.0" +version = "0.2.0" authors = ["Parity Technologies "] repository = "https://github.com/paritytech/parity-common" description = "A key-value in-memory database that implements the `KeyValueDB` trait" license = "GPL-3.0" [dependencies] -parking_lot = "0.6" -kvdb = { version = "0.1", path = "../kvdb" } +parking_lot = "0.7" +kvdb = { version = "0.2", path = "../kvdb" } diff --git a/kvdb-rocksdb/Cargo.toml b/kvdb-rocksdb/Cargo.toml index 0465be84a..b21fae067 100644 --- a/kvdb-rocksdb/Cargo.toml +++ b/kvdb-rocksdb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kvdb-rocksdb" -version = "0.1.4" +version = "0.2.0" authors = ["Parity Technologies "] repository = "https://github.com/paritytech/parity-common" description = "kvdb implementation backed by rocksDB" @@ -10,10 +10,10 @@ license = "GPL-3.0" elastic-array = "0.10" fs-swap = "0.2.4" interleaved-ordered = "0.1.0" -kvdb = { version = "0.1", path = "../kvdb" } +kvdb = { version = "0.2", path = "../kvdb" } log = "0.4" num_cpus = "1.0" -parking_lot = "0.6" +parking_lot = "0.7" regex = "1.0" parity-rocksdb = "0.5" diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index eceebbc43..769478bb6 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -17,11 +17,8 @@ #[macro_use] extern crate log; -extern crate elastic_array; extern crate fs_swap; -extern crate interleaved_ordered; extern crate num_cpus; -extern crate parking_lot; extern crate regex; extern crate parity_rocksdb; @@ -30,21 +27,20 @@ extern crate ethereum_types; extern crate kvdb; -use std::collections::HashMap; -use std::marker::PhantomData; -use std::{cmp, fs, io, mem, result, error}; +use std::{cmp, fs, io, result, error}; use std::path::Path; -use parking_lot::{Mutex, MutexGuard, RwLock}; use parity_rocksdb::{ DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator, Options, BlockBasedOptions, Direction, Cache, Column, ReadOptions }; -use interleaved_ordered::{interleave_ordered, InterleaveOrdered}; -use elastic_array::ElasticArray32; -use fs_swap::{swap, swap_nonatomic}; -use kvdb::{KeyValueDB, DBTransaction, DBValue, DBOp}; +use kvdb::{ + DBValue, NumColumns, OpenHandler, TransactionHandler, IterationHandler, + MigrationHandler, WriteTransaction, ReadTransaction, +}; + +pub use kvdb::DatabaseWithCache; #[cfg(target_os = "linux")] use regex::Regex; @@ -60,11 +56,8 @@ fn other_io_err(e: E) -> io::Error where E: Into { - iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, - _marker: PhantomData<&'a Database>, -} - -impl<'a> Iterator for DatabaseIterator<'a> { - type Item = (Box<[u8]>, Box<[u8]>); - - fn next(&mut self) -> Option { - self.iter.next() - } -} - -struct DBAndColumns { +pub struct DBAndColumns { db: DB, cfs: Vec, + path: String, + write_opts: WriteOptions, + read_opts: ReadOptions, + block_opts: BlockBasedOptions, } // get column family configuration from database config. @@ -244,48 +224,12 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re } /// Key-Value database. -pub struct Database { - db: RwLock>, - config: DatabaseConfig, - write_opts: WriteOptions, - read_opts: ReadOptions, - block_opts: BlockBasedOptions, - path: String, - // Dirty values added with `write_buffered`. Cleaned on `flush`. - overlay: RwLock, KeyState>>>, - // Values currently being flushed. Cleared when `flush` completes. - flushing: RwLock, KeyState>>>, - // Prevents concurrent flushes. - // Value indicates if a flush is in progress. - flushing_lock: Mutex, -} - -#[inline] -fn check_for_corruption>(path: P, res: result::Result) -> io::Result { - if let Err(ref s) = res { - if s.starts_with("Corruption:") { - warn!("DB corrupted: {}. Repair will be triggered on next restart", s); - let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME)); - } - } +pub type Database = DatabaseWithCache; - res.map_err(other_io_err) -} +impl OpenHandler for DBAndColumns { + type Config = DatabaseConfig; -fn is_corrupted(s: &str) -> bool { - s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") -} - -impl Database { - const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED"; - - /// Open database with default settings. - pub fn open_default(path: &str) -> io::Result { - Database::open(&DatabaseConfig::default(), path) - } - - /// Open database file. Creates if it does not exist. - pub fn open(config: &DatabaseConfig, path: &str) -> io::Result { + fn open(config: &Self::Config, path: &str) -> io::Result { let mut opts = Options::new(); if let Some(rate_limit) = config.compaction.write_rate_limit { @@ -309,7 +253,7 @@ impl Database { } // attempt database repair if it has been previously marked as corrupted - let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME); + let db_corrupted = Path::new(path).join(CORRUPTION_FILE_NAME); if db_corrupted.exists() { warn!("DB has been previously marked as corrupted, attempting repair"); DB::repair(&opts, path).map_err(other_io_err)?; @@ -378,339 +322,167 @@ impl Database { return Err(other_io_err(s)) } }; - let num_cols = cfs.len(); - Ok(Database { - db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })), - config: config.clone(), - write_opts: write_opts, - overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), - flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), - flushing_lock: Mutex::new(false), + + Ok(DBAndColumns{ + db, + cfs, path: path.to_owned(), - read_opts: read_opts, - block_opts: block_opts, + write_opts, + read_opts, + block_opts, }) } +} - /// Helper to create new transaction for this database. - pub fn transaction(&self) -> DBTransaction { - DBTransaction::new() - } - - fn to_overlay_column(col: Option) -> usize { - col.map_or(0, |c| (c + 1) as usize) - } - - /// Commit transaction to database. - pub fn write_buffered(&self, tr: DBTransaction) { - let mut overlay = self.overlay.write(); - let ops = tr.ops; - for op in ops { - match op { - DBOp::Insert { col, key, value } => { - let c = Self::to_overlay_column(col); - overlay[c].insert(key, KeyState::Insert(value)); - }, - DBOp::Delete { col, key } => { - let c = Self::to_overlay_column(col); - overlay[c].insert(key, KeyState::Delete); - }, - } - }; +impl NumColumns for DatabaseConfig { + fn num_columns(&self) -> usize { + (self.columns.unwrap_or_default() + 1) as usize } +} - /// Commit buffered changes to database. Must be called under `flush_lock` - fn write_flushing_with_lock(&self, _lock: &mut MutexGuard) -> io::Result<()> { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); - mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); - { - for (c, column) in self.flushing.read().iter().enumerate() { - for (key, state) in column.iter() { - match *state { - KeyState::Delete => { - if c > 0 { - batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?; - } else { - batch.delete(key).map_err(other_io_err)?; - } - }, - KeyState::Insert(ref value) => { - if c > 0 { - batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?; - } else { - batch.put(key, value).map_err(other_io_err)?; - } - }, - } - } - } - } - - check_for_corruption( - &self.path, - db.write_opt(batch, &self.write_opts))?; +struct RocksDBWriteTransaction<'a> { + batch: WriteBatch, + path: &'a str, + db: &'a DB, + write_opts: &'a WriteOptions, + cfs: &'a [Column], +} - for column in self.flushing.write().iter_mut() { - column.clear(); - column.shrink_to_fit(); - } - Ok(()) - }, - None => Err(other_io_err("Database is closed")) +impl<'a> WriteTransaction for RocksDBWriteTransaction<'a> { + fn put(&mut self, c: usize, key: &[u8], value: &[u8]) -> io::Result<()> { + if c > 0 { + self.batch.put_cf(self.cfs[c - 1], key, value).map_err(other_io_err) + } else { + self.batch.put(key, value).map_err(other_io_err) } } - /// Commit buffered changes to database. - pub fn flush(&self) -> io::Result<()> { - let mut lock = self.flushing_lock.lock(); - // If RocksDB batch allocation fails the thread gets terminated and the lock is released. - // The value inside the lock is used to detect that. - if *lock { - // This can only happen if another flushing thread is terminated unexpectedly. - return Err(other_io_err("Database write failure. Running low on memory perhaps?")) + fn delete(&mut self, c: usize, key: &[u8]) -> io::Result<()> { + if c > 0 { + self.batch.delete_cf(self.cfs[c - 1], key).map_err(other_io_err) + } else { + self.batch.delete(key).map_err(other_io_err) } - *lock = true; - let result = self.write_flushing_with_lock(&mut lock); - *lock = false; - result } - /// Commit transaction to database. - pub fn write(&self, tr: DBTransaction) -> io::Result<()> { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let batch = WriteBatch::new(); - let ops = tr.ops; - for op in ops { - // remove any buffered operation for this key - self.overlay.write()[Self::to_overlay_column(op.col())].remove(op.key()); - - match op { - DBOp::Insert { col, key, value } => match col { - None => batch.put(&key, &value).map_err(other_io_err)?, - Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?, - }, - DBOp::Delete { col, key } => match col { - None => batch.delete(&key).map_err(other_io_err)?, - Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?, - } - } - } - - check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts)) - }, - None => Err(other_io_err("Database is closed")), - } + fn commit(self: Box) -> io::Result<()> { + let Self { db, path, write_opts, batch, ..} = *self; + check_for_corruption( + path, + db.write_opt(batch, write_opts), + ) } +} - /// Get value by key. - pub fn get(&self, col: Option, key: &[u8]) -> io::Result> { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; - match overlay.get(key) { - Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), - Some(&KeyState::Delete) => Ok(None), - None => { - let flushing = &self.flushing.read()[Self::to_overlay_column(col)]; - match flushing.get(key) { - Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), - Some(&KeyState::Delete) => Ok(None), - None => { - col.map_or_else( - || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))), - |c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v)))) - .map_err(other_io_err) - }, - } - }, - } - }, - None => Ok(None), +struct RocksDBReadTransaction<'a> { + db: &'a DB, + read_opts: &'a ReadOptions, + cfs: &'a [Column], +} + +impl<'a> ReadTransaction for RocksDBReadTransaction<'a> { + fn get(self: Box, c: usize, key: &[u8]) -> io::Result> { + if c > 0 { + self.db.get_cf_opt(self.cfs[c - 1], key, &self.read_opts) + } else { + self.db.get_opt(key, &self.read_opts) } + .map_err(other_io_err) + .map(|r| r.map(|v| DBValue::from_slice(&v))) } +} - /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. - // TODO: support prefix seek for unflushed data - pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - self.iter_from_prefix(col, prefix).and_then(|mut iter| { - match iter.next() { - // TODO: use prefix_same_as_start read option (not availabele in C API currently) - Some((k, v)) => if k[0 .. prefix.len()] == prefix[..] { Some(v) } else { None }, - _ => None - } +impl TransactionHandler for DBAndColumns { + fn write_transaction<'a>(&'a self) -> Box { + Box::new(RocksDBWriteTransaction { + batch: WriteBatch::new(), + db: &self.db, + path: &self.path, + write_opts: &self.write_opts, + cfs: &self.cfs[..], }) } - /// Get database iterator for flushed data. - pub fn iter(&self, col: Option) -> Option { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let overlay = &self.overlay.read()[Self::to_overlay_column(col)]; - let mut overlay_data = overlay.iter() - .filter_map(|(k, v)| match *v { - KeyState::Insert(ref value) => - Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())), - KeyState::Delete => None, - }).collect::>(); - overlay_data.sort(); - - let iter = col.map_or_else( - || db.iterator_opt(IteratorMode::Start, &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts) - .expect("iterator params are valid; qed") - ); - - Some(DatabaseIterator { - iter: interleave_ordered(overlay_data, iter), - _marker: PhantomData, - }) - }, - None => None, - } + fn read_transaction<'a>(&'a self) -> Box { + Box::new(RocksDBReadTransaction { + db: &self.db, + read_opts: &self.read_opts, + cfs: &self.cfs[..], + }) } +} - fn iter_from_prefix(&self, col: Option, prefix: &[u8]) -> Option { - match *self.db.read() { - Some(DBAndColumns { ref db, ref cfs }) => { - let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts), - |c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts) - .expect("iterator params are valid; qed")); - - Some(DatabaseIterator { - iter: interleave_ordered(Vec::new(), iter), - _marker: PhantomData, - }) - }, - None => None, +impl<'a> IterationHandler for &'a DBAndColumns { + type Iterator = DBIterator; + + fn iter(&self, c: usize) -> Self::Iterator { + if c > 0 { + self.db.iterator_cf_opt( + self.cfs[c - 1], + IteratorMode::Start, + &self.read_opts, + ).expect("iterator params are valid; qed") + } else { + self.db.iterator_opt(IteratorMode::Start, &self.read_opts) } } - /// Close the database - fn close(&self) { - *self.db.write() = None; - self.overlay.write().clear(); - self.flushing.write().clear(); - } - - /// Restore the database from a copy at given path. - pub fn restore(&self, new_db: &str) -> io::Result<()> { - self.close(); - - // swap is guaranteed to be atomic - match swap(new_db, &self.path) { - Ok(_) => { - // ignore errors - let _ = fs::remove_dir_all(new_db); - }, - Err(err) => { - debug!("DB atomic swap failed: {}", err); - match swap_nonatomic(new_db, &self.path) { - Ok(_) => { - // ignore errors - let _ = fs::remove_dir_all(new_db); - }, - Err(err) => { - warn!("Failed to swap DB directories: {:?}", err); - return Err(io::Error::new(io::ErrorKind::Other, "DB restoration failed: could not swap DB directories")); - } - } - } + fn iter_from_prefix(&self, c: usize, prefix: & [u8]) -> Self::Iterator { + if c > 0 { + self.db.iterator_cf_opt( + self.cfs[c - 1], + IteratorMode::From(prefix, Direction::Forward), + &self.read_opts, + ).expect("iterator params are valid; qed") + } else { + self.db.iterator_opt( + IteratorMode::From(prefix, Direction::Forward), + &self.read_opts, + ) } - - // reopen the database and steal handles into self - let db = Self::open(&self.config, &self.path)?; - *self.db.write() = mem::replace(&mut *db.db.write(), None); - *self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new()); - *self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); - Ok(()) - } - - /// The number of non-default column families. - pub fn num_columns(&self) -> u32 { - self.db.read().as_ref() - .and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) } ) - .map(|n| n as u32) - .unwrap_or(0) } +} - /// Drop a column family. - pub fn drop_column(&self) -> io::Result<()> { - match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - if let Some(col) = cfs.pop() { - let name = format!("col{}", cfs.len()); - drop(col); - db.drop_cf(&name).map_err(other_io_err)?; - } - Ok(()) - }, - None => Ok(()), +#[inline] +fn check_for_corruption>(path: P, res: result::Result) -> io::Result { + if let Err(ref s) = res { + if s.starts_with("Corruption:") { + warn!("DB corrupted: {}. Repair will be triggered on next restart", s); + let _ = fs::File::create(path.as_ref().join(CORRUPTION_FILE_NAME)); } } - /// Add a column family. - pub fn add_column(&self) -> io::Result<()> { - match *self.db.write() { - Some(DBAndColumns { ref mut db, ref mut cfs }) => { - let col = cfs.len() as u32; - let name = format!("col{}", col); - cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?); - Ok(()) - }, - None => Ok(()), - } - } + res.map_err(other_io_err) } -// duplicate declaration of methods here to avoid trait import in certain existing cases -// at time of addition. -impl KeyValueDB for Database { - fn get(&self, col: Option, key: &[u8]) -> io::Result> { - Database::get(self, col, key) - } - - fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { - Database::get_by_prefix(self, col, prefix) - } - - fn write_buffered(&self, transaction: DBTransaction) { - Database::write_buffered(self, transaction) - } - - fn write(&self, transaction: DBTransaction) -> io::Result<()> { - Database::write(self, transaction) - } - - fn flush(&self) -> io::Result<()> { - Database::flush(self) - } +fn is_corrupted(s: &str) -> bool { + s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families") +} - fn iter<'a>(&'a self, col: Option) -> Box, Box<[u8]>)> + 'a> { - let unboxed = Database::iter(self, col); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) +impl NumColumns for DBAndColumns { + fn num_columns(&self) -> usize { + if self.cfs.is_empty() { 0 } else { self.cfs.len() } } +} - fn iter_from_prefix<'a>(&'a self, col: Option, prefix: &'a [u8]) - -> Box, Box<[u8]>)> + 'a> - { - let unboxed = Database::iter_from_prefix(self, col, prefix); - Box::new(unboxed.into_iter().flat_map(|inner| inner)) +impl MigrationHandler for DBAndColumns { + fn drop_column(&mut self) -> io::Result<()> { + if let Some(col) = self.cfs.pop() { + let name = format!("col{}", self.cfs.len()); + drop(col); + self.db.drop_cf(&name).map_err(other_io_err)?; + } + Ok(()) } - fn restore(&self, new_db: &str) -> io::Result<()> { - Database::restore(self, new_db) + fn add_column(&mut self, config: &>::Config) -> io::Result<()> { + let col = self.cfs.len() as u32; + let name = format!("col{}", col); + self.cfs.push(self.db.create_cf(&name, &col_config(config, &self.block_opts)?).map_err(other_io_err)?); + Ok(()) } } -impl Drop for Database { - fn drop(&mut self) { - // write all buffered changes if we can. - let _ = self.flush(); - } -} #[cfg(test)] mod tests { @@ -720,6 +492,7 @@ mod tests { use self::tempdir::TempDir; use ethereum_types::H256; use super::*; + use super::kvdb::ChangeColumns; fn test_db(config: &DatabaseConfig) { let tempdir = TempDir::new("").unwrap(); @@ -735,7 +508,7 @@ mod tests { assert_eq!(&*db.get(None, &key1).unwrap().unwrap(), b"cat"); - let contents: Vec<_> = db.iter(None).into_iter().flat_map(|inner| inner).collect(); + let contents: Vec<_> = db.iter(None).collect(); assert_eq!(contents.len(), 2); assert_eq!(&*contents[0].0, &*key1); assert_eq!(&*contents[0].1, b"cat"); diff --git a/kvdb/Cargo.toml b/kvdb/Cargo.toml index 7c839ba19..4d6dc1ffb 100644 --- a/kvdb/Cargo.toml +++ b/kvdb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kvdb" -version = "0.1.0" +version = "0.2.0" authors = ["Parity Technologies "] repository = "https://github.com/paritytech/parity-common" description = "Generic key-value trait" @@ -9,3 +9,9 @@ license = "GPL-3.0" [dependencies] elastic-array = "0.10" parity-bytes = { version = "0.1", path = "../parity-bytes" } +hashbrown = "0.1.8" +parking_lot = "0.7.1" +fs-swap = "0.2.4" +interleaved-ordered = "0.1.1" +log = "0.4.6" +owning_ref = "0.4.0" diff --git a/kvdb/src/iter.rs b/kvdb/src/iter.rs new file mode 100644 index 000000000..f5da687cd --- /dev/null +++ b/kvdb/src/iter.rs @@ -0,0 +1,84 @@ +use owning_ref::{OwningHandle, StableAddress}; +use parking_lot::RwLockReadGuard; +use std::ops::{Deref, DerefMut}; + +pub type KeyValuePair = (Box<[u8]>, Box<[u8]>); + + +pub struct ReadGuardedIterator<'a, I, T> { + inner: OwningHandle< + UnsafeStableAddress>>, + DerefWrapper>, + >, +} + + +// We can't implement `StableAddress` for a `RwLockReadGuard` +// directly due to orphan rules. +#[repr(transparent)] +struct UnsafeStableAddress(T); + +impl Deref for UnsafeStableAddress { + type Target = T::Target; + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +// RwLockReadGuard dereferences to a stable address; qed +unsafe impl StableAddress for UnsafeStableAddress {} + + +struct DerefWrapper(T); + +impl Deref for DerefWrapper { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DerefWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + + +impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> { + type Item = I::Item; + + fn next(&mut self) -> Option { + self.inner.deref_mut().as_mut().and_then(|iter| iter.next()) + } +} + +pub trait IterationHandler { + type Iterator: Iterator; + + fn iter(&self, col: usize) -> Self::Iterator; + fn iter_from_prefix(&self, col: usize, prefix: &[u8]) -> Self::Iterator; +} + +impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T> +where + &'a T: IterationHandler, +{ + pub fn new(read_lock: RwLockReadGuard<'a, Option>, col: usize) -> Self { + Self { + inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(|db| db.iter(col))) + }) + } + } + + pub fn new_from_prefix(read_lock: RwLockReadGuard<'a, Option>, col: usize, prefix: &[u8]) -> Self { + Self { + inner: OwningHandle::new_with_fn(UnsafeStableAddress(read_lock), move |rlock| { + let rlock = unsafe { rlock.as_ref().expect("initialized as non-null; qed") }; + DerefWrapper(rlock.as_ref().map(|db| db.iter_from_prefix(col, prefix))) + }) + } + } +} diff --git a/kvdb/src/lib.rs b/kvdb/src/lib.rs index 3295075d4..8e5de04f1 100644 --- a/kvdb/src/lib.rs +++ b/kvdb/src/lib.rs @@ -16,14 +16,28 @@ //! Key-Value store abstraction with `RocksDB` backend. +#[macro_use] +extern crate log; + extern crate elastic_array; extern crate parity_bytes as bytes; +extern crate fs_swap; +extern crate hashbrown; +extern crate parking_lot; +extern crate interleaved_ordered; +extern crate owning_ref; -use std::io; -use std::path::Path; -use std::sync::Arc; +use std::{io, mem, fs, convert::identity}; use elastic_array::{ElasticArray128, ElasticArray32}; use bytes::Bytes; +use hashbrown::HashMap; +use parking_lot::{Mutex, MutexGuard, RwLock}; +use fs_swap::{swap, swap_nonatomic}; +use interleaved_ordered::interleave_ordered; + +mod iter; + +pub use iter::IterationHandler; /// Required length of prefixes. pub const PREFIX_LEN: usize = 12; @@ -167,9 +181,432 @@ pub trait KeyValueDB: Sync + Send { fn restore(&self, new_db: &str) -> io::Result<()>; } -/// Generic key-value database handler. This trait contains one function `open`. When called, it opens database with a -/// predefined config. -pub trait KeyValueDBHandler: Send + Sync { - /// Open the predefined key-value database. - fn open(&self, path: &Path) -> io::Result>; + +/// Converts an error to an `io::Error`. +pub fn other_io_err(e: E) -> io::Error where E: Into> { + io::Error::new(io::ErrorKind::Other, e) +} + +/// An abstraction over a concrete database write transaction implementation. +pub trait WriteTransaction { + /// Insert a key-value pair in the transaction. Any existing value will be overwritten upon write. + fn put(&mut self, col: usize, key: &[u8], value: &[u8]) -> io::Result<()>; + /// Delete value by key. + fn delete(&mut self, col: usize, key: &[u8]) -> io::Result<()>; + /// Commit the transaction. + fn commit(self: Box) -> io::Result<()>; +} + +/// An abstraction over a concrete database read-only transaction implementation. +pub trait ReadTransaction { + /// Get value by key. + fn get(self: Box, col: usize, key: &[u8]) -> io::Result>; +} + +pub trait TransactionHandler { + // TODO: how to avoid boxing? + fn write_transaction<'a> (&'a self) -> Box; + fn read_transaction<'a>(&'a self) -> Box; +} + + +enum KeyState { + Insert(DBValue), + Delete, +} + +pub trait OpenHandler: Send + Sync { + /// Database configuration type. + type Config: NumColumns + Default + Clone + Send + Sync; + + /// Opens the database path. Creates if it does not exist. + fn open(config: &Self::Config, path: &str) -> io::Result; +} + +pub trait NumColumns { + /// Number of non-default columns. + fn num_columns(&self) -> usize; +} + +/// Allows dropping and appending columns to the DB. +pub trait MigrationHandler>: NumColumns { + /// Appends a new column to the database. + fn add_column(&mut self, config: &>::Config) -> io::Result<()>; + /// Drops the last column from the database. + fn drop_column(&mut self) -> io::Result<()>; +} + +pub struct DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, +{ + db: RwLock>, + config: >::Config, + path: String, + // Dirty values added with `write_buffered`. Cleaned on `flush`. + overlay: RwLock, KeyState>>>, + // Values currently being flushed. Cleared when `flush` completes. + flushing: RwLock, KeyState>>>, + // Prevents concurrent flushes. + // Value indicates if a flush is in progress. + flushing_lock: Mutex, +} + +impl KeyValueDB for DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, + for<'a> &'a DB: IterationHandler, +{ + fn get(&self, col: Option, key: &[u8]) -> io::Result> { + DatabaseWithCache::get(self, col, key) + } + + fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + DatabaseWithCache::get_by_prefix(self, col, prefix) + } + + fn write_buffered(&self, transaction: DBTransaction) { + DatabaseWithCache::write_buffered(self, transaction) + } + + fn write(&self, transaction: DBTransaction) -> io::Result<()> { + DatabaseWithCache::write(self, transaction) + } + + fn flush(&self) -> io::Result<()> { + DatabaseWithCache::flush(self) + } + + fn restore(&self, new_db: &str) -> io::Result<()> { + DatabaseWithCache::restore(self, new_db) + } + + fn iter<'a>(&'a self, col: Option) -> Box + 'a> { + let unboxed = DatabaseWithCache::iter(self, col); + Box::new(unboxed) + } + + fn iter_from_prefix<'a>(&'a self, col: Option, prefix: &'a [u8]) + -> Box + 'a> + { + let unboxed = DatabaseWithCache::iter_from_prefix(self, col, prefix); + Box::new(unboxed) + } +} + +impl DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, + for<'a> &'a DB: IterationHandler, +{ + /// Helper to create a new transaction. + pub fn transaction(&self) -> DBTransaction { DBTransaction::new() } + /// Commit transaction to database. + pub fn write_buffered(&self, tr: DBTransaction) { + let mut overlay = self.overlay.write(); + let ops = tr.ops; + for op in ops { + match op { + DBOp::Insert { col, key, value } => { + let c = Self::to_overlay_column(col); + overlay[c].insert(key, KeyState::Insert(value)); + }, + DBOp::Delete { col, key } => { + let c = Self::to_overlay_column(col); + overlay[c].insert(key, KeyState::Delete); + }, + } + }; + } + + /// Commit transaction to database. + pub fn write(&self, tr: DBTransaction) -> io::Result<()> { + match *self.db.read() { + Some(ref db) => { + let mut txn = db.write_transaction(); + let ops = tr.ops; + for op in ops { + let c = Self::to_overlay_column(op.col()); + // remove any buffered operation for this key + self.overlay.write()[c].remove(op.key()); + + match op { + DBOp::Insert { key, value, .. } => txn.put(c, &key, &value)?, + DBOp::Delete { key, .. } => txn.delete(c, &key)?, + }; + } + + txn.commit() + }, + None => Err(other_io_err("Database is closed")), + } + } + + /// Get value by key. + pub fn get(&self, col: Option, key: &[u8]) -> io::Result> { + match *self.db.read() { + Some(ref db) => { + let c = Self::to_overlay_column(col); + let overlay = &self.overlay.read()[c]; + match overlay.get(key) { + Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), + Some(&KeyState::Delete) => Ok(None), + None => { + let flushing = &self.flushing.read()[c]; + match flushing.get(key) { + Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), + Some(&KeyState::Delete) => Ok(None), + None => { + let txn = db.read_transaction(); + txn.get(c, &key) + }, + } + }, + } + }, + None => Ok(None), + } + } + + /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. Returns the first value with a matching key or `None`. + pub fn get_by_prefix(&self, col: Option, prefix: &[u8]) -> Option> { + match self.iter_from_prefix(col, prefix).next() { + Some((k, v)) => if k.starts_with(prefix) { Some(v) } else { None }, + _ => None + } + } + + /// Restore the database from a copy at given path. + pub fn restore(&self, new_db: &str) -> io::Result<()> { + self.close(); + + // swap is guaranteed to be atomic + match swap(new_db, &self.path) { + Ok(_) => { + // ignore errors + let _ = fs::remove_dir_all(new_db); + }, + Err(err) => { + debug!("DB atomic swap failed: {}", err); + match swap_nonatomic(new_db, &self.path) { + Ok(_) => { + // ignore errors + let _ = fs::remove_dir_all(new_db); + }, + Err(err) => { + warn!("Failed to swap DB directories: {:?}", err); + return Err(io::Error::new(io::ErrorKind::Other, "DB restoration failed: could not swap DB directories")); + } + } + } + } + + // reopen the database and steal handles into self + let db = Self::open(&self.config, &self.path)?; + + *self.db.write() = mem::replace(&mut *db.db.write(), None); + *self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new()); + *self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); + + Ok(()) + } +} + + + +impl DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, +{ + /// Opens or creates the database from the specified config and path. + pub fn open(config: &>::Config, path: &str) -> io::Result { + let db = DB::open(config, path)?; + let num_cols = config.num_columns(); + Ok(Self { + db: RwLock::new(Some(db)), + config: config.clone(), + path: path.to_owned(), + overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + flushing_lock: Mutex::new(false), + }) + } + + pub fn open_default(path: &str) -> io::Result { + Self::open(&>::Config::default(), path) + } + + fn to_overlay_column(col: Option) -> usize { + col.map_or(0, |c| (c + 1) as usize) + } + + /// Close the database + fn close(&self) { + *self.db.write() = None; + self.overlay.write().clear(); + self.flushing.write().clear(); + } +} + +impl NumColumns for DatabaseWithCache +where + DB: OpenHandler + TransactionHandler + NumColumns, +{ + fn num_columns(&self) -> usize { + self.db + .read() + .as_ref() + .map(|db| db.num_columns()) + .unwrap_or(0) + } +} + +/// Allows dropping and appending columns to the DB. +/// Used for database migration. +pub trait ChangeColumns { + /// Appends a new column to the database. + fn add_column(&self) -> io::Result<()>; + /// Drops the last column from the database. + fn drop_column(&self) -> io::Result<()>; +} + +impl ChangeColumns for DatabaseWithCache +where + DB: OpenHandler + TransactionHandler + MigrationHandler, +{ + fn add_column(&self) -> io::Result<()> { + match *self.db.write() { + Some(ref mut db) => { + db.add_column(&self.config)?; + // TODO: this was not present in the previous implementation + self.overlay.write().push(Default::default()); + self.flushing.write().push(Default::default()); + Ok(()) + }, + None => Ok(()), + } + } + + fn drop_column(&self) -> io::Result<()> { + match *self.db.write() { + Some(ref mut db) => { + db.drop_column()?; + // TODO: this was not present in the previous implementation + self.overlay.write().pop(); + self.flushing.write().pop(); + Ok(()) + }, + None => Ok(()), + } + } +} + +impl DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, + for<'a> &'a DB: IterationHandler, +{ + /// Get database iterator for flushed data. + pub fn iter<'a>(&'a self, col: Option) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let optional = if read_lock.is_some() { + let c = Self::to_overlay_column(col); + let overlay = &self.overlay.read()[c]; + let mut overlay_data = overlay.iter() + .filter_map(|(k, v)| match *v { + KeyState::Insert(ref value) => + Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())), + KeyState::Delete => None, + }).collect::>(); + overlay_data.sort(); + + let guarded = iter::ReadGuardedIterator::new(read_lock, c); + Some(interleave_ordered(overlay_data, guarded)) + } else { + None + }; + optional.into_iter().flat_map(identity) + } + + /// Get database iterator from prefix for flushed data. + pub fn iter_from_prefix<'a>( + &'a self, + col: Option, + prefix: &[u8], + ) -> impl Iterator + 'a { + let read_lock = self.db.read(); + let c = Self::to_overlay_column(col); + let optional = if read_lock.is_some() { + let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, c, prefix); + Some(interleave_ordered(Vec::new(), guarded)) + } else { + None + }; + optional.into_iter().flat_map(identity) + } +} + +impl DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, +{ + /// Commit buffered changes to database. Must be called under `flush_lock` + fn write_flushing_with_lock(&self, _lock: &mut MutexGuard) -> io::Result<()> { + match *self.db.read() { + Some(ref db) => { + let mut txn = db.write_transaction(); + mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); + { + for (c, column) in self.flushing.read().iter().enumerate() { + for (key, state) in column.iter() { + match *state { + KeyState::Delete => { + txn.delete(c, &key)?; + }, + KeyState::Insert(ref value) => { + txn.put(c, &key, &value)?; + }, + } + } + } + } + + txn.commit()?; + + for column in self.flushing.write().iter_mut() { + column.clear(); + column.shrink_to_fit(); + } + + Ok(()) + }, + None => Err(other_io_err("Database is closed")) + } + } + + /// Commit buffered changes to database. + pub fn flush(&self) -> io::Result<()> { + let mut lock = self.flushing_lock.lock(); + // If batch allocation fails the thread gets terminated and the lock is released. + // The value inside the lock is used to detect that. + if *lock { + // This can only happen if another flushing thread is terminated unexpectedly. + return Err(other_io_err("Database write failure. Running low on memory perhaps?")) + } + *lock = true; + let result = self.write_flushing_with_lock(&mut lock); + *lock = false; + result + } +} + +impl Drop for DatabaseWithCache +where + DB: OpenHandler + TransactionHandler, +{ + fn drop(&mut self) { + if let Err(error) = self.flush() { + warn!("database flush failed while closing: {}", error); + } + } }