Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 57 additions & 39 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -152,47 +152,56 @@ impl CompactionProfile {
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Memory budget (in MiB) used for setting block cache size, write buffer size.
pub memory_budget: Option<usize>,
/// Compaction profile
/// Memory budget (in MiB) used for setting block cache size,
/// write buffer size for each column.
pub memory_budget: Vec<Option<usize>>,
/// Compaction profile.
pub compaction: CompactionProfile,
/// Set number of columns
pub columns: Option<u32>,
/// Specify the maximal number of info log files to be kept.
Comment thread
ordian marked this conversation as resolved.
Outdated
Comment thread
dvdplm marked this conversation as resolved.
Outdated
pub keep_log_file_num: i32,
}

impl DatabaseConfig {
/// Create new `DatabaseConfig` with default parameters and specified set of columns.
/// Note that cache sizes must be explicitly set.
pub fn with_columns(columns: Option<u32>) -> Self {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't `new_with_columns` be a more idiomatic name for a constructor?

let mut config = Self::default();
config.columns = columns;
config.memory_budget.resize(columns.unwrap_or(0) as usize, None);
config
}

pub fn memory_budget(&self) -> usize {
self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB
if self.memory_budget.is_empty() {
return DB_DEFAULT_MEMORY_BUDGET_MB * MB;
Comment thread
ordian marked this conversation as resolved.
}
self.memory_budget
.iter()
.map(|b| b.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB)
.sum()
}

pub fn memory_budget_per_col(&self) -> usize {
self.memory_budget() / self.columns.unwrap_or(1) as usize
/// # Panics
///
/// If `col` is out of bounds.
Comment thread
ordian marked this conversation as resolved.
Outdated
pub fn memory_budget_per_col(&self, col: usize) -> usize {
self.memory_budget[col].unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB
Comment thread
ordian marked this conversation as resolved.
Outdated
}
}

impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
max_open_files: 512,
memory_budget: None,
memory_budget: vec![],
compaction: CompactionProfile::default(),
columns: None,
keep_log_file_num: 1,
}
}
}

/// Database iterator (for flushed data only)
// The compromise of holding only a virtual borrow vs. holding a lock on the
Comment thread
dvdplm marked this conversation as resolved.
// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
//
pub struct DatabaseIterator<'a> {
iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>,
_marker: PhantomData<&'a Database>,
Expand All @@ -212,7 +221,7 @@ struct DBAndColumns {
}

// get column family configuration from database config.
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result<Options> {
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions, memory_budget_per_col: usize) -> io::Result<Options> {
let mut opts = Options::new();

opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;
Expand All @@ -225,7 +234,7 @@ fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Re
))
.map_err(other_io_err)?;

opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
opts.optimize_level_style_compaction(memory_budget_per_col as i32);
opts.set_target_file_size_base(config.compaction.initial_file_size);

opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?;
Expand Down Expand Up @@ -284,10 +293,18 @@ impl Database {
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?;
opts.set_parsed_options(&format!("keep_log_file_num={}", config.keep_log_file_num)).map_err(other_io_err)?;
opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?;
opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2);
opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));
if config.memory_budget.is_empty() {
Comment thread
dvdplm marked this conversation as resolved.
Outdated
let budget = config.memory_budget();
opts.set_db_write_buffer_size(budget / 2);
// from https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#memtable
// Memtable size is controlled by the option `write_buffer_size`.
// If you increase your memtable size, be sure to also increase your L1 size!
// L1 size is controlled by the option `max_bytes_for_level_base`.
opts.set_parsed_options(&format!("max_bytes_for_level_base={}", budget)).map_err(other_io_err)?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the advice is to set `max_bytes_for_level_base` to 2x the memtable size?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've set it to the same value as `write_buffer_size`; I couldn't find any advice on that.

}
opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
Comment thread
dvdplm marked this conversation as resolved.

let mut block_opts = BlockBasedOptions::new();

Expand All @@ -308,23 +325,23 @@ impl Database {
fs::remove_file(db_corrupted)?;
}

let columns = config.columns.unwrap_or(0) as usize;
let columns = config.memory_budget.len();

let mut cf_options = Vec::with_capacity(columns);
let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();

for _ in 0..config.columns.unwrap_or(0) {
cf_options.push(col_config(&config, &block_opts)?);
for i in 0..config.memory_budget.len() {
Comment thread
dvdplm marked this conversation as resolved.
Outdated
cf_options.push(col_config(&config, &block_opts, config.memory_budget_per_col(i))?);
}

let write_opts = WriteOptions::new();
let mut read_opts = ReadOptions::new();
read_opts.set_verify_checksums(false);

let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(_) => {
let db = match config.memory_budget.is_empty() {
Comment thread
dvdplm marked this conversation as resolved.
Outdated
false => {
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames
Expand All @@ -350,7 +367,7 @@ impl Database {
}
}
}
None => DB::open(&opts, path),
true => DB::open(&opts, path),
};

let db = match db {
Expand All @@ -377,13 +394,13 @@ impl Database {
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db: db, cfs: cfs })),
config: config.clone(),
write_opts: write_opts,
write_opts,
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
read_opts: read_opts,
block_opts: block_opts,
read_opts,
block_opts,
})
}

Expand Down Expand Up @@ -642,21 +659,17 @@ impl Database {

/// The number of non-default column families.
pub fn num_columns(&self) -> u32 {
self.db
.read()
.as_ref()
.and_then(|db| if db.cfs.is_empty() { None } else { Some(db.cfs.len()) })
.map(|n| n as u32)
.unwrap_or(0)
self.config.memory_budget.len() as u32
}

/// Drop a column family.
pub fn drop_column(&self) -> io::Result<()> {
pub fn drop_column(&mut self) -> io::Result<()> {
Comment thread
ordian marked this conversation as resolved.
Outdated
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
if let Some(col) = cfs.pop() {
let name = format!("col{}", cfs.len());
drop(col);
self.config.memory_budget.resize(cfs.len(), None);
Comment thread
ordian marked this conversation as resolved.
Outdated
db.drop_cf(&name).map_err(other_io_err)?;
}
Ok(())
Expand All @@ -666,12 +679,17 @@ impl Database {
}

/// Add a column family.
pub fn add_column(&self) -> io::Result<()> {
pub fn add_column(&mut self) -> io::Result<()> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
let col = cfs.len() as u32;
let col = cfs.len();
let name = format!("col{}", col);
cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?);
if self.config.memory_budget.len() < col + 1 {
self.config.memory_budget.resize(col + 1, None);
}
let memory_budget_per_col = self.config.memory_budget_per_col(col);
let col_config = col_config(&self.config, &self.block_opts, memory_budget_per_col)?;
Comment thread
ordian marked this conversation as resolved.
Outdated
cfs.push(db.create_cf(&name, &col_config).map_err(other_io_err)?);
Ok(())
}
None => Ok(()),
Expand Down Expand Up @@ -820,7 +838,7 @@ mod tests {

// open empty, add 5.
{
let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
let mut db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 0);

for i in 0..5 {
Expand All @@ -845,7 +863,7 @@ mod tests {

// open 5, remove all.
{
let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
let mut db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 5);

for i in (0..5).rev() {
Expand Down