From cf742b9f2c846f096631f5441dbc3595f1f97cc4 Mon Sep 17 00:00:00 2001 From: Kould Date: Mon, 28 Oct 2024 18:40:33 +0800 Subject: [PATCH] chore: codefmt --- examples/declare.rs | 4 +- src/compaction/mod.rs | 66 +++++++++++++++------------- src/fs/cache_reader.rs | 11 ++--- src/inmem/mutable.rs | 12 ++---- src/lib.rs | 56 ++++++++---------------- src/ondisk/sstable.rs | 21 +++++---- src/option.rs | 84 +++++++++++++++++------------------- src/stream/level.rs | 25 +++++++---- src/stream/mem_projection.rs | 4 +- src/stream/merge.rs | 12 ++---- src/stream/package.rs | 4 +- src/transaction.rs | 48 +++++++-------------- src/version/cleaner.rs | 8 ++-- src/version/mod.rs | 33 ++++++++++++-- src/version/set.rs | 68 ++++++++++++++++++++++------- tests/data_integrity.rs | 4 +- 16 files changed, 237 insertions(+), 223 deletions(-) diff --git a/examples/declare.rs b/examples/declare.rs index 5b569abe..6e6edcd3 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -22,9 +22,7 @@ async fn main() { // make sure the path exists let _ = fs::create_dir_all("./db_path/users").await; - let options = DbOption::from_path(Path::from_filesystem_path("./db_path/users").unwrap()) - .await - .unwrap(); + let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); // pluggable async runtime and I/O let db = DB::new(options, TokioExecutor::default()).await.unwrap(); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index c759e3c7..3bd8b937 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -218,15 +218,21 @@ where .await?; streams.push(ScanStream::SsTable { - inner: SsTable::open(option, file, scope.gen, !is_local) - .await? - .scan( - (Bound::Unbounded, Bound::Unbounded), - u32::MAX.into(), - None, - ProjectionMask::all(), - ) - .await?, + inner: SsTable::open( + file, + scope.gen, + !is_local, + version.range_cache(), + version.meta_cache(), + ) + .await? + .scan( + (Bound::Unbounded, Bound::Unbounded), + u32::MAX.into(), + None, + ProjectionMask::all(), + ) + .await?, }); } } else { @@ -525,7 +531,7 @@ pub(crate) mod tests { tests::Test, timestamp::Timestamp, trigger::{TriggerFactory, TriggerType}, - version::{edit::VersionEdit, Version, MAX_LEVEL}, + version::{edit::VersionEdit, set::VersionSet, Version, MAX_LEVEL}, wal::log::LogType, DbError, DbOption, DB, }; @@ -583,9 +589,7 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let temp_dir_l0 = tempfile::tempdir().unwrap(); - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap() + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -699,9 +703,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_string(), 0, - ) - .await - .unwrap(); + ); manager .base_fs() .create_dir_all(&option.wal_dir_path()) @@ -769,9 +771,7 @@ pub(crate) mod tests { let temp_dir_l0 = TempDir::new().unwrap(); let temp_dir_l1 = TempDir::new().unwrap(); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap() + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -1061,8 +1061,14 @@ pub(crate) mod tests { .unwrap(); let (sender, _) = bounded(1); - let mut version = - Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + let (meta_cache, range_cache) = VersionSet::build_cache(&option).await.unwrap(); + let mut version = Version::::new( + option.clone(), + sender, + Arc::new(AtomicU32::default()), + range_cache, + meta_cache, + ); version.level_slice[0].push(Scope { min: 1.to_string(), max: 3.to_string(), @@ -1110,9 +1116,7 @@ pub(crate) mod tests { pub(crate) async fn major_panic() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); option.major_threshold_with_sst_size = 1; option.level_sst_magnification = 1; let manager = @@ -1181,8 +1185,14 @@ pub(crate) mod tests { let option = Arc::new(option); let (sender, _) = bounded(1); - let mut version = - Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + let (meta_cache, range_cache) = VersionSet::build_cache(&option).await.unwrap(); + let mut version = Version::::new( + option.clone(), + sender, + Arc::new(AtomicU32::default()), + range_cache, + meta_cache, + ); version.level_slice[0].push(Scope { min: 0.to_string(), max: 4.to_string(), @@ -1219,9 +1229,7 @@ pub(crate) mod tests { async fn test_flush_major_level_sort() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 0; option.major_threshold_with_sst_size = 2; diff --git a/src/fs/cache_reader.rs b/src/fs/cache_reader.rs index 458bb9fa..3defb2d1 100644 --- a/src/fs/cache_reader.rs +++ b/src/fs/cache_reader.rs @@ -95,6 +95,7 @@ pub(crate) mod tests { ondisk::sstable::SsTable, record::{Record, RecordInstance}, tests::Test, + version::set::VersionSet, wal::log::LogType, DbOption, }; @@ -137,10 +138,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_cache_read() { let temp_dir = TempDir::new().unwrap(); - let option = - DbOption::::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); let fs = option.base_fs.clone().parse().unwrap(); fs.create_dir_all(&option.version_log_dir_path()) .await @@ -189,9 +187,10 @@ pub(crate) mod tests { let read_count = Arc::new(AtomicUsize::new(0)); let table_path = option.table_path(&table_gen, 0); + let (meta_cache, range_cache) = VersionSet::build_cache(&option).await.unwrap(); + for _ in 0..1000 { let mut scan = SsTable::::open( - &option, Box::new(CountFile { inner: fs .open_options(&table_path, FileType::Parquet.open_options(true)) @@ -201,6 +200,8 @@ pub(crate) mod tests { }), table_gen, true, + range_cache.clone(), + meta_cache.clone(), ) .await .unwrap() diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index dfdccb58..e5a1ea3c 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -228,9 +228,7 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -278,9 +276,7 @@ mod tests { async fn range() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -367,9 +363,7 @@ mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "age".to_string(), 0, - ) - .await - .unwrap(); + ); let fs = Arc::new(TokioFs) as Arc; fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index d07fdc36..17ac3300 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! // make sure the path exists //! let _ = fs::create_dir_all("./db_path/users").await; //! -//! let options = DbOption::from_path(Path::from_filesystem_path("./db_path/users").unwrap()).await.unwrap(); +//! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); //! // pluggable async runtime and I/O //! let db = DB::new(options, TokioExecutor::default()).await.unwrap(); //! // insert with owned value @@ -1513,9 +1513,7 @@ pub(crate) mod tests { let path = Path::from_filesystem_path(temp_dir.path()).unwrap(); let path_l0 = Path::from_filesystem_path(temp_dir_l0.path()).unwrap(); - let mut option = DbOption::from_path(path) - .await - .unwrap() + let mut option = DbOption::from(path) .level_path(0, path_l0, FsOptions::Local) .unwrap(); option.immutable_chunk_num = 1; @@ -1553,9 +1551,7 @@ pub(crate) mod tests { async fn test_flush() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1587,11 +1583,9 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let (task_tx, _task_rx) = bounded(1); @@ -1648,15 +1642,11 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let (desc, primary_key_index) = test_dyn_item_schema(); - let option = Arc::new( - DbOption::with_path( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_owned(), - primary_key_index, - ) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::with_path( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + "id".to_owned(), + primary_key_index, + )); manager .base_fs() .create_dir_all(&option.wal_dir_path()) @@ -1690,9 +1680,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_owned(), primary_key_index, - ) - .await - .unwrap(); + ); let db: DB = DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) .await @@ -1727,9 +1715,7 @@ pub(crate) mod tests { async fn test_get_removed() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1768,9 +1754,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_string(), primary_key_index, - ) - .await - .unwrap(); + ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1978,9 +1962,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir1.path()).unwrap(), "id".to_string(), primary_key_index, - ) - .await - .unwrap(); + ); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1992,9 +1974,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir2.path()).unwrap(), "id".to_string(), primary_key_index, - ) - .await - .unwrap(); + ); option2.immutable_chunk_num = 1; option2.immutable_chunk_max_num = 1; option2.major_threshold_with_sst_size = 3; @@ -2006,9 +1986,7 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir3.path()).unwrap(), "id".to_string(), primary_key_index, - ) - .await - .unwrap(); + ); option3.immutable_chunk_num = 1; option3.immutable_chunk_max_num = 1; option3.major_threshold_with_sst_size = 3; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 33988f9a..afb631cf 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -11,11 +11,13 @@ use parquet::arrow::{ use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ - fs::{cache_reader::CacheReader, CacheError, FileId}, + fs::{ + cache_reader::{CacheReader, MetaCache, RangeCache}, + CacheError, FileId, + }, record::Record, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, - DbOption, }; pub(crate) struct SsTable @@ -31,18 +33,19 @@ where R: Record, { pub(crate) async fn open( - option: &DbOption, file: Box, gen: FileId, enable_cache: bool, + range_cache: RangeCache, + meta_cache: MetaCache, ) -> Result { let size = file.size().await?; let reader = if !enable_cache { Box::new(AsyncReader::new(file, size).await?) as Box } else { Box::new(CacheReader::new( - option.meta_cache.clone(), - option.range_cache.clone(), + meta_cache, + range_cache, gen, AsyncReader::new(file, size).await?, )) @@ -201,9 +204,7 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), + DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) .await; @@ -279,9 +280,7 @@ pub(crate) mod tests { let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), + DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) .await; diff --git a/src/option.rs b/src/option.rs index ce844764..4efbcb85 100644 --- a/src/option.rs +++ b/src/option.rs @@ -1,11 +1,9 @@ use std::{ fmt::{Debug, Formatter}, marker::PhantomData, - sync::Arc, }; -use foyer::{CacheBuilder, DirectFsDeviceOptions, Engine, HybridCacheBuilder, LruConfig}; -use fusio::path::{path_to_local, Path}; +use fusio::path::Path; use fusio_dispatch::FsOptions; use parquet::{ basic::Compression, @@ -15,10 +13,7 @@ use parquet::{ }; use crate::{ - fs::{ - cache_reader::{MetaCache, RangeCache}, - CacheError, FileId, FileType, - }, + fs::{FileId, FileType}, record::Record, trigger::TriggerType, version::{Version, MAX_LEVEL}, @@ -30,6 +25,13 @@ const DEFAULT_WAL_BUFFER_SIZE: usize = 4 * 1024; /// configure the operating parameters of each component in the [`DB`](crate::DB) #[derive(Clone)] pub struct DbOption { + pub(crate) cache_path: Path, + pub(crate) cache_meta_capacity: usize, + pub(crate) cache_meta_shards: usize, + pub(crate) cache_meta_ratio: f64, + pub(crate) cache_range_memory: usize, + pub(crate) cache_range_disk: usize, + pub(crate) clean_channel_buffer: usize, pub(crate) base_path: Path, pub(crate) base_fs: FsOptions, @@ -42,8 +44,6 @@ pub struct DbOption { pub(crate) major_l_selection_table_max_num: usize, pub(crate) major_threshold_with_sst_size: usize, pub(crate) max_sst_file_size: usize, - pub(crate) meta_cache: MetaCache, - pub(crate) range_cache: RangeCache, pub(crate) version_log_snapshot_threshold: u32, pub(crate) trigger_type: TriggerType, pub(crate) use_wal: bool, @@ -56,33 +56,29 @@ impl DbOption where R: Record, { - pub async fn from_path(base_path: Path) -> Result> { - let (column_paths, sorting_columns) = R::primary_key_path(); - - DbOption::fn_new(base_path, column_paths, sorting_columns).await - } - /// build the default configured [`DbOption`] with base path and primary key - pub async fn with_path( - base_path: Path, - primary_key_name: String, - primary_key_index: usize, - ) -> Result> { + pub fn with_path(base_path: Path, primary_key_name: String, primary_key_index: usize) -> Self { let (column_paths, sorting_columns) = Self::primary_key_path(primary_key_name, primary_key_index); - Self::fn_new(base_path, column_paths, sorting_columns).await + Self::fn_new(base_path, column_paths, sorting_columns) } - async fn fn_new( + fn fn_new( base_path: Path, column_paths: ColumnPath, sorting_columns: Vec, - ) -> Result> { + ) -> Self { let cache_path = base_path.child("cache"); let memory = 64 * 1024 * 1024; - Ok(DbOption { + DbOption { + cache_path, + cache_meta_capacity: 32, + cache_meta_shards: 4, + cache_meta_ratio: 0.1, + cache_range_memory: memory, + cache_range_disk: 8 * memory, immutable_chunk_num: 3, immutable_chunk_max_num: 5, major_threshold_with_sst_size: 4, @@ -107,25 +103,7 @@ where version_log_snapshot_threshold: 200, level_paths: vec![None; MAX_LEVEL], base_fs: FsOptions::Local, - meta_cache: Arc::new( - CacheBuilder::new(32) - .with_shards(4) - .with_eviction_config(LruConfig { - high_priority_pool_ratio: 0.1, - }) - .build(), - ), - range_cache: HybridCacheBuilder::new() - .memory(memory) - .storage(Engine::Large) // use large object disk cache engine only - .with_device_options( - DirectFsDeviceOptions::new(path_to_local(&cache_path).unwrap()) - .with_capacity(8 * memory), - ) - .build() - .await - .map_err(CacheError::from)?, - }) + } } fn primary_key_path( @@ -142,6 +120,18 @@ where } } +impl From for DbOption +where + R: Record, +{ + /// build the default configured [`DbOption`] based on the passed path + fn from(base_path: Path) -> Self { + let (column_paths, sorting_columns) = R::primary_key_path(); + + DbOption::fn_new(base_path, column_paths, sorting_columns) + } +} + impl DbOption where R: Record, @@ -300,6 +290,12 @@ where impl Debug for DbOption { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DbOption") + .field("cache_path", &self.cache_path) + .field("cache_meta_shards", &self.cache_meta_shards) + .field("cache_meta_capacity", &self.cache_meta_capacity) + .field("cache_meta_ratio", &self.cache_meta_ratio) + .field("cache_range_memory", &self.cache_range_memory) + .field("cache_range_disk", &self.cache_range_disk) .field("clean_channel_buffer", &self.clean_channel_buffer) .field("base_path", &self.base_path) // TODO @@ -320,8 +316,6 @@ impl Debug for DbOption { &self.major_threshold_with_sst_size, ) .field("max_sst_file_size", &self.max_sst_file_size) - .field("meta_cache", &self.meta_cache) - .field("range_cache", &self.range_cache) .field( "version_log_snapshot_threshold", &self.version_log_snapshot_threshold, diff --git a/src/stream/level.rs b/src/stream/level.rs index 8c561a45..923126bb 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -15,7 +15,10 @@ use futures_core::Stream; use parquet::{arrow::ProjectionMask, errors::ParquetError}; use crate::{ - fs::{CacheError, FileId, FileType}, + fs::{ + cache_reader::{MetaCache, RangeCache}, + CacheError, FileId, FileType, + }, ondisk::{scan::SsTableScan, sstable::SsTable}, record::Record, scope::Scope, @@ -47,6 +50,8 @@ where ts: Timestamp, level: usize, option: Arc>, + meta_cache: MetaCache, + range_cache: RangeCache, gens: VecDeque, limit: Option, projection_mask: ProjectionMask, @@ -87,6 +92,8 @@ where ts, level, option: version.option().clone(), + meta_cache: version.meta_cache(), + range_cache: version.range_cache(), gens, limit, projection_mask, @@ -167,11 +174,13 @@ where }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - let option = self.option.clone(); + let meta_cache = self.meta_cache.clone(); + let range_cache = self.range_cache.clone(); let (_, gen) = self.path.clone().unwrap(); let is_local = self.is_local; - let future = - async move { SsTable::open(&option, file, gen, !is_local).await }; + let future = async move { + SsTable::open(file, gen, !is_local, range_cache, meta_cache).await + }; self.status = FutureStatus::OpenSst(Box::pin(future)); continue; } @@ -227,11 +236,9 @@ mod tests { async fn projection_scan() { let temp_dir = TempDir::new().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index a9ecef04..6671e285 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -71,9 +71,7 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index ef10949b..f58583c6 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -171,9 +171,7 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -267,9 +265,7 @@ mod tests { async fn merge_mutable_remove_duplicates() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -355,9 +351,7 @@ mod tests { async fn merge_mutable_limit() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/package.rs b/src/stream/package.rs index be4561cf..c8493e8f 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -103,9 +103,7 @@ mod tests { async fn iter() { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/transaction.rs b/src/transaction.rs index 551d144b..3a9d38c1 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -277,9 +277,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let db = DB::::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), + DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) .await @@ -314,11 +312,9 @@ mod tests { async fn transaction_get() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() @@ -406,9 +402,7 @@ mod tests { #[tokio::test] async fn write_conflicts() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); let db = DB::::new(option, TokioExecutor::new()) .await @@ -441,9 +435,7 @@ mod tests { #[tokio::test] async fn transaction_projection() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); let db = DB::::new(option, TokioExecutor::new()) .await @@ -471,11 +463,9 @@ mod tests { async fn transaction_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() @@ -568,11 +558,9 @@ mod tests { async fn test_transaction_scan_bound() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() @@ -746,11 +734,9 @@ mod tests { async fn test_transaction_scan_limit() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() @@ -820,9 +806,7 @@ mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "age".to_string(), 0, - ) - .await - .unwrap(); + ); let db = DB::with_schema(option, TokioExecutor::default(), descs, 0) .await .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 4693514c..3a01d71c 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -116,11 +116,9 @@ pub(crate) mod tests { async fn test_cleaner() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); let gen_0 = FileId::new(); let gen_1 = FileId::new(); diff --git a/src/version/mod.rs b/src/version/mod.rs index 10c3bbbd..f061b925 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -17,7 +17,11 @@ use thiserror::Error; use tracing::error; use crate::{ - fs::{manager::StoreManager, CacheError, FileId, FileType}, + fs::{ + cache_reader::{MetaCache, RangeCache}, + manager::StoreManager, + CacheError, FileId, FileType, + }, ondisk::sstable::SsTable, record::Record, scope::Scope, @@ -49,6 +53,9 @@ where option: Arc>, timestamp: Arc, log_length: u32, + + range_cache: RangeCache, + meta_cache: MetaCache, } impl Version @@ -60,6 +67,8 @@ where option: Arc>, clean_sender: Sender, timestamp: Arc, + range_cache: RangeCache, + meta_cache: MetaCache, ) -> Self { Version { ts: Timestamp::from(0), @@ -68,12 +77,21 @@ where option: option.clone(), timestamp, log_length: 0, + range_cache, + meta_cache, } } pub(crate) fn option(&self) -> &Arc> { &self.option } + + pub(crate) fn meta_cache(&self) -> MetaCache { + self.meta_cache.clone() + } + pub(crate) fn range_cache(&self) -> RangeCache { + self.range_cache.clone() + } } impl TransactionTs for Version @@ -107,6 +125,8 @@ where option: self.option.clone(), timestamp: self.timestamp.clone(), log_length: self.log_length, + range_cache: self.range_cache.clone(), + meta_cache: self.meta_cache.clone(), } } } @@ -187,7 +207,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - SsTable::::open(&self.option, file, *gen, !is_local) + SsTable::::open(file, *gen, !is_local, self.range_cache(), self.meta_cache()) .await? .get(key, projection_mask) .await @@ -227,7 +247,14 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(&self.option, file, scope.gen, !is_local).await?; + let table = SsTable::open( + file, + scope.gen, + !is_local, + self.range_cache(), + self.meta_cache(), + ) + .await?; streams.push(ScanStream::SsTable { inner: table diff --git a/src/version/set.rs b/src/version/set.rs index d3ee1359..79da0b59 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -10,12 +10,17 @@ use std::{ use async_lock::RwLock; use flume::Sender; -use fusio::{dynamic::DynFile, fs::FileMeta}; +use foyer::{CacheBuilder, DirectFsDeviceOptions, Engine, HybridCacheBuilder, LruConfig}; +use fusio::{dynamic::DynFile, fs::FileMeta, path::path_to_local}; use futures_util::StreamExt; use super::{TransactionTs, MAX_LEVEL}; use crate::{ - fs::{manager::StoreManager, parse_file_id, FileId, FileType}, + fs::{ + cache_reader::{MetaCache, RangeCache}, + manager::StoreManager, + parse_file_id, CacheError, FileId, FileType, + }, record::Record, serdes::Encode, timestamp::Timestamp, @@ -62,6 +67,9 @@ where timestamp: Arc, option: Arc>, manager: Arc, + + range_cache: RangeCache, + meta_cache: MetaCache, } impl Clone for VersionSet @@ -75,6 +83,8 @@ where timestamp: self.timestamp.clone(), option: self.option.clone(), manager: self.manager.clone(), + range_cache: self.range_cache.clone(), + meta_cache: self.meta_cache.clone(), } } } @@ -148,6 +158,9 @@ where let timestamp = Arc::new(AtomicU32::default()); drop(log_stream); + + let (meta_cache, range_cache) = Self::build_cache(&option).await?; + let set = VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { current: Arc::new(Version:: { @@ -157,6 +170,8 @@ where option: option.clone(), timestamp: timestamp.clone(), log_length: 0, + range_cache: range_cache.clone(), + meta_cache: meta_cache.clone(), }), log_with_id: (log, log_id), })), @@ -164,12 +179,38 @@ where timestamp, option, manager, + range_cache, + meta_cache, }; set.apply_edits(edits, None, true).await?; Ok(set) } + pub(crate) async fn build_cache( + option: &DbOption, + ) -> Result<(MetaCache, RangeCache), VersionError> { + let meta_cache = Arc::new( + CacheBuilder::new(option.cache_meta_capacity) + .with_shards(option.cache_meta_shards) + .with_eviction_config(LruConfig { + high_priority_pool_ratio: option.cache_meta_ratio, + }) + .build(), + ); + let range_cache = HybridCacheBuilder::new() + .memory(option.cache_range_memory) + .storage(Engine::Large) + .with_device_options( + DirectFsDeviceOptions::new(path_to_local(&option.cache_path).unwrap()) + .with_capacity(option.cache_range_memory), + ) + .build() + .await + .map_err(CacheError::from)?; + Ok((meta_cache, range_cache)) + } + pub(crate) async fn current(&self) -> VersionRef { self.inner.read().await.current.clone() } @@ -326,6 +367,7 @@ pub(crate) mod tests { ) .await?; let timestamp = version.timestamp.clone(); + let (meta_cache, range_cache) = VersionSet::build_cache(&option).await?; Ok(VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { @@ -336,6 +378,8 @@ pub(crate) mod tests { timestamp, option, manager, + range_cache, + meta_cache, }) } @@ -344,11 +388,9 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) @@ -383,9 +425,7 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); option.version_log_snapshot_threshold = 4; let option = Arc::new(option); @@ -512,11 +552,9 @@ pub(crate) mod tests { async fn version_level_sort() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new( - DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(), - ); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + )); let (sender, _) = bounded(1); manager diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index 2b8759af..239d0d83 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -70,9 +70,7 @@ mod tests { let mut write_hasher = crc32fast::Hasher::new(); let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) - .await - .unwrap(); + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap();