diff --git a/Cargo.toml b/Cargo.toml index fb5e4e74..ea42d346 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,7 +86,7 @@ tokio = { version = "1", features = ["io-util"], default-features = false } tokio-util = { version = "0.7" } tonbo_macros = { version = "0.1.0", path = "tonbo_macros" } tracing = "0.1" -ulid = "1" +ulid = { version = "1", features = ["serde"] } # Only used for benchmarks log = "0.4.22" diff --git a/examples/declare.rs b/examples/declare.rs index 6e6edcd3..5b569abe 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -22,7 +22,9 @@ async fn main() { // make sure the path exists let _ = fs::create_dir_all("./db_path/users").await; - let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); + let options = DbOption::from_path(Path::from_filesystem_path("./db_path/users").unwrap()) + .await + .unwrap(); // pluggable async runtime and I/O let db = DB::new(options, TokioExecutor::default()).await.unwrap(); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index aa45d57a..c759e3c7 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -218,7 +218,7 @@ where .await?; streams.push(ScanStream::SsTable { - inner: SsTable::open(option, file, path, is_local) + inner: SsTable::open(option, file, scope.gen, !is_local) .await? .scan( (Bound::Unbounded, Bound::Unbounded), @@ -583,7 +583,9 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let temp_dir_l0 = tempfile::tempdir().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap() .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -697,7 +699,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_string(), 0, - ); + ) + .await + .unwrap(); manager .base_fs() .create_dir_all(&option.wal_dir_path()) @@ -765,7 +769,9 @@ pub(crate) mod tests { let temp_dir_l0 = TempDir::new().unwrap(); let temp_dir_l1 = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()) + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap() .level_path( 0, Path::from_filesystem_path(temp_dir_l0.path()).unwrap(), @@ -1104,7 +1110,9 @@ pub(crate) mod tests { pub(crate) async fn major_panic() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); option.major_threshold_with_sst_size = 1; option.level_sst_magnification = 1; let manager = @@ -1211,7 +1219,9 @@ pub(crate) mod tests { async fn test_flush_major_level_sort() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 0; option.major_threshold_with_sst_size = 2; diff --git a/src/fs/cache_reader.rs b/src/fs/cache_reader.rs index 5fb25ad7..458bb9fa 100644 --- a/src/fs/cache_reader.rs +++ b/src/fs/cache_reader.rs @@ -1,62 +1,54 @@ use std::{ops::Range, sync::Arc}; use bytes::Bytes; -use foyer::{Cache, DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; -use fusio::path::{path_to_local, Path}; +use foyer::{Cache, HybridCache}; use futures_core::future::BoxFuture; use futures_util::FutureExt; use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; -use crate::fs::{CacheError, CacheOption}; +use crate::fs::FileId; -pub(crate) type MetaCache = Arc>>; +pub(crate) type MetaCache = Arc>>; +pub(crate) type RangeCache = HybridCache<(FileId, Range), Bytes>; pub(crate) struct CacheReader { + gen: FileId, inner: R, - meta_path: Path, - cache: HybridCache, Bytes>, + range_cache: RangeCache, meta_cache: MetaCache, } impl CacheReader { - pub(crate) async fn new( - option: &CacheOption, + pub(crate) fn new( meta_cache: MetaCache, + range_cache: RangeCache, + gen: FileId, inner: R, - meta_path: Path, - ) -> Result, CacheError> { - // SAFETY: `meta_path` must be the path of a parquet file - let path = path_to_local(&option.path.child(meta_path.filename().unwrap()))?; - let cache: HybridCache, Bytes> = HybridCacheBuilder::new() - .memory(option.memory) - .storage(Engine::Large) // use large object disk cache engine only - .with_device_options(DirectFsDeviceOptions::new(path).with_capacity(option.local)) - .build() - .await?; - - Ok(Self { + ) -> CacheReader { + Self { + gen, inner, - meta_path, - cache, + range_cache, meta_cache, - }) + } } } impl AsyncFileReader for CacheReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { async move { + let key = (self.gen, range); if let Some(entry) = self - .cache - .get(&range) + .range_cache + .get(&key) .await .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? { return Ok(entry.value().clone()); } - let bytes = self.inner.get_bytes(range.clone()).await?; - let entry = self.cache.insert(range, bytes); + let bytes = self.inner.get_bytes(key.1.clone()).await?; + let entry = self.range_cache.insert(key, bytes); Ok(entry.value().clone()) } .boxed() @@ -64,7 +56,7 @@ impl AsyncFileReader for CacheReader { fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { async move { - if let Some(entry) = self.meta_cache.get(&self.meta_path) { + if let Some(entry) = self.meta_cache.get(&self.gen) { return Ok(entry.value().clone()); } @@ -73,10 +65,173 @@ impl AsyncFileReader for CacheReader { .get_metadata() .await .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?; - let entry = self.meta_cache.insert(self.meta_path.clone(), meta); + let entry = self.meta_cache.insert(self.gen, meta); Ok(entry.value().clone()) } .boxed() } } + +#[cfg(test)] +pub(crate) mod tests { + use std::{ + collections::Bound, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, + }; + + use fusio::{dynamic::DynFile, path::Path, Error, IoBuf, IoBufMut, Read, Write}; + use futures_util::StreamExt; + use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; + use tempfile::TempDir; + use ulid::Ulid; + + use crate::{ + compaction::tests::build_parquet_table, + fs::FileType, + ondisk::sstable::SsTable, + record::{Record, RecordInstance}, + tests::Test, + wal::log::LogType, + DbOption, + }; + + struct CountFile { + inner: Box, + read_count: Arc, + } + + impl Read for CountFile { + async fn read_exact_at(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) { + self.read_count.fetch_add(1, SeqCst); + self.inner.read_exact_at(buf, pos).await + } + + async fn read_to_end_at(&mut self, buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + self.read_count.fetch_add(1, SeqCst); + self.inner.read_to_end_at(buf, pos).await + } + + async fn size(&self) -> Result { + self.inner.size().await + } + } + + impl Write for CountFile { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + self.inner.write_all(buf).await + } + + async fn flush(&mut self) -> Result<(), Error> { + self.inner.flush().await + } + + async fn close(&mut self) -> Result<(), Error> { + self.inner.close().await + } + } + + #[tokio::test] + async fn test_cache_read() { + let temp_dir = TempDir::new().unwrap(); + let option = + DbOption::::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); + let fs = option.base_fs.clone().parse().unwrap(); + fs.create_dir_all(&option.version_log_dir_path()) + .await + .unwrap(); + fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); + + let table_gen = Ulid::new(); + build_parquet_table::( + &option, + table_gen, + vec![ + ( + LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ], + &RecordInstance::Normal, + 0, + &fs, + ) + .await + .unwrap(); + + let read_count = Arc::new(AtomicUsize::new(0)); + let table_path = option.table_path(&table_gen, 0); + for _ in 0..1000 { + let mut scan = SsTable::::open( + &option, + Box::new(CountFile { + inner: fs + .open_options(&table_path, FileType::Parquet.open_options(true)) + .await + .unwrap(), + read_count: read_count.clone(), + }), + table_gen, + true, + ) + .await + .unwrap() + .scan( + (Bound::Unbounded, Bound::Unbounded), + 0_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3, 4], + ), + ) + .await + .unwrap(); + + let entry_0 = scan.next().await.unwrap().unwrap(); + assert_eq!(entry_0.get().unwrap().vstring, "1"); + assert_eq!(entry_0.get().unwrap().vu32, Some(0)); + assert_eq!(entry_0.get().unwrap().vbool, Some(true)); + + let entry_1 = scan.next().await.unwrap().unwrap(); + assert_eq!(entry_1.get().unwrap().vstring, "2"); + assert_eq!(entry_1.get().unwrap().vu32, Some(0)); + assert_eq!(entry_1.get().unwrap().vbool, Some(true)); + + let entry_2 = scan.next().await.unwrap().unwrap(); + assert_eq!(entry_2.get().unwrap().vstring, "3"); + assert_eq!(entry_2.get().unwrap().vu32, Some(0)); + assert_eq!(entry_2.get().unwrap().vbool, Some(true)); + } + + assert_eq!(read_count.load(SeqCst), 9); + } +} diff --git a/src/fs/mod.rs b/src/fs/mod.rs index aa79e977..343ffb7f 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -55,13 +55,6 @@ pub(crate) fn parse_file_id(path: &Path, suffix: FileType) -> Result; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -265,7 +267,9 @@ mod tests { async fn range() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); @@ -352,7 +356,9 @@ mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "age".to_string(), 0, - ); + ) + .await + .unwrap(); let fs = Arc::new(TokioFs) as Arc; fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 0aa5b92b..6a29d407 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! // make sure the path exists //! let _ = fs::create_dir_all("./db_path/users").await; //! -//! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); +//! let options = DbOption::from_path(Path::from_filesystem_path("./db_path/users").unwrap()).await.unwrap(); //! // pluggable async runtime and I/O //! let db = DB::new(options, TokioExecutor::default()).await.unwrap(); //! // insert with owned value @@ -1501,7 +1501,9 @@ pub(crate) mod tests { let path = Path::from_filesystem_path(temp_dir.path()).unwrap(); let path_l0 = Path::from_filesystem_path(temp_dir_l0.path()).unwrap(); - let mut option = DbOption::from(path) + let mut option = DbOption::from_path(path) + .await + .unwrap() .level_path(0, path_l0, FsOptions::Local) .unwrap(); option.immutable_chunk_num = 1; @@ -1539,7 +1541,9 @@ pub(crate) mod tests { async fn test_flush() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1567,9 +1571,11 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); let (task_tx, _task_rx) = bounded(1); @@ -1626,11 +1632,15 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let (desc, primary_key_index) = test_dyn_item_schema(); - let option = Arc::new(DbOption::with_path( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - "id".to_owned(), - primary_key_index, - )); + let option = Arc::new( + DbOption::with_path( + Path::from_filesystem_path(temp_dir.path()).unwrap(), + "id".to_owned(), + primary_key_index, + ) + .await + .unwrap(), + ); manager .base_fs() .create_dir_all(&option.wal_dir_path()) @@ -1663,7 +1673,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_owned(), primary_key_index, - ); + ) + .await + .unwrap(); let db: DB = DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) .await @@ -1698,7 +1710,9 @@ pub(crate) mod tests { async fn test_get_removed() { let temp_dir = TempDir::new().unwrap(); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1737,7 +1751,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "id".to_string(), primary_key_index, - ); + ) + .await + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1945,7 +1961,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir1.path()).unwrap(), "id".to_string(), primary_key_index, - ); + ) + .await + .unwrap(); option.immutable_chunk_num = 1; option.immutable_chunk_max_num = 1; option.major_threshold_with_sst_size = 3; @@ -1957,7 +1975,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir2.path()).unwrap(), "id".to_string(), primary_key_index, - ); + ) + .await + .unwrap(); option2.immutable_chunk_num = 1; option2.immutable_chunk_max_num = 1; option2.major_threshold_with_sst_size = 3; @@ -1969,7 +1989,9 @@ pub(crate) mod tests { Path::from_filesystem_path(temp_dir3.path()).unwrap(), "id".to_string(), primary_key_index, - ); + ) + .await + .unwrap(); option3.immutable_chunk_num = 1; option3.immutable_chunk_max_num = 1; option3.major_threshold_with_sst_size = 3; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 14fe3299..33988f9a 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, ops::Bound}; -use fusio::{dynamic::DynFile, path::Path, DynRead}; +use fusio::{dynamic::DynFile, DynRead}; use fusio_parquet::reader::AsyncReader; use futures_util::StreamExt; use parquet::arrow::{ @@ -11,7 +11,7 @@ use parquet::arrow::{ use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ - fs::{cache_reader::CacheReader, CacheError}, + fs::{cache_reader::CacheReader, CacheError, FileId}, record::Record, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, @@ -33,22 +33,19 @@ where pub(crate) async fn open( option: &DbOption, file: Box, - path: Path, - is_local: bool, + gen: FileId, + enable_cache: bool, ) -> Result { let size = file.size().await?; - let reader = if is_local { + let reader = if !enable_cache { Box::new(AsyncReader::new(file, size).await?) as Box } else { - Box::new( - CacheReader::new( - &option.cache_option, - option.meta_cache.clone(), - AsyncReader::new(file, size).await?, - path, - ) - .await?, - ) + Box::new(CacheReader::new( + option.meta_cache.clone(), + option.range_cache.clone(), + gen, + AsyncReader::new(file, size).await?, + )) }; Ok(SsTable { @@ -204,7 +201,9 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), TokioExecutor::new(), ) .await; @@ -280,7 +279,9 @@ pub(crate) mod tests { let base_fs = manager.base_fs(); let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), TokioExecutor::new(), ) .await; diff --git a/src/option.rs b/src/option.rs index 6b9c9e0b..d79a85bc 100644 --- a/src/option.rs +++ b/src/option.rs @@ -4,8 +4,8 @@ use std::{ sync::Arc, }; -use foyer::{CacheBuilder, LruConfig}; -use fusio::path::Path; +use foyer::{CacheBuilder, DirectFsDeviceOptions, Engine, HybridCacheBuilder, LruConfig}; +use fusio::path::{path_to_local, Path}; use fusio_dispatch::FsOptions; use parquet::{ basic::Compression, @@ -15,7 +15,10 @@ use parquet::{ }; use crate::{ - fs::{cache_reader::MetaCache, CacheOption, FileId, FileType}, + fs::{ + cache_reader::{MetaCache, RangeCache}, + CacheError, FileId, FileType, + }, record::Record, trigger::TriggerType, version::{Version, MAX_LEVEL}, @@ -28,7 +31,6 @@ pub struct DbOption { pub(crate) clean_channel_buffer: usize, pub(crate) base_path: Path, pub(crate) base_fs: FsOptions, - pub(crate) cache_option: CacheOption, // TODO: DEBUG pub(crate) level_paths: Vec>, pub(crate) immutable_chunk_num: usize, @@ -39,6 +41,7 @@ pub struct DbOption { pub(crate) major_threshold_with_sst_size: usize, pub(crate) max_sst_file_size: usize, pub(crate) meta_cache: MetaCache, + pub(crate) range_cache: RangeCache, pub(crate) version_log_snapshot_threshold: u32, pub(crate) trigger_type: TriggerType, pub(crate) use_wal: bool, @@ -50,23 +53,33 @@ impl DbOption where R: Record, { + pub async fn from_path(base_path: Path) -> Result> { + let (column_paths, sorting_columns) = R::primary_key_path(); + + DbOption::fn_new(base_path, column_paths, sorting_columns).await + } + /// build the default configured [`DbOption`] with base path and primary key - pub fn with_path(base_path: Path, primary_key_name: String, primary_key_index: usize) -> Self { + pub async fn with_path( + base_path: Path, + primary_key_name: String, + primary_key_index: usize, + ) -> Result> { let (column_paths, sorting_columns) = Self::primary_key_path(primary_key_name, primary_key_index); - Self::fn_new(base_path, column_paths, sorting_columns) + Self::fn_new(base_path, column_paths, sorting_columns).await } - fn fn_new( + async fn fn_new( base_path: Path, column_paths: ColumnPath, sorting_columns: Vec, - ) -> Self { + ) -> Result> { let cache_path = base_path.child("cache"); let memory = 64 * 1024 * 1024; - DbOption { + Ok(DbOption { immutable_chunk_num: 3, immutable_chunk_max_num: 5, major_threshold_with_sst_size: 4, @@ -90,11 +103,6 @@ where version_log_snapshot_threshold: 200, level_paths: vec![None; MAX_LEVEL], base_fs: FsOptions::Local, - cache_option: CacheOption { - path: cache_path, - memory, - local: 8 * memory, - }, meta_cache: Arc::new( CacheBuilder::new(32) .with_shards(4) @@ -103,7 +111,17 @@ where }) .build(), ), - } + range_cache: HybridCacheBuilder::new() + .memory(memory) + .storage(Engine::Large) // use large object disk cache engine only + .with_device_options( + DirectFsDeviceOptions::new(path_to_local(&cache_path).unwrap()) + .with_capacity(8 * memory), + ) + .build() + .await + .map_err(CacheError::from)?, + }) } fn primary_key_path( @@ -120,18 +138,6 @@ where } } -impl From for DbOption -where - R: Record, -{ - /// build the default configured [`DbOption`] based on the passed path - fn from(base_path: Path) -> Self { - let (column_paths, sorting_columns) = R::primary_key_path(); - - DbOption::fn_new(base_path, column_paths, sorting_columns) - } -} - impl DbOption where R: Record, @@ -284,7 +290,6 @@ impl Debug for DbOption { f.debug_struct("DbOption") .field("clean_channel_buffer", &self.clean_channel_buffer) .field("base_path", &self.base_path) - .field("cache_option", &self.cache_option) // TODO // .field("level_paths", &self.level_paths) .field("immutable_chunk_num", &self.immutable_chunk_num) @@ -304,6 +309,7 @@ impl Debug for DbOption { ) .field("max_sst_file_size", &self.max_sst_file_size) .field("meta_cache", &self.meta_cache) + .field("range_cache", &self.range_cache) .field( "version_log_snapshot_threshold", &self.version_log_snapshot_threshold, diff --git a/src/stream/level.rs b/src/stream/level.rs index 96027f76..8c561a45 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -53,7 +53,7 @@ where status: FutureStatus<'level, R>, fs: Arc, is_local: bool, - path: Option, + path: Option<(Path, FileId)>, } impl<'level, R> LevelStream<'level, R> @@ -109,10 +109,10 @@ where return match &mut self.status { FutureStatus::Init(gen) => { let gen = *gen; - self.path = Some(self.option.table_path(&gen, self.level)); + self.path = Some((self.option.table_path(&gen, self.level), gen)); let reader = self.fs.open_options( - self.path.as_ref().unwrap(), + &self.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -134,10 +134,10 @@ where Poll::Ready(None) => match self.gens.pop_front() { None => Poll::Ready(None), Some(gen) => { - self.path = Some(self.option.table_path(&gen, self.level)); + self.path = Some((self.option.table_path(&gen, self.level), gen)); let reader = self.fs.open_options( - self.path.as_ref().unwrap(), + &self.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -168,10 +168,10 @@ where FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { let option = self.option.clone(); - let path = self.path.clone().unwrap(); + let (_, gen) = self.path.clone().unwrap(); let is_local = self.is_local; let future = - async move { SsTable::open(&option, file, path, is_local).await }; + async move { SsTable::open(&option, file, gen, !is_local).await }; self.status = FutureStatus::OpenSst(Box::pin(future)); continue; } @@ -227,9 +227,11 @@ mod tests { async fn projection_scan() { let temp_dir = TempDir::new().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 6671e285..a9ecef04 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -71,7 +71,9 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index f58583c6..ef10949b 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -171,7 +171,9 @@ mod tests { async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -265,7 +267,9 @@ mod tests { async fn merge_mutable_remove_duplicates() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); @@ -351,7 +355,9 @@ mod tests { async fn merge_mutable_limit() { let temp_dir = tempfile::tempdir().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/stream/package.rs b/src/stream/package.rs index c8493e8f..be4561cf 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -103,7 +103,9 @@ mod tests { async fn iter() { let temp_dir = TempDir::new().unwrap(); let fs = Arc::new(TokioFs) as Arc; - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); diff --git a/src/transaction.rs b/src/transaction.rs index 3a9d38c1..551d144b 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -277,7 +277,9 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let db = DB::::new( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), TokioExecutor::new(), ) .await @@ -312,9 +314,11 @@ mod tests { async fn transaction_get() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() @@ -402,7 +406,9 @@ mod tests { #[tokio::test] async fn write_conflicts() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); let db = DB::::new(option, TokioExecutor::new()) .await @@ -435,7 +441,9 @@ mod tests { #[tokio::test] async fn transaction_projection() { let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); let db = DB::::new(option, TokioExecutor::new()) .await @@ -463,9 +471,11 @@ mod tests { async fn transaction_scan() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() @@ -558,9 +568,11 @@ mod tests { async fn test_transaction_scan_bound() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() @@ -734,9 +746,11 @@ mod tests { async fn test_transaction_scan_limit() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() @@ -806,7 +820,9 @@ mod tests { Path::from_filesystem_path(temp_dir.path()).unwrap(), "age".to_string(), 0, - ); + ) + .await + .unwrap(); let db = DB::with_schema(option, TokioExecutor::default(), descs, 0) .await .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index 3a01d71c..4693514c 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -116,9 +116,11 @@ pub(crate) mod tests { async fn test_cleaner() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); let gen_0 = FileId::new(); let gen_1 = FileId::new(); diff --git a/src/version/mod.rs b/src/version/mod.rs index ed5220da..10c3bbbd 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -187,7 +187,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - SsTable::::open(&self.option, file, path, is_local) + SsTable::::open(&self.option, file, *gen, !is_local) .await? .get(key, projection_mask) .await @@ -227,7 +227,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(&self.option, file, path, is_local).await?; + let table = SsTable::open(&self.option, file, scope.gen, !is_local).await?; streams.push(ScanStream::SsTable { inner: table diff --git a/src/version/set.rs b/src/version/set.rs index 586ca89e..d3ee1359 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -344,9 +344,11 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); manager .base_fs() .create_dir_all(&option.version_log_dir_path()) @@ -381,7 +383,9 @@ pub(crate) mod tests { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); let (sender, _) = bounded(1); - let mut option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let mut option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); option.version_log_snapshot_threshold = 4; let option = Arc::new(option); @@ -508,9 +512,11 @@ pub(crate) mod tests { async fn version_level_sort() { let temp_dir = TempDir::new().unwrap(); let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); - let option = Arc::new(DbOption::from( - Path::from_filesystem_path(temp_dir.path()).unwrap(), - )); + let option = Arc::new( + DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(), + ); let (sender, _) = bounded(1); manager diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index 239d0d83..2b8759af 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -70,7 +70,9 @@ mod tests { let mut write_hasher = crc32fast::Hasher::new(); let temp_dir = TempDir::new().unwrap(); - let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let option = DbOption::from_path(Path::from_filesystem_path(temp_dir.path()).unwrap()) + .await + .unwrap(); let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap();