Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8ee1dad
kvdb-rocksdb: update to new set_upper_bound API
ordian Apr 22, 2020
2d92d86
kvdb-rocksdb: update rocksdb to crates.io version
ordian Apr 22, 2020
0e0bca8
kvdb-rocksdb: update the changelog
ordian Apr 22, 2020
1731b14
Fix build? Set VM template.
dvdplm Apr 23, 2020
27d6768
Fix build? correct image name
dvdplm Apr 23, 2020
b5aa3ce
Fix build? Maybe it's 2019?
dvdplm Apr 23, 2020
ace87ee
appveyor: try release build
ordian Apr 23, 2020
080de4a
Revert "appveyor: try release build"
ordian Apr 23, 2020
f168c09
checkout https://github.com/rust-rocksdb/rust-rocksdb/pull/412
ordian Apr 23, 2020
c64a021
revert patch
ordian Apr 23, 2020
f1fead0
revert unrelated changes
ordian Apr 23, 2020
36f799f
Merge branch 'master' into ao-upgrade-rocksdb
ordian Apr 23, 2020
f1d94ea
add open as secondary rocksdb api
insipx Apr 28, 2020
b1e1a38
Update kvdb-rocksdb/src/lib.rs
insipx Apr 28, 2020
06df3fd
add more information to secondary mode comment
insipx Apr 28, 2020
faa9f6f
add function to catch up a secondary instance with a primary instance
insipx Apr 29, 2020
54f5cb8
one more doc comment for more clarity
insipx Apr 29, 2020
8c032f3
style fixes
insipx Apr 29, 2020
ccbd7d5
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
234baaa
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
66f2258
change name of `secondary_mode` option to `secondary`
insipx Apr 29, 2020
d31bdc9
Merge branch 'master' of github.com:paritytech/parity-common into ins…
insipx Apr 29, 2020
4c0aaee
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
1019484
fix some punctuation
insipx Apr 29, 2020
6a402f8
specify a different directory for secondary instance to store its logs
insipx Apr 30, 2020
d43daf6
Update kvdb-rocksdb/src/lib.rs
insipx Apr 30, 2020
78c1a05
remove catching up on primary db in test
insipx Apr 30, 2020
be980bf
doc comment fixes
insipx Apr 30, 2020
3636b97
remove wrong info about blocking primary instance
insipx Apr 30, 2020
d301420
more docs for catch-up-with-primary
insipx Apr 30, 2020
b5719da
grammar
insipx May 1, 2020
99bddd7
make `max_open_files` comment clearer
insipx May 1, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kvdb-rocksdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
- Updated RocksDB to 6.7.3. [#379](https://github.com/paritytech/parity-common/pull/379)
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)
- Rename and optimize prefix iteration. [#365](https://github.com/paritytech/parity-common/pull/365)
Expand Down
2 changes: 1 addition & 1 deletion kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.4.8"
num_cpus = "1.10.1"
parking_lot = "0.10.0"
regex = "1.3.1"
rocksdb = { version = "0.13", features = ["snappy"], default-features = false }
rocksdb = { version = "0.14", features = ["snappy"], default-features = false }
owning_ref = "0.4.0"
parity-util-mem = { path = "../parity-util-mem", version = "0.6", default-features = false, features = ["std", "smallvec"] }

Expand Down
43 changes: 14 additions & 29 deletions kvdb-rocksdb/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ pub type KeyValuePair = (Box<[u8]>, Box<[u8]>);
/// Iterator with built-in synchronization.
pub struct ReadGuardedIterator<'a, I, T> {
inner: OwningHandle<UnsafeStableAddress<'a, Option<T>>, DerefWrapper<Option<I>>>,
// We store the upper bound here
// to make sure it lives at least as long as the iterator.
// See https://github.com/rust-rocksdb/rust-rocksdb/pull/309.
// TODO: remove this once https://github.com/rust-rocksdb/rust-rocksdb/pull/377
// is merged and released.
#[allow(dead_code)]
upper_bound_prefix: Option<Box<[u8]>>,
}

// We can't implement `StableAddress` for a `RwLockReadGuard`
Expand Down Expand Up @@ -80,15 +73,15 @@ impl<'a, I: Iterator, T> Iterator for ReadGuardedIterator<'a, I, T> {
pub trait IterationHandler {
type Iterator: Iterator<Item = KeyValuePair>;

/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes a
/// reference to a `ReadOptions` to allow configuration of the new iterator (see
/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
/// `ReadOptions` to allow configuration of the new iterator (see
/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
fn iter(&self, col: u32, read_opts: &ReadOptions) -> Self::Iterator;
/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes a
/// reference to a `ReadOptions` to allow configuration of the new iterator (see
fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator;
/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes
/// `ReadOptions` to allow configuration of the new iterator (see
/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
/// The `Iterator` iterates over keys which start with the provided `prefix`.
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator;
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator;
}

impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T>
Expand All @@ -97,8 +90,8 @@ where
{
/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
/// where `DBIterator` iterates over all keys.
pub fn new(read_lock: RwLockReadGuard<'a, Option<T>>, col: u32, read_opts: &ReadOptions) -> Self {
Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)), upper_bound_prefix: None }
pub fn new(read_lock: RwLockReadGuard<'a, Option<T>>, col: u32, read_opts: ReadOptions) -> Self {
Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)) }
}

