Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 102 additions & 75 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -44,24 +44,35 @@ where
io::Error::new(io::ErrorKind::Other, e)
}

// Used for memory budget.
type MiB = usize;

const KB: usize = 1024;
const MB: usize = 1024 * KB;
const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128;

/// The default column memory budget in MiB.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;

/// The default memory budget in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;

/// State recorded for a key: either a value to insert or a deletion.
// NOTE(review): presumably stored in the in-memory overlay/flushing maps;
// the use sites are not visible in this excerpt — confirm.
enum KeyState {
/// The key is to be written with the given value.
Insert(DBValue),
/// The key is to be removed.
Delete,
}

/// Compaction profile for the database settings.
///
/// Note that changing these parameters may trigger
/// the compaction process of RocksDB on startup.
/// See <https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true>.
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
/// L0-L1 target file size, in bytes.
/// The minimum size should be calculated in accordance with the
/// number of levels and the expected size of the database.
pub initial_file_size: u64,
/// Block size, in bytes.
pub block_size: usize,
/// Rate limit for background flushes and compactions, in bytes/sec, if any.
pub write_rate_limit: Option<u64>,
}

impl Default for CompactionProfile {
Expand Down Expand Up @@ -101,9 +112,10 @@ impl CompactionProfile {
let hdd_check_file = db_path
.to_str()
.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
.and_then(|df_res| match df_res.status.success() {
true => Some(df_res.stdout),
false => None,
.and_then(|df_res| if df_res.status.success() {
Some(df_res.stdout)
} else {
None
})
.and_then(rotational_from_df_output);
// Read out the file and match compaction profile.
Expand Down Expand Up @@ -134,15 +146,14 @@ impl CompactionProfile {

/// Default profile suitable for SSD storage
pub fn ssd() -> CompactionProfile {
CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB, write_rate_limit: None }
CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 8 * MB }
}

/// Slow HDD compaction profile.
///
/// Uses a 256 MiB initial file size, 64 KiB blocks and a write rate
/// limit of 16 MiB/s.
pub fn hdd() -> CompactionProfile {
CompactionProfile {
initial_file_size: 256 * MB as u64,
block_size: 64 * KB,
write_rate_limit: Some(16 * MB as u64),
}
}
}
Expand All @@ -152,47 +163,80 @@ impl CompactionProfile {
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Memory budget (in MiB) used for setting block cache size, write buffer size.
pub memory_budget: Option<usize>,
/// Compaction profile
/// Memory budget (in MiB) used for setting block cache size and
/// write buffer size for each column including the default one.
/// If the memory budget of a column is not specified,
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
pub memory_budget: HashMap<Option<u32>, MiB>,
/// Compaction profile.
pub compaction: CompactionProfile,
/// Set number of columns
/// Set number of columns.
pub columns: Option<u32>,
/// Specify the maximum number of info/debug log files to be kept.
pub keep_log_file_num: i32,
}

impl DatabaseConfig {
/// Create new `DatabaseConfig` with default parameters and specified set of columns.
/// Note that cache sizes must be explicitly set.
pub fn with_columns(columns: Option<u32>) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't new_with_columns be a more idiomatic name for a constructor?

let mut config = Self::default();
config.columns = columns;
config
Self { columns, ..Default::default() }
}

/// Returns the total memory budget in bytes.
pub fn memory_budget(&self) -> MiB {
if self.memory_budget.is_empty() && self.columns.is_none() {
return DB_DEFAULT_MEMORY_BUDGET_MB * MB;
Comment thread
ordian marked this conversation as resolved.
}
(0..=self.columns.unwrap_or(0))
.map(|i| self.memory_budget.get(&i.checked_sub(1)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
.sum()
}

pub fn memory_budget(&self) -> usize {
self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB
/// Returns the memory budget of the specified column in bytes.
fn memory_budget_per_col(&self, col: Option<u32>) -> MiB {
self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
}

pub fn memory_budget_per_col(&self) -> usize {
self.memory_budget() / self.columns.unwrap_or(1) as usize
// Get column family configuration with the given block based options.
fn column_config(&self, block_opts: &BlockBasedOptions, col: Option<u32>) -> io::Result<Options> {
let memory_budget_per_col = self.memory_budget_per_col(col);
let mut opts = Options::new();

opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;

opts.set_block_based_table_factory(block_opts);

opts.set_parsed_options(&format!(
"block_based_table_factory={{{};{}}}",
"cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true"
))
.map_err(other_io_err)?;

opts.optimize_level_style_compaction(memory_budget_per_col as i32);
opts.set_target_file_size_base(self.compaction.initial_file_size);

opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not new code, but shouldn't there be a value passed in here? (`set_parsed_options` is pretty terrible, imo.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added in openethereum/parity-ethereum@3cf1aac by @andresilva with the intention of enabling compression on all levels; otherwise `optimize_level_style_compaction` will disable compression for the first two levels, IIUC. #257 uses https://docs.rs/rocksdb/0.13.0/rocksdb/struct.Options.html#method.set_compression_per_level instead of `set_parsed_options`.


Ok(opts)
}
}

impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
max_open_files: 512,
memory_budget: None,
memory_budget: HashMap::new(),
compaction: CompactionProfile::default(),
columns: None,
keep_log_file_num: 1,
}
}
}

/// Database iterator (for flushed data only)
// The compromise of holding only a virtual borrow vs. holding a lock on the
Comment thread
dvdplm marked this conversation as resolved.
// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
//
pub struct DatabaseIterator<'a> {
iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>,
_marker: PhantomData<&'a Database>,
Expand All @@ -211,28 +255,6 @@ struct DBAndColumns {
cfs: Vec<Column>,
}

// Build the column family options from the database config and the shared
// block-based table options.
//
// Returns an `io::Error` if RocksDB rejects any of the parsed option strings.
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result<Options> {
	// Extra table factory settings, passed as a parsed option string after
	// `block_opts` has been installed.
	let table_factory_opts = format!(
		"block_based_table_factory={{{};{}}}",
		"cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true"
	);

	let mut cf_opts = Options::new();

	cf_opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;
	cf_opts.set_block_based_table_factory(block_opts);
	cf_opts.set_parsed_options(&table_factory_opts).map_err(other_io_err)?;

	// Every column gets an equal share of the configured memory budget.
	cf_opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
	cf_opts.set_target_file_size_base(config.compaction.initial_file_size);

	// NOTE(review): applied after `optimize_level_style_compaction`, presumably
	// to reset the per-level compression settings it installs — confirm.
	cf_opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?;

	Ok(cf_opts)
}

/// Key-Value database.
pub struct Database {
db: RwLock<Option<DBAndColumns>>,
Expand Down Expand Up @@ -278,16 +300,24 @@ impl Database {
pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
let mut opts = Options::new();

if let Some(rate_limit) = config.compaction.write_rate_limit {
opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?;
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?;
opts.set_parsed_options(&format!("keep_log_file_num={}", config.keep_log_file_num)).map_err(other_io_err)?;
opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?;
opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2);
opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));

let columns = config.columns.unwrap_or(0);

if columns == 0 {
let budget = config.memory_budget() / 2;
opts.set_db_write_buffer_size(budget);
// from https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#memtable
// Memtable size is controlled by the option `write_buffer_size`.
// If you increase your memtable size, be sure to also increase your L1 size!
// L1 size is controlled by the option `max_bytes_for_level_base`.
opts.set_parsed_options(&format!("max_bytes_for_level_base={}", budget)).map_err(other_io_err)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the advice is to set `max_bytes_for_level_base` to 2x the memtable size?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've set it to the same value as write_buffer_size, couldn't find any advice on that.

}
opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
Comment thread
dvdplm marked this conversation as resolved.

let mut block_opts = BlockBasedOptions::new();

Expand All @@ -308,14 +338,12 @@ impl Database {
fs::remove_file(db_corrupted)?;
}

let columns = config.columns.unwrap_or(0) as usize;

let mut cf_options = Vec::with_capacity(columns);
let mut cf_options = Vec::with_capacity(columns as usize);
let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();

for _ in 0..config.columns.unwrap_or(0) {
cf_options.push(col_config(&config, &block_opts)?);
for i in 0..columns {
cf_options.push(config.column_config(&block_opts, Some(i))?);
}

let write_opts = WriteOptions::new();
Expand Down Expand Up @@ -359,31 +387,30 @@ impl Database {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;

match cfnames.is_empty() {
true => DB::open(&opts, path).map_err(other_io_err)?,
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?;
cfs = cfnames
.iter()
.map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed"))
.collect();
db
}
if cfnames.is_empty() {
DB::open(&opts, path).map_err(other_io_err)?
} else {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?;
cfs = cfnames
.iter()
.map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed"))
.collect();
db
}
}
Err(s) => return Err(other_io_err(s)),
};
let num_cols = cfs.len();
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db: db, cfs: cfs })),
db: RwLock::new(Some(DBAndColumns { db, cfs })),
config: config.clone(),
write_opts: write_opts,
write_opts,
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
read_opts: read_opts,
block_opts: block_opts,
read_opts,
block_opts,
})
}

Expand Down Expand Up @@ -654,9 +681,8 @@ impl Database {
pub fn drop_column(&self) -> io::Result<()> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
if let Some(col) = cfs.pop() {
if let Some(_col) = cfs.pop() {
let name = format!("col{}", cfs.len());
drop(col);
db.drop_cf(&name).map_err(other_io_err)?;
}
Ok(())
Expand All @@ -669,9 +695,10 @@ impl Database {
pub fn add_column(&self) -> io::Result<()> {
match *self.db.write() {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
let col = cfs.len() as u32;
let col = cfs.len();
let name = format!("col{}", col);
cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?);
let col_config = self.config.column_config(&self.block_opts, Some(col as u32))?;
cfs.push(db.create_cf(&name, &col_config).map_err(other_io_err)?);
Ok(())
}
None => Ok(()),
Expand Down Expand Up @@ -823,9 +850,9 @@ mod tests {
let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
assert_eq!(db.num_columns(), 0);

for i in 0..5 {
for i in 1..=5 {
db.add_column().unwrap();
assert_eq!(db.num_columns(), i + 1);
assert_eq!(db.num_columns(), i);
}
}

Expand Down