diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 45aadadf..746b0e41 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -28,3 +28,4 @@ pyo3-asyncio = { package = "pyo3-asyncio-0-21", version = "0.21", features = [
 ] }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 tonbo = { version = ">=0", path = "../../" }
+tonbo_ext_reader = { path = "../../tonbo_ext_reader" }
diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs
index 840ff728..0af488fb 100644
--- a/bindings/python/src/db.rs
+++ b/bindings/python/src/db.rs
@@ -1,5 +1,4 @@
 use std::sync::Arc;
-
 use pyo3::{
     prelude::*,
     pyclass, pymethods,
@@ -12,7 +11,7 @@ use tonbo::{
     record::{ColumnDesc, DynRecord},
     DB,
 };
-
+use tonbo_ext_reader::foyer_reader::FoyerReader;
 use crate::{
     column::Column,
     error::{CommitError, DbError},
@@ -28,7 +27,7 @@ type PyExecutor = TokioExecutor;
 pub struct TonboDB {
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
-    db: Arc<DB<DynRecord, PyExecutor>>,
+    db: Arc<DB<DynRecord, PyExecutor, FoyerReader>>,
 }
 
 #[pymethods]
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
index 8e91373a..bd3b48f7 100644
--- a/bindings/python/src/error.rs
+++ b/bindings/python/src/error.rs
@@ -50,6 +50,7 @@ impl From for PyErr {
             tonbo::DbError::Fusio(err) => InnerError::new_err(err.to_string()),
             tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()),
             tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()),
+            tonbo::DbError::Cache(err) => InnerError::new_err(err.to_string()),
             tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"),
         }
     }
diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs
index 70d535d7..2c41c024 100644
--- a/bindings/python/src/transaction.rs
+++ b/bindings/python/src/transaction.rs
@@ -7,7 +7,7 @@ use pyo3::{
 };
 use pyo3_asyncio::tokio::future_into_py;
 use tonbo::{record::DynRecord, transaction, Projection};
-
+use tonbo_ext_reader::foyer_reader::FoyerReader;
 use crate::{
     column::Column,
     error::{repeated_commit_err, CommitError, DbError},
@@ -18,14 +18,14 @@ use crate::{
 #[pyclass]
 pub struct Transaction {
-    txn: Option<transaction::Transaction<'static, DynRecord>>,
+    txn: Option<transaction::Transaction<'static, DynRecord, FoyerReader>>,
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
 }
 
 impl Transaction {
     pub(crate) fn new<'txn>(
-        txn: transaction::Transaction<'txn, DynRecord>,
+        txn: transaction::Transaction<'txn, DynRecord, FoyerReader>,
         desc: Arc<Vec<Column>>,
     ) -> Self {
         let primary_key_index = desc
@@ -37,8 +37,8 @@ impl Transaction {
         Transaction {
             txn: Some(unsafe {
                 transmute::<
-                    transaction::Transaction<'txn, DynRecord>,
-                    transaction::Transaction<'static, DynRecord>,
+                    transaction::Transaction<'txn, DynRecord, FoyerReader>,
+                    transaction::Transaction<'static, DynRecord, FoyerReader>,
                 >(txn)
             }),
             desc,
@@ -84,8 +84,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, FoyerReader>,
+                &'static transaction::Transaction<'_, DynRecord, FoyerReader>,
             >(txn)
         };
@@ -169,8 +169,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, FoyerReader>,
+                &'static transaction::Transaction<'_, DynRecord, FoyerReader>,
             >(txn)
         };
         let col_desc = self.desc.get(self.primary_key_index).unwrap();
diff --git a/examples/datafusion.rs b/examples/datafusion.rs
index 2b7dd167..db571879 100644
---
a/examples/datafusion.rs +++ b/examples/datafusion.rs @@ -30,6 +30,7 @@ use tokio::fs; use tonbo::{ executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB, }; +use tonbo_ext_reader::foyer_reader::FoyerReader; use tonbo_macros::Record; #[derive(Record, Debug)] @@ -41,12 +42,12 @@ pub struct Music { } struct MusicProvider { - db: Arc>, + db: Arc>, } struct MusicExec { cache: PlanProperties, - db: Arc>, + db: Arc>, projection: Option>, limit: Option, range: (Bound<::Key>, Bound<::Key>), @@ -95,7 +96,10 @@ impl TableProvider for MusicProvider { } impl MusicExec { - fn new(db: Arc>, projection: Option<&Vec>) -> Self { + fn new( + db: Arc>, + projection: Option<&Vec>, + ) -> Self { let schema = Music::arrow_schema(); let schema = if let Some(projection) = &projection { Arc::new(schema.project(projection).unwrap()) diff --git a/examples/declare.rs b/examples/declare.rs index 6e6edcd3..0a057b46 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -5,6 +5,7 @@ use fusio::path::Path; use futures_util::stream::StreamExt; use tokio::fs; use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB}; +use tonbo_ext_reader::foyer_reader::FoyerReader; /// Use macro to define schema of column family just like ORM /// It provides type-safe read & write API @@ -24,7 +25,9 @@ async fn main() { let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); // pluggable async runtime and I/O - let db = DB::new(options, TokioExecutor::default()).await.unwrap(); + let db = DB::<_, _, FoyerReader>::new(options, TokioExecutor::default()) + .await + .unwrap(); // insert with owned value db.insert(User { diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index ff8db6fc..321ede51 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -7,7 +7,7 @@ use futures_util::StreamExt; use parquet::arrow::{AsyncArrowWriter, ProjectionMask}; use thiserror::Error; use tokio::sync::oneshot; -use tonbo_ext_reader::CacheError; +use tonbo_ext_reader::{CacheError, CacheReader}; use ulid::Ulid; use crate::{ @@ -33,27 +33,29 @@ pub enum CompactTask { Flush(Option>), } -pub(crate) struct Compactor +pub(crate) struct Compactor where R: Record, + C: CacheReader, { pub(crate) option: Arc>, pub(crate) schema: Arc>>, - pub(crate) version_set: VersionSet, + pub(crate) version_set: VersionSet, pub(crate) manager: Arc, } -impl Compactor +impl Compactor where R: Record, + C: CacheReader + 'static, { pub(crate) fn new( schema: Arc>>, option: Arc>, - version_set: VersionSet, + version_set: VersionSet, manager: Arc, ) -> Self { - Compactor:: { + Compactor:: { option, schema, version_set, @@ -188,7 +190,7 @@ where #[allow(clippy::too_many_arguments)] pub(crate) async fn major_compaction( - version: &Version, + version: &Version, option: &DbOption, mut min: &R::Key, mut max: &R::Key, @@ -305,7 +307,7 @@ where } fn next_level_scopes<'a>( - version: &'a Version, + version: &'a Version, min: &mut &'a ::Key, max: &mut &'a ::Key, level: usize, @@ -328,8 +330,8 @@ where .max() .ok_or(CompactionError::EmptyLevel)?; - start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); - end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); + start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); + end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); let next_level_len = version.level_slice[level + 1].len(); for scope in version.level_slice[level + 1] @@ -345,13 +347,13 @@ where } fn 
this_level_scopes<'a>( - version: &'a Version, + version: &'a Version, min: &::Key, max: &::Key, level: usize, ) -> (Vec<&'a Scope<::Key>>, usize, usize) { let mut meet_scopes_l = Vec::new(); - let mut start_l = Version::::scope_search(min, &version.level_slice[level]); + let mut start_l = Version::::scope_search(min, &version.level_slice[level]); let mut end_l = start_l; let option = version.option(); @@ -386,11 +388,11 @@ where option: &DbOption, version_edits: &mut Vec::Key>>, level: usize, - streams: Vec>, + streams: Vec>, instance: &RecordInstance, fs: &Arc, ) -> Result<(), CompactionError> { - let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; + let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; // Kould: is the capacity parameter necessary? let mut builder = R::Columns::builder(&instance.arrow_schema::(), 8192); @@ -523,7 +525,7 @@ pub(crate) mod tests { use fusio_parquet::writer::AsyncWriter; use parquet::arrow::AsyncArrowWriter; use tempfile::TempDir; - use tonbo_ext_reader::foyer_reader::build_cache; + use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; use crate::{ compaction::Compactor, @@ -682,7 +684,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -746,7 +748,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -811,7 +813,7 @@ pub(crate) mod tests { let max = 5.to_string(); let mut version_edits = Vec::new(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -855,7 +857,10 @@ pub(crate) mod tests { pub(crate) async fn build_version( option: &Arc>, manager: &StoreManager, - ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { + ) -> ( + (FileId, FileId, FileId, FileId, FileId), + Version, + ) { let level_0_fs = option .level_fs_path(0) .map(|path| manager.get_fs(path)) @@ -1065,7 +1070,7 @@ pub(crate) mod tests { .unwrap(); let (sender, _) = bounded(1); - let (meta_cache, range_cache) = build_cache( + let (meta_cache, range_cache) = FoyerReader::build_caches( path_to_local(&option.cache_path).unwrap(), option.cache_meta_capacity, option.cache_meta_shards, @@ -1075,7 +1080,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let mut version = Version::::new( + let mut version = Version::::new( option.clone(), sender, Arc::new(AtomicU32::default()), @@ -1198,7 +1203,7 @@ pub(crate) mod tests { let option = Arc::new(option); let (sender, _) = bounded(1); - let (meta_cache, range_cache) = build_cache( + let (meta_cache, range_cache) = FoyerReader::build_caches( path_to_local(&option.cache_path).unwrap(), option.cache_meta_capacity, option.cache_meta_shards, @@ -1208,7 +1213,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let mut version = Version::::new( + let mut version = Version::::new( option.clone(), sender, Arc::new(AtomicU32::default()), @@ -1232,7 +1237,7 @@ pub(crate) mod tests { let min = 6.to_string(); let max = 9.to_string(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -1261,7 +1266,8 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for i in 5..9 { let item = Test { diff --git a/src/lib.rs 
b/src/lib.rs index 49edf830..6e7261c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ //! use tokio::fs; //! use tokio_util::bytes::Bytes; //! use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB}; +//! use tonbo_ext_reader::foyer_reader::FoyerReader; //! //! // use macro to define schema of column family just like ORM //! // it provides type safety read & write API @@ -56,7 +57,8 @@ //! //! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); //! // pluggable async runtime and I/O -//! let db = DB::new(options, TokioExecutor::default()).await.unwrap(); +//! let db: DB = +//! DB::new(options, TokioExecutor::default()).await.unwrap(); //! // insert with owned value //! db.insert(User { //! name: "Alice".into(), @@ -150,7 +152,7 @@ use record::{ColumnDesc, DynRecord, Record, RecordInstance}; use thiserror::Error; use timestamp::{Timestamp, TimestampedRef}; use tokio::sync::oneshot; -use tonbo_ext_reader::CacheError; +use tonbo_ext_reader::{CacheError, CacheReader}; pub use tonbo_macros::{KeyAttributes, Record}; use tracing::error; use transaction::{CommitError, Transaction, TransactionEntry}; @@ -171,21 +173,23 @@ use crate::{ wal::{log::LogType, RecoverError, WalFile}, }; -pub struct DB +pub struct DB where R: Record, E: Executor, + C: CacheReader, { schema: Arc>>, - version_set: VersionSet, + version_set: VersionSet, lock_map: LockMap, manager: Arc, _p: PhantomData, } -impl DB +impl DB where E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { /// Open [`DB`] with schema which determined by [`ColumnDesc`]. pub async fn with_schema( @@ -203,11 +207,12 @@ where } } -impl DB +impl DB where R: Record + Send + Sync, R::Columns: Send + Sync, E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { /// Open [`DB`] with a [`DbOption`]. 
This will create a new directory at the /// path specified in [`DbOption`] (if it does not exist before) and run it @@ -247,7 +252,7 @@ where let schema = Arc::new(RwLock::new( Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, )); - let mut compactor = Compactor::::new( + let mut compactor = Compactor::::new( schema.clone(), option.clone(), version_set.clone(), @@ -287,7 +292,7 @@ where } /// open an optimistic ACID transaction - pub async fn transaction(&self) -> Transaction<'_, R> { + pub async fn transaction(&self) -> Transaction<'_, R, C> { Transaction::new( self.version_set.current().await, self.schema.read().await, @@ -447,10 +452,10 @@ impl Schema where R: Record + Send, { - async fn new( + async fn new( option: Arc>, compaction_tx: Sender, - version_set: &VersionSet, + version_set: &VersionSet, record_instance: RecordInstance, manager: &StoreManager, ) -> Result> { @@ -556,9 +561,9 @@ where self.mutable.append(None, key, ts, value).await } - async fn get<'get>( + async fn get<'get, C: CacheReader + 'static>( &'get self, - version: &'get Version, + version: &'get Version, manager: &StoreManager, key: &'get R::Key, ts: Timestamp, @@ -613,9 +618,10 @@ where } /// scan configuration intermediate structure -pub struct Scan<'scan, R> +pub struct Scan<'scan, R, C> where R: Record, + C: CacheReader, { schema: &'scan Schema, manager: &'scan StoreManager, @@ -623,27 +629,28 @@ where upper: Bound<&'scan R::Key>, ts: Timestamp, - version: &'scan Version, + version: &'scan Version, fn_pre_stream: - Box) -> Option> + Send + 'scan>, + Box) -> Option> + Send + 'scan>, limit: Option, projection_indices: Option>, projection: ProjectionMask, } -impl<'scan, R> Scan<'scan, R> +impl<'scan, R, C> Scan<'scan, R, C> where R: Record + Send, + C: CacheReader + 'static, { fn new( schema: &'scan Schema, manager: &'scan StoreManager, (lower, upper): (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, - version: &'scan Version, + version: &'scan Version, fn_pre_stream: Box< - dyn FnOnce(Option) -> Option> + Send + 'scan, + dyn FnOnce(Option) -> Option> + Send + 'scan, >, ) -> Self { Self { @@ -851,6 +858,7 @@ pub(crate) mod tests { use once_cell::sync::Lazy; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; use tempfile::TempDir; + use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; use tracing::error; use crate::{ @@ -1089,7 +1097,7 @@ pub(crate) mod tests { option: DbOption, executor: E, ) -> RecordBatch { - let db: DB = DB::new(option.clone(), executor).await.unwrap(); + let db: DB = DB::new(option.clone(), executor).await.unwrap(); let base_fs = db.manager.base_fs(); db.write( @@ -1234,18 +1242,19 @@ pub(crate) mod tests { )) } - pub(crate) async fn build_db( + pub(crate) async fn build_db( option: Arc>, compaction_rx: Receiver, executor: E, schema: crate::Schema, - version: Version, + version: Version, manager: Arc, - ) -> Result, DbError> + ) -> Result, DbError> where R: Record + Send + Sync, R::Columns: Send + Sync, E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { { let base_fs = manager.base_fs(); @@ -1259,7 +1268,7 @@ pub(crate) mod tests { let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); let version_set = build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; - let mut compactor = Compactor::::new( + let mut compactor = Compactor::::new( schema.clone(), option.clone(), version_set.clone(), @@ -1525,7 +1534,8 @@ pub(crate) mod tests { 
option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for (i, item) in test_items().into_iter().enumerate() { db.write(item, 0.into()).await.unwrap(); @@ -1561,7 +1571,8 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for item in &test_items()[0..10] { db.write(item.clone(), 0.into()).await.unwrap(); @@ -1610,9 +1621,10 @@ pub(crate) mod tests { schema.flush_wal().await.unwrap(); drop(schema); - let db: DB = DB::new(option.as_ref().to_owned(), TokioExecutor::new()) - .await - .unwrap(); + let db: DB = + DB::new(option.as_ref().to_owned(), TokioExecutor::new()) + .await + .unwrap(); let mut sort_items = BTreeMap::new(); for item in test_items() { @@ -1682,7 +1694,7 @@ pub(crate) mod tests { "id".to_owned(), primary_key_index, ); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) .await .unwrap(); @@ -1722,7 +1734,8 @@ pub(crate) mod tests { option.major_threshold_with_sst_size = 3; option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for (idx, item) in test_items().into_iter().enumerate() { if idx % 2 == 0 { @@ -1764,7 +1777,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); @@ -1994,7 +2007,7 @@ pub(crate) mod tests { option3.major_default_oldest_table_num = 1; option3.trigger_type = TriggerType::Length(5); - let db1: DB = DB::with_schema( + let db1: DB = DB::with_schema( option, TokioExecutor::new(), cols_desc.clone(), @@ -2002,7 +2015,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db2: DB = DB::with_schema( + let db2: DB = DB::with_schema( option2, TokioExecutor::new(), cols_desc.clone(), @@ -2010,7 +2023,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db3: DB = + let db3: DB = DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index a7885082..077f70a1 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -7,11 +7,9 @@ use std::{ use arrow::datatypes::Schema; use futures_core::{ready, Stream}; -use parquet::arrow::{ - async_reader::{AsyncFileReader, ParquetRecordBatchStream}, - ProjectionMask, -}; +use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask}; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ record::Record, @@ -20,9 +18,9 @@ use crate::{ pin_project! { #[derive(Debug)] - pub struct SsTableScan<'scan, R>{ + pub struct SsTableScan<'scan, R, C>{ #[pin] - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream, iter: Option>, projection_mask: ProjectionMask, full_schema: Arc, @@ -30,9 +28,9 @@ pin_project! 
{ } } -impl SsTableScan<'_, R> { +impl SsTableScan<'_, R, C> { pub(crate) fn new( - stream: ParquetRecordBatchStream>, + stream: ParquetRecordBatchStream, projection_mask: ProjectionMask, full_schema: Arc, ) -> Self { @@ -46,9 +44,10 @@ impl SsTableScan<'_, R> { } } -impl<'scan, R> Stream for SsTableScan<'scan, R> +impl<'scan, R, C> Stream for SsTableScan<'scan, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 17ef1be6..5181d62b 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -5,10 +5,9 @@ use fusio_parquet::reader::AsyncReader; use futures_util::StreamExt; use parquet::arrow::{ arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions}, - async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask, }; -use tonbo_ext_reader::{foyer_reader::CacheReader, MetaCache, RangeCache}; +use tonbo_ext_reader::CacheReader; use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ @@ -18,31 +17,33 @@ use crate::{ timestamp::{Timestamp, TimestampedRef}, }; -pub(crate) struct SsTable +pub(crate) struct SsTable where R: Record, + C: CacheReader, { - reader: Box, + reader: C, _marker: PhantomData, } -impl SsTable +impl SsTable where R: Record, + C: CacheReader + 'static, { pub(crate) async fn open( file: Box, gen: FileId, - range_cache: RangeCache, - meta_cache: MetaCache, + range_cache: C::RangeCache, + meta_cache: C::MetaCache, ) -> Result { let size = file.size().await?; - let reader = Box::new(CacheReader::new( + let reader = C::new( meta_cache, range_cache, gen, AsyncReader::new(file, size).await?, - )); + ); Ok(SsTable { reader, @@ -50,23 +51,12 @@ where }) } - #[cfg(test)] - pub(crate) async fn open_local(file: Box) -> Result { - let size = file.size().await?; - - Ok(SsTable { - reader: Box::new(AsyncReader::new(file, size).await?), - _marker: PhantomData, - }) - } - async fn into_parquet_builder( self, limit: Option, projection_mask: ProjectionMask, - ) -> parquet::errors::Result< - ArrowReaderBuilder>>, - > { + ) -> parquet::errors::Result>> + { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( self.reader, ArrowReaderOptions::default().with_page_index(true), @@ -101,7 +91,7 @@ where ts: Timestamp, limit: Option, projection_mask: ProjectionMask, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self .into_parquet_builder(limit, projection_mask.clone()) .await?; @@ -123,12 +113,16 @@ where #[cfg(test)] pub(crate) mod tests { - use std::{borrow::Borrow, fs::File, ops::Bound, sync::Arc}; + use std::{borrow::Borrow, fs::File, marker::PhantomData, ops::Bound, sync::Arc}; use arrow::array::RecordBatch; - use fusio::{dynamic::DynFile, path::Path, DynFs}; + use fusio::{ + dynamic::DynFile, + path::{path_to_local, Path}, + DynFs, + }; use fusio_dispatch::FsOptions; - use fusio_parquet::writer::AsyncWriter; + use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter}; use futures_util::StreamExt; use parquet::{ arrow::{ @@ -138,11 +132,12 @@ pub(crate) mod tests { basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; + use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader}; use super::SsTable; use crate::{ executor::tokio::TokioExecutor, - fs::{manager::StoreManager, FileType}, + fs::{manager::StoreManager, FileId, FileType}, record::Record, tests::{get_test_record_batch, Test}, timestamp::Timestamped, @@ -176,18 +171,41 @@ pub(crate) 
mod tests { Ok(()) } - pub(crate) async fn open_sstable(store: &Arc, path: Path) -> SsTable + pub(crate) async fn open_sstable( + store: &Arc, + path: Path, + gen: FileId, + option: &DbOption, + ) -> SsTable where R: Record, + C: CacheReader, { - SsTable::open_local( - store - .open_options(&path, FileType::Parquet.open_options(true)) - .await - .unwrap(), + let file = store + .open_options(&path, FileType::Parquet.open_options(true)) + .await + .unwrap(); + let size = file.size().await.unwrap(); + let (meta_cache, range_cache) = C::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, ) .await - .unwrap() + .unwrap(); + + SsTable { + reader: C::new( + meta_cache, + range_cache, + gen, + AsyncReader::new(file, size).await.unwrap(), + ), + _marker: PhantomData, + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -196,14 +214,13 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); - let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), - TokioExecutor::new(), - ) - .await; + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let record_batch = + get_test_record_batch::(option.clone(), TokioExecutor::new()).await; let table_path = temp_dir.path().join("projection_query_test.parquet"); let _ = File::create(&table_path).unwrap(); let table_path = Path::from_filesystem_path(table_path).unwrap(); + let table_gen = FileId::new(); let file = base_fs .open_options(&table_path, FileType::Parquet.open_options(false)) @@ -214,52 +231,55 @@ pub(crate) mod tests { let key = Timestamped::new("hello".to_owned(), 1.into()); { - let test_ref_1 = open_sstable::(base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_1 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_1.get().unwrap().vstring, "hello"); assert_eq!(test_ref_1.get().unwrap().vu32, Some(12)); assert_eq!(test_ref_1.get().unwrap().vbool, None); } { - let test_ref_2 = open_sstable::(base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_2 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_2.get().unwrap().vstring, "hello"); assert_eq!(test_ref_2.get().unwrap().vu32, None); assert_eq!(test_ref_2.get().unwrap().vbool, Some(true)); } { - let test_ref_3 = open_sstable::(base_fs, table_path.clone()) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_3 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), 
+ ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_3.get().unwrap().vstring, "hello"); assert_eq!(test_ref_3.get().unwrap().vu32, None); assert_eq!(test_ref_3.get().unwrap().vbool, None); @@ -272,14 +292,13 @@ pub(crate) mod tests { let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); - let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), - TokioExecutor::new(), - ) - .await; + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let record_batch = + get_test_record_batch::(option.clone(), TokioExecutor::new()).await; let table_path = temp_dir.path().join("projection_scan_test.parquet"); let _ = File::create(&table_path).unwrap(); let table_path = Path::from_filesystem_path(table_path).unwrap(); + let table_gen = FileId::new(); let file = base_fs .open_options(&table_path, FileType::Parquet.open_options(false)) @@ -288,19 +307,20 @@ pub(crate) mod tests { write_record_batch(file, &record_batch).await.unwrap(); { - let mut test_ref_1 = open_sstable::(base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await - .unwrap(); + let mut test_ref_1 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_1.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ -313,19 +333,20 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_2 = open_sstable::(base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap(); + let mut test_ref_2 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_2.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ -338,19 +359,20 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_3 = open_sstable::(base_fs, table_path.clone()) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap(); + let mut test_ref_3 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_3.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); diff --git a/src/option.rs b/src/option.rs index 4efbcb85..3f90a57d 100644 --- a/src/option.rs +++ 
b/src/option.rs @@ -11,6 +11,7 @@ use parquet::{ format::SortingColumn, schema::types::ColumnPath, }; +use tonbo_ext_reader::CacheReader; use crate::{ fs::{FileId, FileType}, @@ -281,8 +282,12 @@ where self.level_paths[level].as_ref().map(|(path, _)| path) } - pub(crate) fn is_threshold_exceeded_major(&self, version: &Version, level: usize) -> bool { - Version::::tables_len(version, level) + pub(crate) fn is_threshold_exceeded_major( + &self, + version: &Version, + level: usize, + ) -> bool { + Version::::tables_len(version, level) >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32)) } } diff --git a/src/stream/level.rs b/src/stream/level.rs index ebcdb688..b41326ec 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -13,7 +13,8 @@ use fusio::{ }; use futures_core::Stream; use parquet::{arrow::ProjectionMask, errors::ParquetError}; -use tonbo_ext_reader::{MetaCache, RangeCache}; +use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ fs::{FileId, FileType}, @@ -26,46 +27,57 @@ use crate::{ DbOption, }; -enum FutureStatus<'level, R> +enum FutureStatus<'level, R, C> where R: Record, + C: CacheReader, { Init(FileId), - Ready(SsTableScan<'level, R>), + Ready(SsTableScan<'level, R, C>), OpenFile(Pin, Error>> + 'level>>), - OpenSst(Pin, Error>> + Send + 'level>>), + OpenSst(Pin, Error>> + Send + 'level>>), LoadStream( - Pin, ParquetError>> + Send + 'level>>, + Pin< + Box< + dyn Future, ParquetError>> + + Send + + 'level, + >, + >, ), } -pub(crate) struct LevelStream<'level, R> -where - R: Record, -{ - lower: Bound<&'level R::Key>, - upper: Bound<&'level R::Key>, - ts: Timestamp, - level: usize, - option: Arc>, - meta_cache: MetaCache, - range_cache: RangeCache, - gens: VecDeque, - limit: Option, - projection_mask: ProjectionMask, - status: FutureStatus<'level, R>, - fs: Arc, - path: Option<(Path, FileId)>, +pin_project! 
{ + pub(crate) struct LevelStream<'level, R, C> + where + R: Record, + C: CacheReader, + { + lower: Bound<&'level R::Key>, + upper: Bound<&'level R::Key>, + ts: Timestamp, + level: usize, + option: Arc>, + meta_cache: C::MetaCache, + range_cache: C::RangeCache, + gens: VecDeque, + limit: Option, + projection_mask: ProjectionMask, + status: FutureStatus<'level, R, C>, + fs: Arc, + path: Option<(Path, FileId)>, + } } -impl<'level, R> LevelStream<'level, R> +impl<'level, R, C> LevelStream<'level, R, C> where R: Record, + C: CacheReader, { // Kould: only used by Compaction now, and the start and end of the sstables range are known #[allow(clippy::too_many_arguments)] pub(crate) fn new( - version: &Version, + version: &Version, level: usize, start: usize, end: usize, @@ -101,21 +113,24 @@ where } } -impl<'level, R> Stream for LevelStream<'level, R> +impl<'level, R, C> Stream for LevelStream<'level, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, ParquetError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { - return match &mut self.status { + return match &mut this.status { FutureStatus::Init(gen) => { let gen = *gen; - self.path = Some((self.option.table_path(&gen, self.level), gen)); + *this.path = Some((this.option.table_path(&gen, *this.level), gen)); - let reader = self.fs.open_options( - &self.path.as_ref().unwrap().0, + let reader = this.fs.open_options( + &this.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -130,17 +145,17 @@ where >, >(reader) }; - self.status = FutureStatus::OpenFile(reader); + *this.status = FutureStatus::OpenFile(reader); continue; } FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) { - Poll::Ready(None) => match self.gens.pop_front() { + Poll::Ready(None) => match this.gens.pop_front() { None => Poll::Ready(None), Some(gen) => { - self.path = Some((self.option.table_path(&gen, self.level), gen)); + *this.path = Some((this.option.table_path(&gen, *this.level), gen)); - let reader = self.fs.open_options( - &self.path.as_ref().unwrap().0, + let reader = this.fs.open_options( + &this.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -156,12 +171,12 @@ where >, >(reader) }; - self.status = FutureStatus::OpenFile(reader); + *this.status = FutureStatus::OpenFile(reader); continue; } }, Poll::Ready(Some(result)) => { - if let Some(limit) = &mut self.limit { + if let Some(limit) = &mut this.limit { *limit -= 1; } Poll::Ready(Some(result)) @@ -170,12 +185,12 @@ where }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - let meta_cache = self.meta_cache.clone(); - let range_cache = self.range_cache.clone(); - let (_, gen) = self.path.clone().unwrap(); + let meta_cache = this.meta_cache.clone(); + let range_cache = this.range_cache.clone(); + let (_, gen) = this.path.clone().unwrap(); let future = async move { SsTable::open(file, gen, range_cache, meta_cache).await }; - self.status = FutureStatus::OpenSst(Box::pin(future)); + *this.status = FutureStatus::OpenSst(Box::pin(future)); continue; } Poll::Ready(Err(err)) => { @@ -185,11 +200,11 @@ where }, FutureStatus::OpenSst(sst_future) => match Pin::new(sst_future).poll(cx) { Poll::Ready(Ok(sst)) => { - self.status = FutureStatus::LoadStream(Box::pin(sst.scan( 
- (self.lower, self.upper), - self.ts, - self.limit, - self.projection_mask.clone(), + *this.status = FutureStatus::LoadStream(Box::pin(sst.scan( + (*this.lower, *this.upper), + *this.ts, + *this.limit, + this.projection_mask.clone(), ))); continue; } @@ -200,7 +215,7 @@ where }, FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) { Poll::Ready(Ok(scan)) => { - self.status = FutureStatus::Ready(scan); + *this.status = FutureStatus::Ready(scan); continue; } Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 6671e285..92062940 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -7,6 +7,7 @@ use std::{ use futures_core::Stream; use parquet::{arrow::ProjectionMask, errors::ParquetError}; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ record::Record, @@ -14,20 +15,25 @@ use crate::{ }; pin_project! { - pub struct MemProjectionStream<'projection, R> + pub struct MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader { - stream: Box>, + stream: Box>, projection_mask: Arc, } } -impl<'projection, R> MemProjectionStream<'projection, R> +impl<'projection, R, C> MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader, { - pub(crate) fn new(stream: ScanStream<'projection, R>, projection_mask: ProjectionMask) -> Self { + pub(crate) fn new( + stream: ScanStream<'projection, R, C>, + projection_mask: ProjectionMask, + ) -> Self { Self { stream: Box::new(stream), projection_mask: Arc::new(projection_mask), @@ -35,9 +41,10 @@ where } } -impl<'projection, R> Stream for MemProjectionStream<'projection, R> +impl<'projection, R, C> Stream for MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, ParquetError>; @@ -61,6 +68,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; + use tonbo_ext_reader::foyer_reader::FoyerReader; use crate::{ inmem::mutable::Mutable, record::Record, stream::mem_projection::MemProjectionStream, @@ -121,7 +129,7 @@ mod tests { vec![0, 1, 2, 4], ); - let mut stream = MemProjectionStream::::new( + let mut stream = MemProjectionStream::::new( mutable .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into(), diff --git a/src/stream/merge.rs b/src/stream/merge.rs index f58583c6..7d144a6f 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -8,16 +8,18 @@ use std::{ use futures_core::{ready, Stream}; use futures_util::stream::StreamExt; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use super::{Entry, ScanStream}; use crate::{record::Record, timestamp::Timestamp}; pin_project! { - pub struct MergeStream<'merge, R> + pub struct MergeStream<'merge, R, C> where R: Record, + C: CacheReader, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, ts: Timestamp, @@ -25,12 +27,13 @@ pin_project! 
{ } } -impl<'merge, R> MergeStream<'merge, R> +impl<'merge, R, C> MergeStream<'merge, R, C> where R: Record, + C: CacheReader + 'static, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ts: Timestamp, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -62,9 +65,10 @@ where } } -impl<'merge, R> Stream for MergeStream<'merge, R> +impl<'merge, R, C> Stream for MergeStream<'merge, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; @@ -160,6 +164,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; + use tonbo_ext_reader::foyer_reader::FoyerReader; use super::MergeStream; use crate::{ @@ -212,7 +217,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), @@ -291,10 +296,12 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()], 0.into()) - .await - .unwrap(); + let mut merge = MergeStream::::from_vec( + vec![m1.scan(bound, 0.into()).into()], + 0.into(), + ) + .await + .unwrap(); if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { assert_eq!(entry.key().value, "1"); @@ -319,10 +326,12 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()], 1.into()) - .await - .unwrap(); + let mut merge = MergeStream::::from_vec( + vec![m1.scan(bound, 1.into()).into()], + 1.into(), + ) + .await + .unwrap(); if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { assert_eq!(entry.key().value, "1"); @@ -371,7 +380,7 @@ mod tests { let lower = "1".to_string(); let upper = "3".to_string(); { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], @@ -391,7 +400,7 @@ mod tests { assert!(merge.next().await.is_none()); } { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fa0b5afe..c42db2ae 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -17,6 +17,7 @@ use futures_util::{ready, stream}; use parquet::arrow::ProjectionMask; use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; +use tonbo_ext_reader::CacheReader; use crate::{ inmem::{immutable::ImmutableScan, mutable::MutableScan}, @@ -100,9 +101,10 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R> + pub enum ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { Transaction { #[pin] @@ -118,22 +120,23 @@ pin_project! 
{ }, SsTable { #[pin] - inner: SsTableScan<'scan, R>, + inner: SsTableScan<'scan, R, C>, }, Level { #[pin] - inner: LevelStream<'scan, R>, + inner: LevelStream<'scan, R, C>, }, MemProjection { #[pin] - inner: MemProjectionStream<'scan, R>, + inner: MemProjectionStream<'scan, R, C>, } } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: TransactionScan<'scan, R>) -> Self { ScanStream::Transaction { @@ -142,9 +145,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -153,9 +157,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -164,27 +169,30 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { - fn from(inner: SsTableScan<'scan, R>) -> Self { + fn from(inner: SsTableScan<'scan, R, C>) -> Self { ScanStream::SsTable { inner } } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { - fn from(inner: MemProjectionStream<'scan, R>) -> Self { + fn from(inner: MemProjectionStream<'scan, R, C>) -> Self { ScanStream::MemProjection { inner } } } -impl fmt::Debug for ScanStream<'_, R> +impl fmt::Debug for ScanStream<'_, R, C> where R: Record, + C: CacheReader, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -198,9 +206,10 @@ where } } -impl<'scan, R> Stream for ScanStream<'scan, R> +impl<'scan, R, C> Stream for ScanStream<'scan, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/stream/package.rs b/src/stream/package.rs index c8493e8f..e1a4f20d 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -5,6 +5,7 @@ use std::{ use futures_core::Stream; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ inmem::immutable::{ArrowArrays, Builder}, @@ -13,25 +14,27 @@ use crate::{ }; pin_project! 
{ - pub struct PackageStream<'package, R> + pub struct PackageStream<'package, R, C> where R: Record, + C: CacheReader, { row_count: usize, batch_size: usize, - inner: MergeStream<'package, R>, + inner: MergeStream<'package, R, C>, builder: ::Builder, projection_indices: Option>, } } -impl<'package, R> PackageStream<'package, R> +impl<'package, R, C> PackageStream<'package, R, C> where R: Record, + C: CacheReader, { pub(crate) fn new( batch_size: usize, - merge: MergeStream<'package, R>, + merge: MergeStream<'package, R, C>, projection_indices: Option>, instance: &RecordInstance, ) -> Self { @@ -45,9 +48,10 @@ where } } -impl<'package, R> Stream for PackageStream<'package, R> +impl<'package, R, C> Stream for PackageStream<'package, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result; @@ -85,6 +89,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use tempfile::TempDir; + use tonbo_ext_reader::foyer_reader::FoyerReader; use crate::{ inmem::{ @@ -177,7 +182,7 @@ mod tests { .await .unwrap(); - let merge = MergeStream::::from_vec( + let merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into()], diff --git a/src/transaction.rs b/src/transaction.rs index 3a9d38c1..8e85b340 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -13,6 +13,7 @@ use flume::SendError; use lockable::AsyncLimit; use parquet::{arrow::ProjectionMask, errors::ParquetError}; use thiserror::Error; +use tonbo_ext_reader::CacheReader; use crate::{ compaction::CompactTask, @@ -45,24 +46,26 @@ where } /// optimistic ACID transaction, open with /// [`DB::transaction`](crate::DB::transaction) method -pub struct Transaction<'txn, R> +pub struct Transaction<'txn, R, C> where R: Record, + C: CacheReader, { ts: Timestamp, local: BTreeMap>, share: RwLockReadGuard<'txn, Schema>, - version: VersionRef, + version: VersionRef, lock_map: LockMap, manager: Arc, } -impl<'txn, R> Transaction<'txn, R> +impl<'txn, R, C> Transaction<'txn, R, C> where R: Record + Send, + C: CacheReader + 'static, { pub(crate) fn new( - version: VersionRef, + version: VersionRef, share: RwLockReadGuard<'txn, Schema>, lock_map: LockMap, manager: Arc, @@ -104,7 +107,7 @@ where pub fn scan<'scan>( &'scan self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), - ) -> Scan<'scan, R> { + ) -> Scan<'scan, R, C> { Scan::new( &self.share, &self.manager, @@ -257,6 +260,7 @@ mod tests { use fusio_dispatch::FsOptions; use futures_util::StreamExt; use tempfile::TempDir; + use tonbo_ext_reader::foyer_reader::FoyerReader; use crate::{ compaction::tests::build_version, @@ -276,7 +280,7 @@ mod tests { async fn transaction_read_write() { let temp_dir = TempDir::new().unwrap(); - let db = DB::::new( + let db = DB::::new( DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) @@ -404,7 +408,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -437,7 +441,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -807,7 +811,7 @@ mod tests { "age".to_string(), 0, ); - let db = DB::with_schema(option, TokioExecutor::default(), 
descs, 0) + let db = DB::<_, _, FoyerReader>::with_schema(option, TokioExecutor::default(), descs, 0) .await .unwrap(); diff --git a/src/version/mod.rs b/src/version/mod.rs index b7bd9c68..7ee5eeab 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod edit; pub(crate) mod set; use std::{ + fmt::{Debug, Formatter}, ops::Bound, sync::{ atomic::{AtomicU32, Ordering}, @@ -14,7 +15,7 @@ use flume::{SendError, Sender}; use fusio::DynFs; use parquet::arrow::ProjectionMask; use thiserror::Error; -use tonbo_ext_reader::{CacheError, MetaCache, RangeCache}; +use tonbo_ext_reader::{CacheError, CacheReader}; use tracing::error; use crate::{ @@ -31,7 +32,7 @@ use crate::{ pub(crate) const MAX_LEVEL: usize = 7; -pub(crate) type VersionRef = Arc>; +pub(crate) type VersionRef = Arc>; pub(crate) trait TransactionTs { fn load_ts(&self) -> Timestamp; @@ -39,10 +40,10 @@ pub(crate) trait TransactionTs { fn increase_ts(&self) -> Timestamp; } -#[derive(Debug)] -pub(crate) struct Version +pub(crate) struct Version where R: Record, + C: CacheReader, { ts: Timestamp, pub(crate) level_slice: [Vec>; MAX_LEVEL], @@ -51,21 +52,41 @@ where timestamp: Arc, log_length: u32, - range_cache: RangeCache, - meta_cache: MetaCache, + range_cache: C::RangeCache, + meta_cache: C::MetaCache, } -impl Version +impl Debug for Version where R: Record, + C: CacheReader, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Version") + .field("ts", &self.ts) + .field("level_slice", &self.level_slice) + .field("clean_sender", &format_args!("{:?}", self.clean_sender)) + .field("option", &format_args!("{:?}", self.option)) + .field("timestamp", &format_args!("{:?}", self.timestamp)) + .field("log_length", &format_args!("{}", self.log_length)) + .field("range_cache", &format_args!("{:?}", self.range_cache)) + .field("meta_cache", &format_args!("{:?}", self.meta_cache)) + .finish() + } +} + +impl Version +where + R: Record, + C: CacheReader, { #[cfg(test)] pub(crate) fn new( option: Arc>, clean_sender: Sender, timestamp: Arc, - range_cache: RangeCache, - meta_cache: MetaCache, + range_cache: C::RangeCache, + meta_cache: C::MetaCache, ) -> Self { Version { ts: Timestamp::from(0), @@ -83,17 +104,18 @@ where &self.option } - pub(crate) fn meta_cache(&self) -> MetaCache { + pub(crate) fn meta_cache(&self) -> C::MetaCache { self.meta_cache.clone() } - pub(crate) fn range_cache(&self) -> RangeCache { + pub(crate) fn range_cache(&self) -> C::RangeCache { self.range_cache.clone() } } -impl TransactionTs for Version +impl TransactionTs for Version where R: Record, + C: CacheReader, { fn load_ts(&self) -> Timestamp { self.timestamp.load(Ordering::Acquire).into() @@ -104,9 +126,10 @@ where } } -impl Clone for Version +impl Clone for Version where R: Record, + C: CacheReader, { fn clone(&self) -> Self { let mut level_slice = [const { Vec::new() }; MAX_LEVEL]; @@ -128,9 +151,10 @@ where } } -impl Version +impl Version where R: Record, + C: CacheReader + 'static, { pub(crate) async fn query( &self, @@ -198,7 +222,7 @@ where .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - SsTable::::open(file, *gen, self.range_cache(), self.meta_cache()) + SsTable::::open(file, *gen, self.range_cache(), self.meta_cache()) .await? 
                 .get(key, projection_mask)
                 .await
@@ -218,7 +242,7 @@ where
     pub(crate) async fn streams<'streams>(
         &self,
         manager: &StoreManager,
-        streams: &mut Vec<ScanStream<'streams, R>>,
+        streams: &mut Vec<ScanStream<'streams, R, C>>,
         range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>),
         ts: Timestamp,
         limit: Option<usize>,
@@ -308,9 +332,10 @@ where
     }
 }
 
-impl<R> Drop for Version<R>
+impl<R, C> Drop for Version<R, C>
 where
     R: Record,
+    C: CacheReader,
 {
     fn drop(&mut self) {
         if let Err(err) = self.clean_sender.send(CleanTag::Clean { ts: self.ts }) {
diff --git a/src/version/set.rs b/src/version/set.rs
index 8793b264..8a500b7f 100644
--- a/src/version/set.rs
+++ b/src/version/set.rs
@@ -12,7 +12,7 @@ use async_lock::RwLock;
 use flume::Sender;
 use fusio::{dynamic::DynFile, fs::FileMeta, path::path_to_local};
 use futures_util::StreamExt;
-use tonbo_ext_reader::{foyer_reader::build_cache, MetaCache, RangeCache};
+use tonbo_ext_reader::CacheReader;
 
 use super::{TransactionTs, MAX_LEVEL};
 use crate::{
@@ -46,31 +46,34 @@ impl Ord for CmpMeta {
     }
 }
 
-pub(crate) struct VersionSetInner<R>
+pub(crate) struct VersionSetInner<R, C>
 where
     R: Record,
+    C: CacheReader,
 {
-    current: VersionRef<R>,
+    current: VersionRef<R, C>,
     log_with_id: (Box<dyn DynFile>, FileId),
 }
 
-pub(crate) struct VersionSet<R>
+pub(crate) struct VersionSet<R, C>
 where
     R: Record,
+    C: CacheReader,
 {
-    inner: Arc<RwLock<VersionSetInner<R>>>,
+    inner: Arc<RwLock<VersionSetInner<R, C>>>,
     clean_sender: Sender<CleanTag>,
     timestamp: Arc<AtomicU32>,
     option: Arc<DbOption<R>>,
     manager: Arc<StoreManager>,
-    range_cache: RangeCache,
-    meta_cache: MetaCache,
+    range_cache: C::RangeCache,
+    meta_cache: C::MetaCache,
 }
 
-impl<R> Clone for VersionSet<R>
+impl<R, C> Clone for VersionSet<R, C>
 where
     R: Record,
+    C: CacheReader,
 {
     fn clone(&self) -> Self {
         VersionSet {
@@ -85,9 +88,10 @@ where
     }
 }
 
-impl<R> TransactionTs for VersionSet<R>
+impl<R, C> TransactionTs for VersionSet<R, C>
 where
     R: Record,
+    C: CacheReader,
 {
     fn load_ts(&self) -> Timestamp {
         self.timestamp.load(Ordering::Acquire).into()
@@ -98,9 +102,10 @@ where
     }
 }
 
-impl<R> VersionSet<R>
+impl<R, C> VersionSet<R, C>
 where
     R: Record,
+    C: CacheReader + 'static,
 {
     pub(crate) async fn new(
         clean_sender: Sender<CleanTag>,
@@ -155,7 +160,7 @@ where
         let timestamp = Arc::new(AtomicU32::default());
         drop(log_stream);
 
-        let (meta_cache, range_cache) = build_cache(
+        let (meta_cache, range_cache) = C::build_caches(
             path_to_local(&option.cache_path).unwrap(),
             option.cache_meta_capacity,
             option.cache_meta_shards,
@@ -165,9 +170,9 @@ where
         )
         .await?;
 
-        let set = VersionSet::<R> {
+        let set = VersionSet::<R, C> {
             inner: Arc::new(RwLock::new(VersionSetInner {
-                current: Arc::new(Version::<R> {
+                current: Arc::new(Version::<R, C> {
                     ts: Timestamp::from(0),
                     level_slice: [const { Vec::new() }; MAX_LEVEL],
                     clean_sender: clean_sender.clone(),
@@ -191,7 +196,7 @@ where
         Ok(set)
     }
 
-    pub(crate) async fn current(&self) -> VersionRef<R> {
+    pub(crate) async fn current(&self) -> VersionRef<R, C> {
         self.inner.read().await.current.clone()
     }
 
@@ -315,7 +320,7 @@ pub(crate) mod tests {
     use fusio_dispatch::FsOptions;
     use futures_util::StreamExt;
     use tempfile::TempDir;
-    use tonbo_ext_reader::foyer_reader::build_cache;
+    use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};
 
     use crate::{
         fs::{manager::StoreManager, FileId, FileType},
@@ -330,14 +335,15 @@ pub(crate) mod tests {
         DbOption,
     };
 
-    pub(crate) async fn build_version_set<R>(
-        version: Version<R>,
+    pub(crate) async fn build_version_set<R, C>(
+        version: Version<R, C>,
         clean_sender: Sender<CleanTag>,
         option: Arc<DbOption<R>>,
         manager: Arc<StoreManager>,
-    ) -> Result<VersionSet<R>, VersionError<R>>
+    ) -> Result<VersionSet<R, C>, VersionError<R>>
     where
         R: Record,
+        C: CacheReader,
     {
         let log_id = FileId::new();
         let log = manager
             .base_fs()
             .open_options(
                 &option.version_log_path(&log_id),
                 FileType::Log.open_options(false),
             )
             .await?;
         let timestamp = version.timestamp.clone();
-        let (meta_cache, range_cache) = build_cache(
+        let (meta_cache, range_cache) = C::build_caches(
             path_to_local(&option.cache_path).unwrap(),
             option.cache_meta_capacity,
             option.cache_meta_shards,
@@ -358,7 +364,7 @@
         )
         .await?;
 
-        Ok(VersionSet::<R> {
+        Ok(VersionSet::<R, C> {
             inner: Arc::new(RwLock::new(VersionSetInner {
                 current: Arc::new(version),
                 log_with_id: (log, log_id),
@@ -386,7 +392,7 @@
             .await
             .unwrap();
 
-        let version_set: VersionSet<String> =
+        let version_set: VersionSet<String, FoyerReader> =
             VersionSet::new(sender.clone(), option.clone(), manager.clone())
                 .await
                 .unwrap();
@@ -402,7 +408,7 @@
 
         drop(version_set);
 
-        let version_set: VersionSet<String> =
+        let version_set: VersionSet<String, FoyerReader> =
             VersionSet::new(sender.clone(), option.clone(), manager)
                 .await
                 .unwrap();
@@ -424,7 +430,7 @@
             .await
             .unwrap();
 
-        let version_set: VersionSet<String> =
+        let version_set: VersionSet<String, FoyerReader> =
             VersionSet::new(sender.clone(), option.clone(), manager.clone())
                 .await
                 .unwrap();
@@ -552,7 +558,7 @@
             .await
             .unwrap();
 
-        let version_set: VersionSet<String> =
+        let version_set: VersionSet<String, FoyerReader> =
             VersionSet::new(sender.clone(), option.clone(), manager)
                 .await
                 .unwrap();
diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs
index 239d0d83..0cd2b39f 100644
--- a/tests/data_integrity.rs
+++ b/tests/data_integrity.rs
@@ -6,6 +6,7 @@ mod tests {
     use futures_util::StreamExt;
     use tempfile::TempDir;
     use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB};
+    use tonbo_ext_reader::foyer_reader::FoyerReader;
 
     const WRITE_TIMES: usize = 500_000;
     const STRING_SIZE: usize = 50;
@@ -72,7 +73,8 @@ mod tests {
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Customer, TokioExecutor, FoyerReader> =
+            DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for _ in 0..WRITE_TIMES {
             let customer = gen_record(&mut rng, &mut primary_key_count);
diff --git a/tonbo_ext_reader/Cargo.toml b/tonbo_ext_reader/Cargo.toml
index d75f344a..1e792d35 100644
--- a/tonbo_ext_reader/Cargo.toml
+++ b/tonbo_ext_reader/Cargo.toml
@@ -11,6 +11,7 @@ bytes = { version = "1.7", features = ["serde"] }
 foyer = { version = "0.12" }
 futures-core = "0.3"
 futures-util = "0.3"
+fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" }
 parquet = { version = "53", features = ["async"] }
 thiserror = "1"
 ulid = { version = "1", features = ["serde"] }
@@ -27,6 +28,5 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b
 fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-dispatch", version = "0.2.0", features = [
     "tokio",
 ] }
-fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" }
 tempfile = "3"
 tokio = { version = "1", features = ["full"] }
\ No newline at end of file
diff --git a/tonbo_ext_reader/src/foyer_reader.rs b/tonbo_ext_reader/src/foyer_reader.rs
index c8b76de2..4e79f544 100644
--- a/tonbo_ext_reader/src/foyer_reader.rs
+++ b/tonbo_ext_reader/src/foyer_reader.rs
@@ -1,28 +1,59 @@
 use std::{ops::Range, sync::Arc};
 
 use bytes::Bytes;
-use foyer::{CacheBuilder, DirectFsDeviceOptions, Engine, HybridCacheBuilder, LruConfig};
+use foyer::{
+    Cache, CacheBuilder, DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder, LruConfig,
+};
+use fusio_parquet::reader::AsyncReader;
 use futures_core::future::BoxFuture;
 use futures_util::FutureExt;
 use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
 use ulid::Ulid;
 
-use crate::{CacheError, MetaCache, RangeCache};
+use crate::{CacheError, CacheReader, MetaCache, RangeCache};
 
-pub struct CacheReader<R> {
+#[derive(Debug, Clone)]
+pub struct FoyerMetaCache(Cache<Ulid, Arc<ParquetMetaData>>);
+#[derive(Debug, Clone)]
+pub struct FoyerRangeCache(HybridCache<(Ulid, Range<usize>), Bytes>);
+
+pub struct FoyerReader {
     gen: Ulid,
-    inner: R,
-    range_cache: RangeCache,
-    meta_cache: MetaCache,
+    inner: AsyncReader,
+    range_cache: FoyerRangeCache,
+    meta_cache: FoyerMetaCache,
 }
 
-impl<R: AsyncFileReader> CacheReader<R> {
-    pub fn new(
-        meta_cache: MetaCache,
-        range_cache: RangeCache,
+impl MetaCache for FoyerMetaCache {
+    fn get(&self, gen: &Ulid) -> Option<Arc<ParquetMetaData>> {
+        self.0.get(gen).map(|entry| entry.value().clone())
+    }
+
+    fn insert(&self, gen: Ulid, data: Arc<ParquetMetaData>) -> Arc<ParquetMetaData> {
+        self.0.insert(gen, data).value().clone()
+    }
+}
+
+impl RangeCache for FoyerRangeCache {
+    async fn get(&self, key: &(Ulid, Range<usize>)) -> Result<Option<Bytes>, CacheError> {
+        Ok(self.0.get(key).await?.map(|entry| entry.value().clone()))
+    }
+
+    fn insert(&self, key: (Ulid, Range<usize>), bytes: Bytes) -> Bytes {
+        self.0.insert(key, bytes).value().clone()
+    }
+}
+
+impl CacheReader for FoyerReader {
+    type MetaCache = FoyerMetaCache;
+    type RangeCache = FoyerRangeCache;
+
+    fn new(
+        meta_cache: Self::MetaCache,
+        range_cache: Self::RangeCache,
         gen: Ulid,
-        inner: R,
-    ) -> CacheReader<R> {
+        inner: AsyncReader,
+    ) -> Self {
         Self {
             gen,
             inner,
@@ -30,58 +61,57 @@ impl CacheReader {
             meta_cache,
         }
     }
-}
 
-pub async fn build_cache(
-    cache_path: impl AsRef<std::path::Path>,
-    cache_meta_capacity: usize,
-    cache_meta_shards: usize,
-    cache_meta_ratio: f64,
-    cache_range_memory: usize,
-    cache_range_disk: usize,
-) -> Result<(MetaCache, RangeCache), CacheError> {
-    let meta_cache = Arc::new(
-        CacheBuilder::new(cache_meta_capacity)
+    async fn build_caches(
+        cache_path: impl AsRef<std::path::Path> + Send,
+        cache_meta_capacity: usize,
+        cache_meta_shards: usize,
+        cache_meta_ratio: f64,
+        cache_range_memory: usize,
+        cache_range_disk: usize,
+    ) -> Result<(Self::MetaCache, Self::RangeCache), CacheError> {
+        let meta_cache = CacheBuilder::new(cache_meta_capacity)
             .with_shards(cache_meta_shards)
             .with_eviction_config(LruConfig {
                 high_priority_pool_ratio: cache_meta_ratio,
             })
-            .build(),
-    );
-    let range_cache = HybridCacheBuilder::new()
-        .memory(cache_range_memory)
-        .storage(Engine::Large)
-        .with_device_options(DirectFsDeviceOptions::new(cache_path).with_capacity(cache_range_disk))
-        .build()
-        .await
-        .map_err(CacheError::from)?;
-    Ok((meta_cache, range_cache))
+            .build();
+        let range_cache = HybridCacheBuilder::new()
+            .memory(cache_range_memory)
+            .storage(Engine::Large)
+            .with_device_options(
+                DirectFsDeviceOptions::new(cache_path).with_capacity(cache_range_disk),
+            )
+            .build()
+            .await
+            .map_err(CacheError::from)?;
+        Ok((FoyerMetaCache(meta_cache), FoyerRangeCache(range_cache)))
+    }
 }
 
-impl<R: AsyncFileReader> AsyncFileReader for CacheReader<R> {
+impl AsyncFileReader for FoyerReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         async move {
             let key = (self.gen, range);
-            if let Some(entry) = self
+            if let Some(bytes) = self
                 .range_cache
                 .get(&key)
                 .await
                 .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?
             {
-                return Ok(entry.value().clone());
+                return Ok(bytes);
             }
 
             let bytes = self.inner.get_bytes(key.1.clone()).await?;
-            let entry = self.range_cache.insert(key, bytes);
-            Ok(entry.value().clone())
+            Ok(self.range_cache.insert(key, bytes))
         }
         .boxed()
     }
 
     fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         async move {
-            if let Some(entry) = self.meta_cache.get(&self.gen) {
-                return Ok(entry.value().clone());
+            if let Some(meta) = self.meta_cache.get(&self.gen) {
+                return Ok(meta);
             }
 
             let meta = self
@@ -89,9 +119,8 @@ impl AsyncFileReader for CacheReader {
                 .get_metadata()
                 .await
                 .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?;
 
-            let entry = self.meta_cache.insert(self.gen, meta);
-            Ok(entry.value().clone())
+            Ok(self.meta_cache.insert(self.gen, meta))
         }
         .boxed()
     }
@@ -121,7 +150,7 @@ pub(crate) mod tests {
     use tempfile::TempDir;
     use ulid::Ulid;
 
-    use crate::foyer_reader::{build_cache, CacheReader};
+    use crate::{foyer_reader::FoyerReader, CacheReader};
 
     struct CountFile {
         inner: Box<dyn DynFile>,
@@ -208,7 +237,7 @@ pub(crate) mod tests {
         writer.close().await.unwrap();
 
         let read_count = Arc::new(AtomicUsize::new(0));
-        let (meta_cache, range_cache) = build_cache(
+        let (meta_cache, range_cache) = FoyerReader::build_caches(
             temp_dir.path().join("cache"),
             32,
             4,
@@ -227,7 +256,7 @@ pub(crate) mod tests {
         .unwrap();
 
         let content_len = file.size().await.unwrap();
-        let mut reader = CacheReader::new(
+        let mut reader = FoyerReader::new(
             meta_cache.clone(),
             range_cache.clone(),
             gen,
diff --git a/tonbo_ext_reader/src/lib.rs b/tonbo_ext_reader/src/lib.rs
index 5c64128d..e9cbab10 100644
--- a/tonbo_ext_reader/src/lib.rs
+++ b/tonbo_ext_reader/src/lib.rs
@@ -1,16 +1,48 @@
-use std::{io, ops::Range, sync::Arc};
+use std::{fmt::Debug, io, ops::Range, sync::Arc};
 
 use bytes::Bytes;
-use foyer::{Cache, HybridCache};
-use parquet::file::metadata::ParquetMetaData;
+use fusio_parquet::reader::AsyncReader;
+use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
 use thiserror::Error;
 use ulid::Ulid;
 
 pub mod foyer_reader;
 
-// Tips: Conditional compilation adapts to different implementations
-pub type MetaCache = Arc<Cache<Ulid, Arc<ParquetMetaData>>>;
-pub type RangeCache = HybridCache<(Ulid, Range<usize>), Bytes>;
+pub trait MetaCache: Sync + Send + Clone + Debug {
+    fn get(&self, gen: &Ulid) -> Option<Arc<ParquetMetaData>>;
+
+    fn insert(&self, gen: Ulid, data: Arc<ParquetMetaData>) -> Arc<ParquetMetaData>;
+}
+
+pub trait RangeCache: Sync + Send + Clone + Debug {
+    fn get(
+        &self,
+        key: &(Ulid, Range<usize>),
+    ) -> impl std::future::Future<Output = Result<Option<Bytes>, CacheError>> + Send;
+
+    fn insert(&self, key: (Ulid, Range<usize>), bytes: Bytes) -> Bytes;
+}
+
+pub trait CacheReader: AsyncFileReader + Unpin {
+    type MetaCache: MetaCache;
+    type RangeCache: RangeCache;
+
+    fn new(
+        meta_cache: Self::MetaCache,
+        range_cache: Self::RangeCache,
+        gen: Ulid,
+        inner: AsyncReader,
+    ) -> Self;
+
+    fn build_caches(
+        cache_path: impl AsRef<std::path::Path> + Send,
+        cache_meta_capacity: usize,
+        cache_meta_shards: usize,
+        cache_meta_ratio: f64,
+        cache_range_memory: usize,
+        cache_range_disk: usize,
+    ) -> impl std::future::Future<Output = Result<(Self::MetaCache, Self::RangeCache), CacheError>> + Send;
+}
 
 #[derive(Debug, Error)]
 pub enum CacheError {
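
Below is a minimal usage sketch of how the new type parameter is threaded through at a call site, mirroring the updated `tests/data_integrity.rs`. It is not part of the patch; the `Song` record type and the scratch path are hypothetical names chosen only for illustration.

```rust
use fusio::path::Path;
use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB};
use tonbo_ext_reader::foyer_reader::FoyerReader;

// Hypothetical record type, for illustration only.
#[derive(Record, Debug)]
pub struct Song {
    #[record(primary_key)]
    title: String,
    plays: u64,
}

#[tokio::main]
async fn main() {
    // Assumed scratch directory; any writable path works.
    std::fs::create_dir_all("./tonbo_song_db").unwrap();
    let option = DbOption::from(Path::from_filesystem_path("./tonbo_song_db").unwrap());

    // The third generic on DB selects the cache implementation; FoyerReader is the
    // foyer-backed CacheReader shipped in tonbo_ext_reader.
    let db: DB<Song, TokioExecutor, FoyerReader> =
        DB::new(option, TokioExecutor::new()).await.unwrap();

    db.insert(Song {
        title: "Hello".into(),
        plays: 0,
    })
    .await
    .unwrap();
}
```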