/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
Expand All @@ -107,13 +100,9 @@ where
read_lock: RwLockReadGuard<'a, Option<T>>,
col: u32,
prefix: &[u8],
upper_bound: Option<Box<[u8]>>,
read_opts: &ReadOptions,
read_opts: ReadOptions,
) -> Self {
Self {
inner: Self::new_inner(read_lock, |db| db.iter_with_prefix(col, prefix, read_opts)),
upper_bound_prefix: upper_bound,
}
Self { inner: Self::new_inner(read_lock, |db| db.iter_with_prefix(col, prefix, read_opts)) }
}

fn new_inner(
Expand All @@ -130,15 +119,11 @@ where
impl<'a> IterationHandler for &'a DBAndColumns {
type Iterator = DBIterator<'a>;

fn iter(&self, col: u32, read_opts: &ReadOptions) -> Self::Iterator {
self.db
.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::Start)
.expect("iterator params are valid; qed")
fn iter(&self, col: u32, read_opts: ReadOptions) -> Self::Iterator {
self.db.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::Start)
}

fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator {
self.db
.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::From(prefix, Direction::Forward))
.expect("iterator params are valid; qed")
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator {
self.db.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::From(prefix, Direction::Forward))
}
}
132 changes: 97 additions & 35 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ pub struct DatabaseConfig {
/// It can have a negative performance impact up to 10% according to
/// https://github.com/facebook/rocksdb/wiki/Statistics.
pub enable_statistics: bool,
/// Open the database in secondary mode
Comment thread
insipx marked this conversation as resolved.
Outdated
/// disabled by default
///
/// Must be opened with `max_open_files = -1`.
Comment thread
insipx marked this conversation as resolved.
Outdated
/// May degrade performance of the secondary instance if it applies log files
/// right before the primary instance performs a compaction.
pub secondary_mode: bool,
}

impl DatabaseConfig {
Expand Down Expand Up @@ -215,6 +222,7 @@ impl Default for DatabaseConfig {
columns: 1,
keep_log_file_num: 1,
enable_statistics: false,
secondary_mode: false,
Comment thread
insipx marked this conversation as resolved.
Outdated
}
}
}
Expand Down Expand Up @@ -305,14 +313,24 @@ fn generate_options(config: &DatabaseConfig) -> Options {
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
if config.secondary_mode {
opts.set_max_open_files(-1)
} else {
opts.set_max_open_files(config.max_open_files);
}
opts.set_bytes_per_sync(1 * MB as u64);
opts.set_keep_log_file_num(1);
opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));

opts
}

/// Build the `ReadOptions` handed to read operations: the defaults, with checksum
/// verification switched off.
fn generate_read_options() -> ReadOptions {
	let mut options = ReadOptions::default();
	options.set_verify_checksums(false);
	options
}

/// Generate the block based options for RocksDB, based on the given `DatabaseConfig`.
fn generate_block_based_options(config: &DatabaseConfig) -> BlockBasedOptions {
let mut block_opts = BlockBasedOptions::default();
Expand Down Expand Up @@ -358,13 +376,38 @@ impl Database {
}

let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();

let write_opts = WriteOptions::default();
let mut read_opts = ReadOptions::default();
read_opts.set_verify_checksums(false);
let read_opts = generate_read_options();

let db = if config.secondary_mode {
Self::open_secondary(&opts, path, column_names.as_slice())?
} else {
let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
Self::open_primary(&opts, path, config, column_names.as_slice(), &block_opts)?
};

Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Internal api to open a database in primary mode
fn open_primary(opts: &Options,
path: &str,
config: &DatabaseConfig,
column_names: &[&str],
block_opts: &BlockBasedOptions
) -> io::Result<rocksdb::DB> {

let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)))
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
.collect();

let db = match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
Expand All @@ -385,31 +428,36 @@ impl Database {
ok => ok,
};

let db = match db {
Ok(match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;

let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| {
ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i))
ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i))
})
.collect();

DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?
}
Err(s) => return Err(other_io_err(s)),
};
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Open the database at `path` as a secondary instance exposing `column_names`.
///
/// If the first attempt fails with an error that `is_corrupted` recognises, a repair
/// is attempted and the database is opened a second time. Every other failure is
/// converted with `other_io_err` and returned to the caller.
// NOTE(review): `path` is passed for both path arguments of `open_cf_as_secondary`;
// confirm that the secondary instance is meant to share the primary's directory.
fn open_secondary(opts: &Options, path: &str, column_names: &[String]) -> io::Result<rocksdb::DB> {
	let open_error = match DB::open_cf_as_secondary(opts, path, path, column_names) {
		Ok(db) => return Ok(db),
		Err(e) => e,
	};

	// Only corruption is worth a repair attempt; anything else is reported as-is.
	if !is_corrupted(&open_error) {
		return Err(other_io_err(open_error));
	}

	warn!("DB corrupted: {}, attempting repair", open_error);
	DB::repair(opts, path).map_err(other_io_err)?;
	DB::open_cf_as_secondary(opts, path, path, column_names).map_err(other_io_err)
}

Expand All @@ -436,22 +484,22 @@ impl Database {
match op {
DBOp::Insert { col: _, key, value } => {
stats_total_bytes += key.len() + value.len();
batch.put_cf(cf, &key, &value).map_err(other_io_err)?
batch.put_cf(cf, &key, &value);
}
DBOp::Delete { col: _, key } => {
// We count deletes as writes.
stats_total_bytes += key.len();
batch.delete_cf(cf, &key).map_err(other_io_err)?
batch.delete_cf(cf, &key);
}
DBOp::DeletePrefix { col, prefix } => {
let end_prefix = kvdb::end_prefix(&prefix[..]);
let no_end = end_prefix.is_none();
let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
batch.delete_range_cf(cf, &prefix[..], &end_range[..]).map_err(other_io_err)?;
batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
if no_end {
let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
for (key, _) in self.iter_with_prefix(col, prefix) {
batch.delete_cf(cf, &key[..]).map_err(other_io_err)?;
batch.delete_cf(cf, &key[..]);
}
}
}
Expand Down Expand Up @@ -502,7 +550,8 @@ impl Database {
pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let guarded = iter::ReadGuardedIterator::new(read_lock, col, &self.read_opts);
let read_opts = generate_read_options();
let guarded = iter::ReadGuardedIterator::new(read_lock, col, read_opts);
Some(guarded)
} else {
None
Expand All @@ -516,19 +565,12 @@ impl Database {
fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let mut read_opts = ReadOptions::default();
read_opts.set_verify_checksums(false);
let mut read_opts = generate_read_options();
// rocksdb doesn't work with an empty upper bound
let end_prefix = kvdb::end_prefix(prefix).map(|end_prefix| {
let end_prefix = end_prefix.into_boxed_slice();
// SAFETY: the end_prefix lives as long as the iterator
// See `ReadGuardedIterator` definition for more details.
unsafe {
read_opts.set_iterate_upper_bound(&end_prefix);
}
end_prefix
});
let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, end_prefix, &read_opts);
if let Some(end_prefix) = kvdb::end_prefix(prefix) {
read_opts.set_iterate_upper_bound(end_prefix);
}
let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, read_opts);
Some(guarded)
} else {
None
Expand Down Expand Up @@ -718,8 +760,9 @@ mod tests {
fn put_and_get() -> io::Result<()> {
let db = create(1)?;
st::test_put_and_get(&db)
}

}

#[test]
fn delete_and_get() -> io::Result<()> {
let db = create(1)?;
Expand Down Expand Up @@ -756,6 +799,24 @@ mod tests {
st::test_io_stats(&db)
}

// Checks that a value written through one `Database` handle can be read back through a
// second handle opened on the same directory with `secondary_mode` enabled.
#[test]
fn secondary_db_get() -> io::Result<()> {
let tempdir = TempDir::new("")?;
// First handle: opened without touching `secondary_mode`.
let config = DatabaseConfig::with_columns(1);
let db = Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))?;

// Write a single key/value pair into column 0 through the first handle.
let key1 = b"key1";
let mut transaction = db.transaction();
transaction.put(0, key1, b"horse");
db.write(transaction)?;

// Second handle: same directory, but with `secondary_mode` switched on.
let mut config = DatabaseConfig::with_columns(1);
config.secondary_mode = true;
Comment thread
insipx marked this conversation as resolved.
Outdated
let second_db = Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))?;
// The value written above must be visible through the second handle.
assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
Ok(())
}

#[test]
fn mem_tables_size() {
let tempdir = TempDir::new("").unwrap();
Expand All @@ -767,6 +828,7 @@ mod tests {
columns: 11,
keep_log_file_num: 1,
enable_statistics: false,
secondary_mode: false,
};

let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
Expand Down