diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 11cba09..aa45d57 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -136,7 +136,7 @@ where ) -> Result>, CompactionError> { if !batches.is_empty() { let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path); - let level_0_fs = manager.get_fs(level_0_path); + let (level_0_fs, _) = manager.get_fs(level_0_path); let mut min = None; let mut max = None; @@ -207,7 +207,7 @@ where Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?; let level_path = option.level_fs_path(level).unwrap_or(&option.base_path); - let level_fs = manager.get_fs(level_path); + let (level_fs, is_local) = manager.get_fs(level_path); let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len()); // This Level if level == 0 { @@ -218,7 +218,7 @@ where .await?; streams.push(ScanStream::SsTable { - inner: SsTable::open(option, file, path) + inner: SsTable::open(option, file, path, is_local) .await? 
.scan( (Bound::Unbounded, Bound::Unbounded), @@ -240,7 +240,7 @@ where u32::MAX.into(), None, ProjectionMask::all(), - level_fs.clone(), + (level_fs.clone(), is_local), ) .ok_or(CompactionError::EmptyLevel)?; @@ -260,7 +260,7 @@ where u32::MAX.into(), None, ProjectionMask::all(), - level_fs.clone(), + (level_fs.clone(), is_local), ) .ok_or(CompactionError::EmptyLevel)?; @@ -846,14 +846,14 @@ pub(crate) mod tests { option: &Arc>, manager: &StoreManager, ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { - let level_0_fs = option + let (level_0_fs, _) = option .level_fs_path(0) .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); - let level_1_fs = option + .unwrap_or((manager.base_fs(), true)); + let (level_1_fs, _) = option .level_fs_path(1) .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); + .unwrap_or((manager.base_fs(), true)); // level 0 let table_gen_1 = FileId::new(); @@ -1121,14 +1121,14 @@ pub(crate) mod tests { .await .unwrap(); - let level_0_fs = option + let (level_0_fs, _) = option .level_fs_path(0) .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); - let level_1_fs = option + .unwrap_or((manager.base_fs(), true)); + let (level_1_fs, _) = option .level_fs_path(1) .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); + .unwrap_or((manager.base_fs(), true)); let table_gen0 = FileId::new(); let table_gen1 = FileId::new(); diff --git a/src/fs/cache_reader.rs b/src/fs/cache_reader.rs index a700c2b..5fb25ad 100644 --- a/src/fs/cache_reader.rs +++ b/src/fs/cache_reader.rs @@ -25,7 +25,8 @@ impl CacheReader { inner: R, meta_path: Path, ) -> Result, CacheError> { - let path = path_to_local(&option.path)?; + // INVARIANT: `meta_path` must be the path of a parquet file; otherwise `filename()` may be `None` and the `unwrap` below panics + let path = path_to_local(&option.path.child(meta_path.filename().unwrap()))?; let cache: HybridCache, Bytes> = HybridCacheBuilder::new() .memory(option.memory) .storage(Engine::Large) // use large object disk cache engine only diff 
--git a/src/fs/manager.rs b/src/fs/manager.rs index 6ae8372..0be9e66 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -5,7 +5,7 @@ use fusio_dispatch::FsOptions; pub struct StoreManager { base_fs: Arc, - fs_map: HashMap>, + fs_map: HashMap, bool)>, } impl StoreManager { @@ -16,7 +16,10 @@ impl StoreManager { let mut fs_map = HashMap::with_capacity(levels_fs.len()); for (path, fs_options) in levels_fs.into_iter().flatten() { - fs_map.entry(path).or_insert(fs_options.parse()?); + let is_local = matches!(fs_options, FsOptions::Local); + fs_map + .entry(path) + .or_insert((fs_options.parse()?, is_local)); } let base_fs = base_options.parse()?; @@ -27,8 +30,11 @@ impl StoreManager { &self.base_fs } - pub fn get_fs(&self, path: &Path) -> &Arc { - self.fs_map.get(path).unwrap_or(&self.base_fs) + pub fn get_fs(&self, path: &Path) -> (&Arc, bool) { + self.fs_map + .get(path) + .map(|(fs, is_local)| (fs, *is_local)) + .unwrap_or((&self.base_fs, false)) } } diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index bf6b5ec..a788508 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -6,13 +6,14 @@ use std::{ }; use arrow::datatypes::Schema; -use fusio_parquet::reader::AsyncReader; use futures_core::{ready, Stream}; -use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask}; +use parquet::arrow::{ + async_reader::{AsyncFileReader, ParquetRecordBatchStream}, + ProjectionMask, +}; use pin_project_lite::pin_project; use crate::{ - fs::cache_reader::CacheReader, record::Record, stream::record_batch::{RecordBatchEntry, RecordBatchIterator}, }; @@ -21,7 +22,7 @@ pin_project! { #[derive(Debug)] pub struct SsTableScan<'scan, R>{ #[pin] - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream>, iter: Option>, projection_mask: ProjectionMask, full_schema: Arc, @@ -31,7 +32,7 @@ pin_project! 
{ impl SsTableScan<'_, R> { pub(crate) fn new( - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream>, projection_mask: ProjectionMask, full_schema: Arc, ) -> Self { diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 53bf86b..26dd98c 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -5,6 +5,7 @@ use fusio_parquet::reader::AsyncReader; use futures_util::StreamExt; use parquet::arrow::{ arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions}, + async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask, }; @@ -21,7 +22,7 @@ pub(crate) struct SsTable where R: Record, { - reader: CacheReader, + reader: Box, _marker: PhantomData, } @@ -33,38 +34,35 @@ where option: &DbOption, file: Box, path: Path, + is_local: bool, ) -> Result { let size = DynRead::size(&file).await?; + let reader = if is_local { + Box::new(AsyncReader::new(file, size).await?) as Box + } else { + Box::new( + CacheReader::new( + &option.cache_option, + option.meta_cache.clone(), + AsyncReader::new(file, size).await?, + path, + ) + .await?, + ) + }; Ok(SsTable { - reader: CacheReader::new( - &option.cache_option, - option.meta_cache.clone(), - AsyncReader::new(file, size).await?, - path, - ) - .await?, + reader, _marker: PhantomData, }) } #[cfg(test)] - pub(crate) async fn open_for_test( - option: &crate::fs::CacheOption, - meta_cache: crate::fs::cache_reader::MetaCache, - file: Box, - path: Path, - ) -> Result { + pub(crate) async fn open_local(file: Box) -> Result { let size = DynRead::size(&file).await?; Ok(SsTable { - reader: CacheReader::new( - option, - meta_cache, - AsyncReader::new(file, size).await?, - path, - ) - .await?, + reader: Box::new(AsyncReader::new(file, size).await?), _marker: PhantomData, }) } @@ -74,7 +72,7 @@ where limit: Option, projection_mask: ProjectionMask, ) -> parquet::errors::Result< - ArrowReaderBuilder>>, + ArrowReaderBuilder>>, > { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( 
self.reader, @@ -135,7 +133,6 @@ pub(crate) mod tests { use std::{borrow::Borrow, fs::File, ops::Bound, sync::Arc}; use arrow::array::RecordBatch; - use foyer::{CacheBuilder, LruConfig}; use fusio::{dynamic::DynFile, path::Path, DynFs}; use fusio_dispatch::FsOptions; use fusio_parquet::writer::AsyncWriter; @@ -152,7 +149,7 @@ pub(crate) mod tests { use super::SsTable; use crate::{ executor::tokio::TokioExecutor, - fs::{cache_reader::MetaCache, manager::StoreManager, CacheOption, FileType}, + fs::{manager::StoreManager, FileType}, record::Record, tests::{get_test_record_batch, Test}, timestamp::Timestamped, @@ -186,23 +183,15 @@ pub(crate) mod tests { Ok(()) } - pub(crate) async fn open_sstable( - option: &CacheOption, - meta_cache: MetaCache, - store: &Arc, - path: Path, - ) -> SsTable + pub(crate) async fn open_sstable(store: &Arc, path: Path) -> SsTable where R: Record, { - SsTable::open_for_test( - option, - meta_cache, + SsTable::open_local( store .open_options(&path, FileType::Parquet.open_options(true)) .await .unwrap(), - path, ) .await .unwrap() @@ -211,19 +200,6 @@ pub(crate) mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn projection_query() { let temp_dir = tempfile::tempdir().unwrap(); - let option = CacheOption { - path: Path::from_filesystem_path(temp_dir.path()).unwrap(), - memory: 64 * 1024, - local: 256 * 1024, - }; - let meta_cache = Arc::new( - CacheBuilder::new(32) - .with_shards(4) - .with_eviction_config(LruConfig { - high_priority_pool_ratio: 0.1, - }) - .build(), - ); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); @@ -245,55 +221,52 @@ pub(crate) mod tests { let key = Timestamped::new("hello".to_owned(), 1.into()); { - let test_ref_1 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await 
- .unwrap() - .unwrap(); + let test_ref_1 = open_sstable::(base_fs, table_path.clone()) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_1.get().unwrap().vstring, "hello"); assert_eq!(test_ref_1.get().unwrap().vu32, Some(12)); assert_eq!(test_ref_1.get().unwrap().vbool, None); } { - let test_ref_2 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_2 = open_sstable::(base_fs, table_path.clone()) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_2.get().unwrap().vstring, "hello"); assert_eq!(test_ref_2.get().unwrap().vu32, None); assert_eq!(test_ref_2.get().unwrap().vbool, Some(true)); } { - let test_ref_3 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_3 = open_sstable::(base_fs, table_path.clone()) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_3.get().unwrap().vstring, "hello"); assert_eq!(test_ref_3.get().unwrap().vu32, None); assert_eq!(test_ref_3.get().unwrap().vbool, None); @@ -305,19 +278,6 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); - let option = CacheOption { - path: 
Path::from_filesystem_path(temp_dir.path()).unwrap(), - memory: 64 * 1024 * 1024, - local: 256 * 1024 * 1024, - }; - let meta_cache = Arc::new( - CacheBuilder::new(32) - .with_shards(4) - .with_eviction_config(LruConfig { - high_priority_pool_ratio: 0.1, - }) - .build(), - ); let record_batch = get_test_record_batch::( DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), @@ -335,20 +295,19 @@ pub(crate) mod tests { write_record_batch(file, &record_batch).await.unwrap(); { - let mut test_ref_1 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await - .unwrap(); + let mut test_ref_1 = open_sstable::(base_fs, table_path.clone()) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_1.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ -361,20 +320,19 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_2 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap(); + let mut test_ref_2 = open_sstable::(base_fs, table_path.clone()) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_2.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ 
-387,20 +345,19 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_3 = - open_sstable::(&option, meta_cache.clone(), base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap(); + let mut test_ref_3 = open_sstable::(base_fs, table_path.clone()) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_3.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); diff --git a/src/stream/level.rs b/src/stream/level.rs index 67e6905..96027f7 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -52,6 +52,7 @@ where projection_mask: ProjectionMask, status: FutureStatus<'level, R>, fs: Arc, + is_local: bool, path: Option, } @@ -70,7 +71,7 @@ where ts: Timestamp, limit: Option, projection_mask: ProjectionMask, - fs: Arc, + (fs, is_local): (Arc, bool), ) -> Option { let (lower, upper) = range; let mut gens: VecDeque = version.level_slice[level][start..end + 1] @@ -91,6 +92,7 @@ where projection_mask, status, fs, + is_local, path: None, }) } @@ -167,7 +169,9 @@ where Poll::Ready(Ok(file)) => { let option = self.option.clone(); let path = self.path.clone().unwrap(); - let future = async move { SsTable::open(&option, file, path).await }; + let is_local = self.is_local; + let future = + async move { SsTable::open(&option, file, path, is_local).await }; self.status = FutureStatus::OpenSst(Box::pin(future)); continue; } @@ -253,7 +257,7 @@ mod tests { &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), [0, 1, 2, 3], ), - manager.base_fs().clone(), + (manager.base_fs().clone(), true), ) .unwrap(); @@ -289,7 +293,7 @@ mod tests { 
&arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), [0, 1, 2, 4], ), - manager.base_fs().clone(), + (manager.base_fs().clone(), true), ) .unwrap(); @@ -325,7 +329,7 @@ mod tests { &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), [0, 1, 2], ), - manager.base_fs().clone(), + (manager.base_fs().clone(), true), ) .unwrap(); diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs index bd29e7f..3a01d71 100644 --- a/src/version/cleaner.rs +++ b/src/version/cleaner.rs @@ -70,21 +70,21 @@ where break; } for (gen, level) in gens { - let fs = self + let (fs, _) = self .option .level_fs_path(level) .map(|path| self.manager.get_fs(path)) - .unwrap_or(self.manager.base_fs()); + .unwrap_or((self.manager.base_fs(), true)); fs.remove(&self.option.table_path(&gen, level)).await?; } } } CleanTag::RecoverClean { wal_id: gen, level } => { - let fs = self + let (fs, _) = self .option .level_fs_path(level) .map(|path| self.manager.get_fs(path)) - .unwrap_or(self.manager.base_fs()); + .unwrap_or((self.manager.base_fs(), true)); fs.remove(&self.option.table_path(&gen, level)).await?; } } @@ -124,10 +124,10 @@ pub(crate) mod tests { let gen_1 = FileId::new(); let gen_2 = FileId::new(); let gen_3 = FileId::new(); - let fs = option + let (fs, _) = option .level_fs_path(0) .map(|path| manager.get_fs(path)) - .unwrap_or(manager.base_fs()); + .unwrap_or((manager.base_fs(), true)); { fs.open_options( &option.table_path(&gen_0, 0), diff --git a/src/version/mod.rs b/src/version/mod.rs index 5f57365..ed5220d 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -125,13 +125,19 @@ where .option .level_fs_path(0) .unwrap_or(&self.option.base_path); - let level_0_fs = manager.get_fs(level_0_path); + let (level_0_fs, is_local) = manager.get_fs(level_0_path); for scope in self.level_slice[0].iter().rev() { if !scope.contains(key.value()) { continue; } if let Some(entry) = self - .table_query(level_0_fs, key, 0, &scope.gen, projection_mask.clone()) + .table_query( + (level_0_fs, 
is_local), + key, + 0, + &scope.gen, + projection_mask.clone(), + ) .await? { return Ok(Some(entry)); @@ -143,7 +149,7 @@ where .option .level_fs_path(leve) .unwrap_or(&self.option.base_path); - let level_fs = manager.get_fs(level_path); + let (level_fs, is_local) = manager.get_fs(level_path); if sort_runs.is_empty() { continue; } @@ -153,7 +159,7 @@ where } if let Some(entry) = self .table_query( - level_fs, + (level_fs, is_local), key, leve, &sort_runs[index].gen, @@ -170,7 +176,7 @@ where async fn table_query( &self, - store: &Arc, + (store, is_local): (&Arc, bool), key: &TimestampedRef<::Key>, level: usize, gen: &FileId, @@ -181,7 +187,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - SsTable::::open(&self.option, file, path) + SsTable::::open(&self.option, file, path, is_local) .await? .get(key, projection_mask) .await @@ -211,7 +217,7 @@ where .option .level_fs_path(0) .unwrap_or(&self.option.base_path); - let level_0_fs = manager.get_fs(level_0_path); + let (level_0_fs, is_local) = manager.get_fs(level_0_path); for scope in self.level_slice[0].iter() { if !scope.meets_range(range) { continue; @@ -221,7 +227,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(&self.option, file, path).await?; + let table = SsTable::open(&self.option, file, path, is_local).await?; streams.push(ScanStream::SsTable { inner: table @@ -238,7 +244,7 @@ where .option .level_fs_path(i + 1) .unwrap_or(&self.option.base_path); - let level_fs = manager.get_fs(level_path); + let (level_fs, is_local) = manager.get_fs(level_path); let (mut start, mut end) = (None, None); @@ -265,7 +271,7 @@ where ts, limit, projection_mask.clone(), - level_fs.clone(), + (level_fs.clone(), is_local), ) .unwrap(), });