diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index c4a9c0b15..3cdf19e75 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2015-2018 Parity Technologies (UK) Ltd. +// Copyright 2015-2019 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify @@ -44,9 +44,17 @@ where io::Error::new(io::ErrorKind::Other, e) } +// Used for memory budget. +type MiB = usize; + const KB: usize = 1024; const MB: usize = 1024 * KB; -const DB_DEFAULT_MEMORY_BUDGET_MB: usize = 128; + +/// The default column memory budget in MiB. +pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128; + +/// The default memory budget in MiB. +pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512; enum KeyState { Insert(DBValue), @@ -54,14 +62,17 @@ enum KeyState { } /// Compaction profile for the database settings +/// Note that changing these parameters may trigger +/// the compaction process of RocksDB on startup. +/// https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true #[derive(Clone, Copy, PartialEq, Debug)] pub struct CompactionProfile { /// L0-L1 target file size + /// The minimum size should be calculated in accordance with the + /// number of levels and the expected size of the database. 
pub initial_file_size: u64, /// block size pub block_size: usize, - /// rate limiter for background flushes and compactions, bytes/sec, if any - pub write_rate_limit: Option, } impl Default for CompactionProfile { @@ -101,9 +112,10 @@ impl CompactionProfile { let hdd_check_file = db_path .to_str() .and_then(|path_str| Command::new("df").arg(path_str).output().ok()) - .and_then(|df_res| match df_res.status.success() { - true => Some(df_res.stdout), - false => None, + .and_then(|df_res| if df_res.status.success() { + Some(df_res.stdout) + } else { + None }) .and_then(rotational_from_df_output); // Read out the file and match compaction profile. @@ -134,7 +146,7 @@ impl CompactionProfile { /// Default profile suitable for SSD storage pub fn ssd() -> CompactionProfile { - CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB, write_rate_limit: None } + CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 8 * MB } } /// Slow HDD compaction profile @@ -142,7 +154,6 @@ impl CompactionProfile { CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB, - write_rate_limit: Some(16 * MB as u64), } } } @@ -152,29 +163,62 @@ impl CompactionProfile { pub struct DatabaseConfig { /// Max number of open files. pub max_open_files: i32, - /// Memory budget (in MiB) used for setting block cache size, write buffer size. - pub memory_budget: Option, - /// Compaction profile + /// Memory budget (in MiB) used for setting block cache size and + /// write buffer size for each column including the default one. + /// If the memory budget of a column is not specified, + /// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column. + pub memory_budget: HashMap, MiB>, + /// Compaction profile. pub compaction: CompactionProfile, - /// Set number of columns + /// Set number of columns. pub columns: Option, + /// Specify the maximum number of info/debug log files to be kept. 
+ pub keep_log_file_num: i32, } impl DatabaseConfig { /// Create new `DatabaseConfig` with default parameters and specified set of columns. /// Note that cache sizes must be explicitly set. pub fn with_columns(columns: Option) -> Self { - let mut config = Self::default(); - config.columns = columns; - config + Self { columns, ..Default::default() } + } + + /// Returns the total memory budget in bytes. + pub fn memory_budget(&self) -> MiB { + if self.memory_budget.is_empty() && self.columns.is_none() { + return DB_DEFAULT_MEMORY_BUDGET_MB * MB; + } + (0..=self.columns.unwrap_or(0)) + .map(|i| self.memory_budget.get(&i.checked_sub(1)).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB) + .sum() } - pub fn memory_budget(&self) -> usize { - self.memory_budget.unwrap_or(DB_DEFAULT_MEMORY_BUDGET_MB) * MB + /// Returns the memory budget of the specified column in bytes. + fn memory_budget_per_col(&self, col: Option) -> MiB { + self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB } - pub fn memory_budget_per_col(&self) -> usize { - self.memory_budget() / self.columns.unwrap_or(1) as usize + // Get column family configuration with the given block based options. 
+ fn column_config(&self, block_opts: &BlockBasedOptions, col: Option) -> io::Result { + let memory_budget_per_col = self.memory_budget_per_col(col); + let mut opts = Options::new(); + + opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?; + + opts.set_block_based_table_factory(block_opts); + + opts.set_parsed_options(&format!( + "block_based_table_factory={{{};{}}}", + "cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true" + )) + .map_err(other_io_err)?; + + opts.optimize_level_style_compaction(memory_budget_per_col as i32); + opts.set_target_file_size_base(self.compaction.initial_file_size); + + opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?; + + Ok(opts) } } @@ -182,9 +226,10 @@ impl Default for DatabaseConfig { fn default() -> DatabaseConfig { DatabaseConfig { max_open_files: 512, - memory_budget: None, + memory_budget: HashMap::new(), compaction: CompactionProfile::default(), columns: None, + keep_log_file_num: 1, } } } @@ -192,7 +237,6 @@ impl Default for DatabaseConfig { /// Database iterator (for flushed data only) // The compromise of holding only a virtual borrow vs. holding a lock on the // inner DB (to prevent closing via restoration) may be re-evaluated in the future. -// pub struct DatabaseIterator<'a> { iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>, _marker: PhantomData<&'a Database>, @@ -211,28 +255,6 @@ struct DBAndColumns { cfs: Vec, } -// get column family configuration from database config. 
-fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result { - let mut opts = Options::new(); - - opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?; - - opts.set_block_based_table_factory(block_opts); - - opts.set_parsed_options(&format!( - "block_based_table_factory={{{};{}}}", - "cache_index_and_filter_blocks=true", "pin_l0_filter_and_index_blocks_in_cache=true" - )) - .map_err(other_io_err)?; - - opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32); - opts.set_target_file_size_base(config.compaction.initial_file_size); - - opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?; - - Ok(opts) -} - /// Key-Value database. pub struct Database { db: RwLock>, @@ -278,16 +300,24 @@ impl Database { pub fn open(config: &DatabaseConfig, path: &str) -> io::Result { let mut opts = Options::new(); - if let Some(rate_limit) = config.compaction.write_rate_limit { - opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?; - } opts.set_use_fsync(false); opts.create_if_missing(true); opts.set_max_open_files(config.max_open_files); - opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?; + opts.set_parsed_options(&format!("keep_log_file_num={}", config.keep_log_file_num)).map_err(other_io_err)?; opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?; - opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2); - opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2)); + + let columns = config.columns.unwrap_or(0); + + if columns == 0 { + let budget = config.memory_budget() / 2; + opts.set_db_write_buffer_size(budget); + // from https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#memtable + // Memtable size is controlled by the option `write_buffer_size`. + // If you increase your memtable size, be sure to also increase your L1 size! 
+ // L1 size is controlled by the option `max_bytes_for_level_base`. + opts.set_parsed_options(&format!("max_bytes_for_level_base={}", budget)).map_err(other_io_err)?; + } + opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2)); let mut block_opts = BlockBasedOptions::new(); @@ -308,14 +338,12 @@ impl Database { fs::remove_file(db_corrupted)?; } - let columns = config.columns.unwrap_or(0) as usize; - - let mut cf_options = Vec::with_capacity(columns); + let mut cf_options = Vec::with_capacity(columns as usize); let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect(); let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect(); - for _ in 0..config.columns.unwrap_or(0) { - cf_options.push(col_config(&config, &block_opts)?); + for i in 0..columns { + cf_options.push(config.column_config(&block_opts, Some(i))?); } let write_opts = WriteOptions::new(); @@ -359,31 +387,30 @@ impl Database { warn!("DB corrupted: {}, attempting repair", s); DB::repair(&opts, path).map_err(other_io_err)?; - match cfnames.is_empty() { - true => DB::open(&opts, path).map_err(other_io_err)?, - false => { - let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?; - cfs = cfnames - .iter() - .map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed")) - .collect(); - db - } + if cfnames.is_empty() { + DB::open(&opts, path).map_err(other_io_err)? 
+ } else { + let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?; + cfs = cfnames + .iter() + .map(|n| db.cf_handle(n).expect("rocksdb opens a cf_handle for each cfname; qed")) + .collect(); + db } } Err(s) => return Err(other_io_err(s)), }; let num_cols = cfs.len(); Ok(Database { - db: RwLock::new(Some(DBAndColumns { db: db, cfs: cfs })), + db: RwLock::new(Some(DBAndColumns { db, cfs })), config: config.clone(), - write_opts: write_opts, + write_opts, overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), flushing_lock: Mutex::new(false), path: path.to_owned(), - read_opts: read_opts, - block_opts: block_opts, + read_opts, + block_opts, }) } @@ -654,9 +681,8 @@ impl Database { pub fn drop_column(&self) -> io::Result<()> { match *self.db.write() { Some(DBAndColumns { ref mut db, ref mut cfs }) => { - if let Some(col) = cfs.pop() { + if let Some(_col) = cfs.pop() { let name = format!("col{}", cfs.len()); - drop(col); db.drop_cf(&name).map_err(other_io_err)?; } Ok(()) @@ -669,9 +695,10 @@ impl Database { pub fn add_column(&self) -> io::Result<()> { match *self.db.write() { Some(DBAndColumns { ref mut db, ref mut cfs }) => { - let col = cfs.len() as u32; + let col = cfs.len(); let name = format!("col{}", col); - cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?); + let col_config = self.config.column_config(&self.block_opts, Some(col as u32))?; + cfs.push(db.create_cf(&name, &col_config).map_err(other_io_err)?); Ok(()) } None => Ok(()), @@ -823,9 +850,9 @@ mod tests { let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap(); assert_eq!(db.num_columns(), 0); - for i in 0..5 { + for i in 1..=5 { db.add_column().unwrap(); - assert_eq!(db.num_columns(), i + 1); + assert_eq!(db.num_columns(), i); } }