Skip to content

Commit

Permalink
chore: use CacheReader on remotes
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Oct 23, 2024
1 parent 0a115d4 commit 4ab3a80
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 185 deletions.
26 changes: 13 additions & 13 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
) -> Result<Option<Scope<R::Key>>, CompactionError<R>> {
if !batches.is_empty() {
let level_0_path = option.level_fs_path(0).unwrap_or(&option.base_path);
let level_0_fs = manager.get_fs(level_0_path);
let (level_0_fs, _) = manager.get_fs(level_0_path);

let mut min = None;
let mut max = None;
Expand Down Expand Up @@ -207,7 +207,7 @@ where
Self::next_level_scopes(version, &mut min, &mut max, level, &meet_scopes_l)?;

let level_path = option.level_fs_path(level).unwrap_or(&option.base_path);
let level_fs = manager.get_fs(level_path);
let (level_fs, is_local) = manager.get_fs(level_path);
let mut streams = Vec::with_capacity(meet_scopes_l.len() + meet_scopes_ll.len());
// This Level
if level == 0 {
Expand All @@ -218,7 +218,7 @@ where
.await?;

streams.push(ScanStream::SsTable {
inner: SsTable::open(option, file, path)
inner: SsTable::open(option, file, path, is_local)
.await?
.scan(
(Bound::Unbounded, Bound::Unbounded),
Expand All @@ -240,7 +240,7 @@ where
u32::MAX.into(),
None,
ProjectionMask::all(),
level_fs.clone(),
(level_fs.clone(), is_local),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand All @@ -260,7 +260,7 @@ where
u32::MAX.into(),
None,
ProjectionMask::all(),
level_fs.clone(),
(level_fs.clone(), is_local),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand Down Expand Up @@ -846,14 +846,14 @@ pub(crate) mod tests {
option: &Arc<DbOption<Test>>,
manager: &StoreManager,
) -> ((FileId, FileId, FileId, FileId, FileId), Version<Test>) {
let level_0_fs = option
let (level_0_fs, _) = option
.level_fs_path(0)
.map(|path| manager.get_fs(path))
.unwrap_or(manager.base_fs());
let level_1_fs = option
.unwrap_or((manager.base_fs(), true));
let (level_1_fs, _) = option
.level_fs_path(1)
.map(|path| manager.get_fs(path))
.unwrap_or(manager.base_fs());
.unwrap_or((manager.base_fs(), true));

// level 0
let table_gen_1 = FileId::new();
Expand Down Expand Up @@ -1121,14 +1121,14 @@ pub(crate) mod tests {
.await
.unwrap();

let level_0_fs = option
let (level_0_fs, _) = option
.level_fs_path(0)
.map(|path| manager.get_fs(path))
.unwrap_or(manager.base_fs());
let level_1_fs = option
.unwrap_or((manager.base_fs(), true));
let (level_1_fs, _) = option
.level_fs_path(1)
.map(|path| manager.get_fs(path))
.unwrap_or(manager.base_fs());
.unwrap_or((manager.base_fs(), true));

let table_gen0 = FileId::new();
let table_gen1 = FileId::new();
Expand Down
3 changes: 2 additions & 1 deletion src/fs/cache_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ impl<R> CacheReader<R> {
inner: R,
meta_path: Path,
) -> Result<CacheReader<R>, CacheError> {
let path = path_to_local(&option.path)?;
// SAFETY: `meta_path` must be the path of a parquet file
let path = path_to_local(&option.path.child(meta_path.filename().unwrap()))?;
let cache: HybridCache<Range<usize>, Bytes> = HybridCacheBuilder::new()
.memory(option.memory)
.storage(Engine::Large) // use large object disk cache engine only
Expand Down
14 changes: 10 additions & 4 deletions src/fs/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fusio_dispatch::FsOptions;

pub struct StoreManager {
base_fs: Arc<dyn DynFs>,
fs_map: HashMap<Path, Arc<dyn DynFs>>,
fs_map: HashMap<Path, (Arc<dyn DynFs>, bool)>,
}

impl StoreManager {
Expand All @@ -16,7 +16,10 @@ impl StoreManager {
let mut fs_map = HashMap::with_capacity(levels_fs.len());

for (path, fs_options) in levels_fs.into_iter().flatten() {
fs_map.entry(path).or_insert(fs_options.parse()?);
let is_local = matches!(fs_options, FsOptions::Local);
fs_map
.entry(path)
.or_insert((fs_options.parse()?, is_local));
}
let base_fs = base_options.parse()?;

Expand All @@ -27,8 +30,11 @@ impl StoreManager {
&self.base_fs
}

pub fn get_fs(&self, path: &Path) -> &Arc<dyn DynFs> {
self.fs_map.get(path).unwrap_or(&self.base_fs)
pub fn get_fs(&self, path: &Path) -> (&Arc<dyn DynFs>, bool) {
self.fs_map
.get(path)
.map(|(fs, is_local)| (fs, *is_local))
.unwrap_or((&self.base_fs, false))
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::{
};

use arrow::datatypes::Schema;
use fusio_parquet::reader::AsyncReader;
use futures_core::{ready, Stream};
use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask};
use parquet::arrow::{
async_reader::{AsyncFileReader, ParquetRecordBatchStream},
ProjectionMask,
};
use pin_project_lite::pin_project;

use crate::{
fs::cache_reader::CacheReader,
record::Record,
stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
};
Expand All @@ -21,7 +22,7 @@ pin_project! {
#[derive(Debug)]
pub struct SsTableScan<'scan, R>{
#[pin]
stream: ParquetRecordBatchStream<CacheReader<AsyncReader>>,
stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader>>,
iter: Option<RecordBatchIterator<R>>,
projection_mask: ProjectionMask,
full_schema: Arc<Schema>,
Expand All @@ -31,7 +32,7 @@ pin_project! {

impl<R> SsTableScan<'_, R> {
pub(crate) fn new(
stream: ParquetRecordBatchStream<CacheReader<AsyncReader>>,
stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader>>,
projection_mask: ProjectionMask,
full_schema: Arc<Schema>,
) -> Self {
Expand Down
Loading

0 comments on commit 4ab3a80

Please sign in to comment.