diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f70840ce..ac7bdba2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -93,7 +93,6 @@ jobs: contents: write pull-requests: write repository-projects: write - if: github.event_name == 'pull_request' steps: - uses: actions/checkout@v4 - name: Install latest nightly diff --git a/Cargo.toml b/Cargo.toml index fda0a5b1..5e4520f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = ["tonbo_macros"] } +workspace = { members = [ "tonbo_ext_reader","tonbo_macros"] } [package] description = "An embedded persistent KV database in Rust." @@ -11,13 +11,14 @@ resolver = "2" version = "0.2.0" [package.metadata] -msrv = "1.79.0" +msrv = "1.82.0" [features] -bench = ["redb", "rocksdb", "sled"] +bench = ["redb", "rocksdb", "sled", "foyer"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] default = ["bytes", "tokio"] +foyer = ["tonbo_ext_reader/foyer"] load_tbl = [] redb = ["dep:redb"] rocksdb = ["dep:rocksdb"] @@ -58,7 +59,7 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { package = "fusio", version = "0.3.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -66,11 +67,11 @@ fusio = { package = "fusio", version = "0.3.1", features = [ "tokio", "tokio-http", ] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "aws", "tokio", ] } -fusio-parquet = { package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" @@ -83,6 +84,7 @@ thiserror = "1" tokio = { version = "1", features = ["io-util"], default-features = false } tokio-util = { version = "0.7" } tonbo_macros = { version = "0.2.0", path = "tonbo_macros" } +tonbo_ext_reader = { version = "0.1.0", path = "tonbo_ext_reader" } tracing = "0.1" ulid = "1" diff --git a/benches/common.rs b/benches/common.rs index 3abb7c5b..ab613078 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -7,6 +7,7 @@ use std::{ }; use async_stream::stream; +use fusio_dispatch::FsOptions; use futures_core::Stream; use futures_util::StreamExt; use parquet::data_type::AsBytes; @@ -16,6 +17,7 @@ use tokio::fs::create_dir_all; use tonbo::{ executor::tokio::TokioExecutor, stream, transaction::TransactionEntry, DbOption, Projection, }; +use tonbo_ext_reader::{foyer_reader::FoyerReader, lru_reader::LruReader, CacheReader}; use tonbo_macros::Record; const RNG_SEED: u64 = 3; @@ -184,24 +186,164 @@ pub trait BenchReader { ) -> impl Stream + 'a; } +pub struct TonboS3BenchDataBase { + db: tonbo::DB, +} + +impl TonboS3BenchDataBase { + #[allow(dead_code)] + pub fn new(db: tonbo::DB) -> Self { + TonboS3BenchDataBase { db } + } +} + +impl BenchDatabase for TonboS3BenchDataBase { + type W<'db> + = TonboBenchWriteTransaction<'db, FoyerReader> + where + Self: 'db; + type R<'db> + = TonboBenchReadTransaction<'db, FoyerReader> + where + Self: 'db; + + fn db_type_name() -> &'static str { + "tonbo on s3" + } + + async fn write_transaction(&self) -> Self::W<'_> { + TonboBenchWriteTransaction { + txn: self.db.transaction().await, + } + } + + async fn read_transaction(&self) -> Self::R<'_> { + TonboBenchReadTransaction { + txn: self.db.transaction().await, + } + } + + async fn build(path: impl AsRef) -> Self { + create_dir_all(path.as_ref()).await.unwrap(); + + let fs_options = FsOptions::S3 { + bucket: "data".to_string(), + credential: Some(fusio::remotes::aws::credential::AwsCredential { + key_id: "user".to_string(), + secret_key: "password".to_string(), + token: None, + }), + endpoint: Some("http://localhost:9000".to_string()), + sign_payload: None, + checksum: None, + region: None, + }; + + let path = fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap(); + let option = DbOption::from(path.clone()) + .level_path( + 0, + fusio::path::Path::from_url_path("/l0").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 1, + fusio::path::Path::from_url_path("/l1").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 2, + fusio::path::Path::from_url_path("/l2").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 3, + fusio::path::Path::from_url_path("/l3").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 4, + fusio::path::Path::from_url_path("/l4").unwrap(), + fs_options.clone(), + ) + .unwrap() + .disable_wal(); + + TonboS3BenchDataBase::new(tonbo::DB::new(option, TokioExecutor::new()).await.unwrap()) + } +} + +pub struct TonboFoyerBenchDataBase { + db: tonbo::DB, +} + +impl TonboFoyerBenchDataBase { + #[allow(dead_code)] + pub fn new(db: tonbo::DB) -> Self { + TonboFoyerBenchDataBase { db } + } +} + +impl BenchDatabase for TonboFoyerBenchDataBase { + type W<'db> + = TonboBenchWriteTransaction<'db, FoyerReader> + where + Self: 'db; + type R<'db> + = TonboBenchReadTransaction<'db, FoyerReader> + where + Self: 'db; + + fn db_type_name() -> &'static str { + "tonbo_foyer" + } + + async fn write_transaction(&self) -> Self::W<'_> { + TonboBenchWriteTransaction { + txn: self.db.transaction().await, + } + } + + async fn read_transaction(&self) -> Self::R<'_> { + TonboBenchReadTransaction { + txn: self.db.transaction().await, + } + } + + async fn build(path: impl AsRef) -> Self { + create_dir_all(path.as_ref()).await.unwrap(); + + let option = + DbOption::from(fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap()) + .disable_wal(); + + let db = tonbo::DB::new(option, TokioExecutor::new()).await.unwrap(); + TonboFoyerBenchDataBase::new(db) + } +} + pub struct TonboBenchDataBase { - db: tonbo::DB, + db: tonbo::DB, } impl TonboBenchDataBase { #[allow(dead_code)] - pub fn new(db: tonbo::DB) -> Self { + pub fn new(db: tonbo::DB) -> Self { TonboBenchDataBase { db } } } impl BenchDatabase for TonboBenchDataBase { type W<'db> - = TonboBenchWriteTransaction<'db> + = TonboBenchWriteTransaction<'db, LruReader> where Self: 'db; type R<'db> - = TonboBenchReadTransaction<'db> + = TonboBenchReadTransaction<'db, LruReader> where Self: 'db; @@ -233,13 +375,13 @@ impl BenchDatabase for TonboBenchDataBase { } } -pub struct TonboBenchReadTransaction<'a> { - txn: tonbo::transaction::Transaction<'a, Customer>, +pub struct TonboBenchReadTransaction<'a, R: CacheReader> { + txn: tonbo::transaction::Transaction<'a, Customer, R>, } -impl<'db> BenchReadTransaction for TonboBenchReadTransaction<'db> { +impl<'db, R: CacheReader + 'static> BenchReadTransaction for TonboBenchReadTransaction<'db, R> { type T<'txn> - = TonboBenchReader<'db, 'txn> + = TonboBenchReader<'db, 'txn, R> where Self: 'txn; @@ -248,11 +390,11 @@ impl<'db> BenchReadTransaction for TonboBenchReadTransaction<'db> { } } -pub struct TonboBenchReader<'db, 'txn> { - txn: &'txn tonbo::transaction::Transaction<'db, Customer>, +pub struct TonboBenchReader<'db, 'txn, R: CacheReader> { + txn: &'txn tonbo::transaction::Transaction<'db, Customer, R>, } -impl BenchReader for TonboBenchReader<'_, '_> { +impl BenchReader for TonboBenchReader<'_, '_, R> { async fn get<'a>(&'a self, key: &'a ItemKey) -> Option { self.txn .get(key, Projection::All) @@ -288,13 +430,13 @@ impl BenchReader for TonboBenchReader<'_, '_> { } } -pub struct TonboBenchWriteTransaction<'a> { - txn: tonbo::transaction::Transaction<'a, Customer>, +pub struct TonboBenchWriteTransaction<'a, R: CacheReader> { + txn: tonbo::transaction::Transaction<'a, Customer, R>, } -impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> { +impl<'db, R: CacheReader + 'static> BenchWriteTransaction for TonboBenchWriteTransaction<'db, R> { type W<'txn> - = TonboBenchInserter<'db, 'txn> + = TonboBenchInserter<'db, 'txn, R> where Self: 'txn; @@ -308,11 +450,11 @@ impl<'db> BenchWriteTransaction for TonboBenchWriteTransaction<'db> { } } -pub struct TonboBenchInserter<'db, 'txn> { - txn: &'txn mut tonbo::transaction::Transaction<'db, Customer>, +pub struct TonboBenchInserter<'db, 'txn, R: CacheReader> { + txn: &'txn mut tonbo::transaction::Transaction<'db, Customer, R>, } -impl BenchInserter for TonboBenchInserter<'_, '_> { +impl BenchInserter for TonboBenchInserter<'_, '_, R> { fn insert(&mut self, record: Customer) -> Result<(), ()> { self.txn.insert(record); Ok(()) diff --git a/benches/criterion/writes.rs b/benches/criterion/writes.rs index 41f3af21..d46342bb 100644 --- a/benches/criterion/writes.rs +++ b/benches/criterion/writes.rs @@ -3,6 +3,7 @@ use std::{iter::repeat_with, sync::Arc}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use mimalloc::MiMalloc; use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB}; +use tonbo_ext_reader::foyer_reader::FoyerReader; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -15,7 +16,7 @@ pub struct KV { } #[inline(never)] -async fn tonbo_write(db: &DB, batch_size: usize) { +async fn tonbo_write(db: &DB, batch_size: usize) { let mut kvs = Vec::with_capacity(128); for _ in 0..batch_size { let key = repeat_with(fastrand::alphanumeric).take(256).collect(); diff --git a/benches/read_bench.rs b/benches/read_bench.rs index c849fcb9..8dd68d98 100644 --- a/benches/read_bench.rs +++ b/benches/read_bench.rs @@ -12,7 +12,8 @@ use tokio::{fs, io::AsyncWriteExt}; use crate::common::{ read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase, - RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, ITERATIONS, NUM_SCAN, READ_TIMES, + RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, TonboFoyerBenchDataBase, + TonboS3BenchDataBase, ITERATIONS, NUM_SCAN, READ_TIMES, }; async fn benchmark( @@ -152,10 +153,16 @@ async fn main() { load::(&tbl_path, data_dir.join("tonbo")).await; load::(&tbl_path, data_dir.join("rocksdb")).await; + load::(&tbl_path, data_dir.join("tonbo_foyer")).await; + load::(&tbl_path, data_dir.join("tonbo_s3")).await; } let tonbo_latency_results = { benchmark::(data_dir.join("tonbo")).await }; + let tonbo_foyer_latency_results = + { benchmark::(data_dir.join("tonbo_foyer")).await }; let rocksdb_results = { benchmark::(data_dir.join("rocksdb")).await }; + let tonbo_s3_latency_results = + { benchmark::(data_dir.join("tonbo_s3")).await }; let mut rows: Vec> = Vec::new(); @@ -163,7 +170,12 @@ async fn main() { rows.push(vec![benchmark.to_string()]); } - for results in [tonbo_latency_results, rocksdb_results] { + for results in [ + tonbo_latency_results, + tonbo_foyer_latency_results, + rocksdb_results, + tonbo_s3_latency_results, + ] { for (i, (_benchmark, duration)) in results.iter().enumerate() { rows[i].push(format!("{}ms", duration.as_millis())); } @@ -171,7 +183,7 @@ async fn main() { let mut table = comfy_table::Table::new(); table.set_width(100); - table.set_header(["", "tonbo", "rocksdb"]); + table.set_header(["", "tonbo", "tonbo_foyer", "rocksdb", "tonbo_s3"]); for row in rows { table.add_row(row); } diff --git a/benches/write_bench.rs b/benches/write_bench.rs index f2f43354..2d2a3aff 100644 --- a/benches/write_bench.rs +++ b/benches/write_bench.rs @@ -201,6 +201,10 @@ async fn main() { let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap(); benchmark::(tmp_file.path()).await }; + let tonbo_s3_latency_results = { + let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap(); + benchmark::(tmp_file.path()).await + }; let _ = fs::remove_dir_all(&tmpdir); @@ -210,7 +214,11 @@ async fn main() { rows.push(vec![benchmark.to_string()]); } - for results in [tonbo_latency_results, rocksdb_results] { + for results in [ + tonbo_latency_results, + rocksdb_results, + tonbo_s3_latency_results, + ] { for (i, (_benchmark, duration)) in results.iter().enumerate() { rows[i].push(format!("{}ms", duration.as_millis())); } @@ -218,7 +226,7 @@ async fn main() { let mut table = comfy_table::Table::new(); table.set_width(100); - table.set_header(["", "tonbo", "rocksdb"]); + table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]); for row in rows { table.add_row(row); } diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 21b170ca..73659912 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -9,8 +9,8 @@ crate-type = ["cdylib"] [workspace] [dependencies] -fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [ +fusio = { package = "fusio", version = "0.3.3", features = ["aws", "tokio"] } +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "aws", "tokio", ] } @@ -25,3 +25,4 @@ pyo3-asyncio = { package = "pyo3-asyncio-0-21", version = "0.21", features = [ ] } tokio = { version = "1", features = ["rt-multi-thread"] } tonbo = { version = "0.2.0", path = "../../" } +tonbo_ext_reader = { path = "../../tonbo_ext_reader" } diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs index 840ff728..e640301b 100644 --- a/bindings/python/src/db.rs +++ b/bindings/python/src/db.rs @@ -1,5 +1,4 @@ use std::sync::Arc; - use pyo3::{ prelude::*, pyclass, pymethods, @@ -12,7 +11,7 @@ use tonbo::{ record::{ColumnDesc, DynRecord}, DB, }; - +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{CommitError, DbError}, @@ -28,7 +27,7 @@ type PyExecutor = TokioExecutor; pub struct TonboDB { desc: Arc>, primary_key_index: usize, - db: Arc>, + db: Arc>, } #[pymethods] diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs index 8e91373a..bd3b48f7 100644 --- a/bindings/python/src/error.rs +++ b/bindings/python/src/error.rs @@ -50,6 +50,7 @@ impl From for PyErr { tonbo::DbError::Fusio(err) => InnerError::new_err(err.to_string()), tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()), tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()), + tonbo::DbError::Cache(err) => InnerError::new_err(err.to_string()), tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"), } } diff --git a/bindings/python/src/fs.rs b/bindings/python/src/fs.rs index b9b7f730..ce4dbae8 100644 --- a/bindings/python/src/fs.rs +++ b/bindings/python/src/fs.rs @@ -59,6 +59,7 @@ impl From for fusio_dispatch::FsOptions { } => fusio_dispatch::FsOptions::S3 { bucket, credential: credential.map(fusio::remotes::aws::AwsCredential::from), + endpoint: None, region, sign_payload, checksum, diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs index 70d535d7..15e70a97 100644 --- a/bindings/python/src/transaction.rs +++ b/bindings/python/src/transaction.rs @@ -7,7 +7,7 @@ use pyo3::{ }; use pyo3_asyncio::tokio::future_into_py; use tonbo::{record::DynRecord, transaction, Projection}; - +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{repeated_commit_err, CommitError, DbError}, @@ -18,14 +18,14 @@ use crate::{ #[pyclass] pub struct Transaction { - txn: Option>, + txn: Option>, desc: Arc>, primary_key_index: usize, } impl Transaction { pub(crate) fn new<'txn>( - txn: transaction::Transaction<'txn, DynRecord>, + txn: transaction::Transaction<'txn, DynRecord, LruReader>, desc: Arc>, ) -> Self { let primary_key_index = desc @@ -37,8 +37,8 @@ impl Transaction { Transaction { txn: Some(unsafe { transmute::< - transaction::Transaction<'txn, DynRecord>, - transaction::Transaction<'static, DynRecord>, + transaction::Transaction<'txn, DynRecord, LruReader>, + transaction::Transaction<'static, DynRecord, LruReader>, >(txn) }), desc, @@ -84,8 +84,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord>, - &'static transaction::Transaction<'_, DynRecord>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; @@ -169,8 +169,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord>, - &'static transaction::Transaction<'_, DynRecord>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; let col_desc = self.desc.get(self.primary_key_index).unwrap(); diff --git a/examples/datafusion.rs b/examples/datafusion.rs index 2b7dd167..f9e48cad 100644 --- a/examples/datafusion.rs +++ b/examples/datafusion.rs @@ -30,6 +30,7 @@ use tokio::fs; use tonbo::{ executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB, }; +use tonbo_ext_reader::lru_reader::LruReader; use tonbo_macros::Record; #[derive(Record, Debug)] @@ -41,12 +42,12 @@ pub struct Music { } struct MusicProvider { - db: Arc>, + db: Arc>, } struct MusicExec { cache: PlanProperties, - db: Arc>, + db: Arc>, projection: Option>, limit: Option, range: (Bound<::Key>, Bound<::Key>), @@ -95,7 +96,7 @@ impl TableProvider for MusicProvider { } impl MusicExec { - fn new(db: Arc>, projection: Option<&Vec>) -> Self { + fn new(db: Arc>, projection: Option<&Vec>) -> Self { let schema = Music::arrow_schema(); let schema = if let Some(projection) = &projection { Arc::new(schema.project(projection).unwrap()) diff --git a/examples/declare.rs b/examples/declare.rs index 6e6edcd3..f87aeed9 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -5,6 +5,7 @@ use fusio::path::Path; use futures_util::stream::StreamExt; use tokio::fs; use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB}; +use tonbo_ext_reader::lru_reader::LruReader; /// Use macro to define schema of column family just like ORM /// It provides type-safe read & write API @@ -24,7 +25,9 @@ async fn main() { let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); // pluggable async runtime and I/O - let db = DB::new(options, TokioExecutor::default()).await.unwrap(); + let db = DB::<_, _, LruReader>::new(options, TokioExecutor::default()) + .await + .unwrap(); // insert with owned value db.insert(User { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 14404d0c..704483a8 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.80.0" +channel = "1.82.0" components = ["clippy", "rust-analyzer", "rustfmt"] diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index eac33d95..65bc4404 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -7,6 +7,7 @@ use futures_util::StreamExt; use parquet::arrow::{AsyncArrowWriter, ProjectionMask}; use thiserror::Error; use tokio::sync::oneshot; +use tonbo_ext_reader::{CacheError, CacheReader}; use ulid::Ulid; use crate::{ @@ -32,27 +33,29 @@ pub enum CompactTask { Flush(Option>), } -pub(crate) struct Compactor +pub(crate) struct Compactor where R: Record, + C: CacheReader, { pub(crate) option: Arc>, pub(crate) schema: Arc>>, - pub(crate) version_set: VersionSet, + pub(crate) version_set: VersionSet, pub(crate) manager: Arc, } -impl Compactor +impl Compactor where R: Record, + C: CacheReader + 'static, { pub(crate) fn new( schema: Arc>>, option: Arc>, - version_set: VersionSet, + version_set: VersionSet, manager: Arc, ) -> Self { - Compactor:: { + Compactor:: { option, schema, version_set, @@ -187,7 +190,7 @@ where #[allow(clippy::too_many_arguments)] pub(crate) async fn major_compaction( - version: &Version, + version: &Version, option: &DbOption, mut min: &R::Key, mut max: &R::Key, @@ -212,23 +215,26 @@ where // This Level if level == 0 { for scope in meet_scopes_l.iter() { + let path = option.table_path(&scope.gen, level); let file = level_fs - .open_options( - &option.table_path(&scope.gen, level), - FileType::Parquet.open_options(true), - ) + .open_options(&path, FileType::Parquet.open_options(true)) .await?; streams.push(ScanStream::SsTable { - inner: SsTable::open(file) - .await? - .scan( - (Bound::Unbounded, Bound::Unbounded), - u32::MAX.into(), - None, - ProjectionMask::all(), - ) - .await?, + inner: SsTable::open( + file, + scope.gen, + version.range_cache(), + version.meta_cache(), + ) + .await? + .scan( + (Bound::Unbounded, Bound::Unbounded), + u32::MAX.into(), + None, + ProjectionMask::all(), + ) + .await?, }); } } else { @@ -301,7 +307,7 @@ where } fn next_level_scopes<'a>( - version: &'a Version, + version: &'a Version, min: &mut &'a ::Key, max: &mut &'a ::Key, level: usize, @@ -324,8 +330,8 @@ where .max() .ok_or(CompactionError::EmptyLevel)?; - start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); - end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); + start_ll = Version::::scope_search(min, &version.level_slice[level + 1]); + end_ll = Version::::scope_search(max, &version.level_slice[level + 1]); let next_level_len = version.level_slice[level + 1].len(); for scope in version.level_slice[level + 1] @@ -341,13 +347,13 @@ where } fn this_level_scopes<'a>( - version: &'a Version, + version: &'a Version, min: &::Key, max: &::Key, level: usize, ) -> (Vec<&'a Scope<::Key>>, usize, usize) { let mut meet_scopes_l = Vec::new(); - let mut start_l = Version::::scope_search(min, &version.level_slice[level]); + let mut start_l = Version::::scope_search(min, &version.level_slice[level]); let mut end_l = start_l; let option = version.option(); @@ -382,11 +388,11 @@ where option: &DbOption, version_edits: &mut Vec::Key>>, level: usize, - streams: Vec>, + streams: Vec>, instance: &RecordInstance, fs: &Arc, ) -> Result<(), CompactionError> { - let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; + let mut stream = MergeStream::::from_vec(streams, u32::MAX.into()).await?; // Kould: is the capacity parameter necessary? let mut builder = R::Columns::builder(&instance.arrow_schema::(), 8192); @@ -496,6 +502,8 @@ where Fusio(#[from] fusio::Error), #[error("compaction version error: {0}")] Version(#[from] VersionError), + #[error("compaction cache error: {0}")] + Cache(#[from] CacheError), #[error("compaction channel is closed")] ChannelClose, #[error("database error: {0}")] @@ -509,11 +517,15 @@ pub(crate) mod tests { use std::sync::{atomic::AtomicU32, Arc}; use flume::bounded; - use fusio::{path::Path, DynFs}; + use fusio::{ + path::{path_to_local, Path}, + DynFs, + }; use fusio_dispatch::FsOptions; use fusio_parquet::writer::AsyncWriter; use parquet::arrow::AsyncArrowWriter; use tempfile::TempDir; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use crate::{ compaction::Compactor, @@ -672,7 +684,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -736,7 +748,7 @@ pub(crate) mod tests { .await .unwrap(); - let scope = Compactor::::minor_compaction( + let scope = Compactor::::minor_compaction( &option, None, &vec![ @@ -801,7 +813,7 @@ pub(crate) mod tests { let max = 5.to_string(); let mut version_edits = Vec::new(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -845,7 +857,10 @@ pub(crate) mod tests { pub(crate) async fn build_version( option: &Arc>, manager: &StoreManager, - ) -> ((FileId, FileId, FileId, FileId, FileId), Version) { + ) -> ( + (FileId, FileId, FileId, FileId, FileId), + Version, + ) { let level_0_fs = option .level_fs_path(0) .map(|path| manager.get_fs(path)) @@ -1055,8 +1070,25 @@ pub(crate) mod tests { .unwrap(); let (sender, _) = bounded(1); - let mut version = - Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + let (meta_cache, range_cache) = LruReader::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, + option.cache_range_capacity, + option.cache_range_shards, + ) + .await + .unwrap(); + let mut version = Version::::new( + option.clone(), + sender, + Arc::new(AtomicU32::default()), + range_cache, + meta_cache, + ); version.level_slice[0].push(Scope { min: 1.to_string(), max: 3.to_string(), @@ -1173,8 +1205,25 @@ pub(crate) mod tests { let option = Arc::new(option); let (sender, _) = bounded(1); - let mut version = - Version::::new(option.clone(), sender, Arc::new(AtomicU32::default())); + let (meta_cache, range_cache) = LruReader::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, + option.cache_range_capacity, + option.cache_range_shards, + ) + .await + .unwrap(); + let mut version = Version::::new( + option.clone(), + sender, + Arc::new(AtomicU32::default()), + range_cache, + meta_cache, + ); version.level_slice[0].push(Scope { min: 0.to_string(), max: 4.to_string(), @@ -1192,7 +1241,7 @@ pub(crate) mod tests { let min = 6.to_string(); let max = 9.to_string(); - Compactor::::major_compaction( + Compactor::::major_compaction( &version, &option, &min, @@ -1221,7 +1270,8 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for i in 5..9 { let item = Test { diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index e5a1ea3c..230baffa 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -5,7 +5,7 @@ use crossbeam_skiplist::{ map::{Entry, Range}, SkipMap, }; -use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs}; +use fusio::{buffered::BufWriter, DynFs, DynWrite}; use ulid::Ulid; use crate::{ @@ -37,7 +37,7 @@ where R: Record, { pub(crate) data: SkipMap, Option>, - wal: Option, R>>>, + wal: Option, R>>>, pub(crate) trigger: Arc + Send + Sync>>, } @@ -61,7 +61,7 @@ where ) .await?, option.wal_buffer_size, - )) as Box; + )) as Box; wal = Some(Mutex::new(WalFile::new(file, file_id))); }; diff --git a/src/lib.rs b/src/lib.rs index 09ac0fb7..818ad1dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ //! use tokio::fs; //! use tokio_util::bytes::Bytes; //! use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB}; +//! use tonbo_ext_reader::lru_reader::LruReader; //! //! // use macro to define schema of column family just like ORM //! // it provides type safety read & write API @@ -56,7 +57,8 @@ //! //! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap()); //! // pluggable async runtime and I/O -//! let db = DB::new(options, TokioExecutor::default()).await.unwrap(); +//! let db: DB = +//! DB::new(options, TokioExecutor::default()).await.unwrap(); //! // insert with owned value //! db.insert(User { //! name: "Alice".into(), @@ -150,6 +152,7 @@ use record::{ColumnDesc, DynRecord, Record, RecordInstance}; use thiserror::Error; use timestamp::{Timestamp, TimestampedRef}; use tokio::sync::oneshot; +use tonbo_ext_reader::{CacheError, CacheReader}; pub use tonbo_macros::{KeyAttributes, Record}; use tracing::error; use transaction::{CommitError, Transaction, TransactionEntry}; @@ -170,21 +173,23 @@ use crate::{ wal::{log::LogType, RecoverError, WalFile}, }; -pub struct DB +pub struct DB where R: Record, E: Executor, + C: CacheReader, { schema: Arc>>, - version_set: VersionSet, + version_set: VersionSet, lock_map: LockMap, manager: Arc, _p: PhantomData, } -impl DB +impl DB where E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { /// Open [`DB`] with schema which determined by [`ColumnDesc`]. pub async fn with_schema( @@ -202,11 +207,12 @@ where } } -impl DB +impl DB where R: Record + Send + Sync, R::Columns: Send + Sync, E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { /// Open [`DB`] with a [`DbOption`]. This will create a new directory at the /// path specified in [`DbOption`] (if it does not exist before) and run it @@ -246,7 +252,7 @@ where let schema = Arc::new(RwLock::new( Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, )); - let mut compactor = Compactor::::new( + let mut compactor = Compactor::::new( schema.clone(), option.clone(), version_set.clone(), @@ -286,7 +292,7 @@ where } /// open an optimistic ACID transaction - pub async fn transaction(&self) -> Transaction<'_, R> { + pub async fn transaction(&self) -> Transaction<'_, R, C> { Transaction::new( self.version_set.current().await, self.schema.read().await, @@ -446,10 +452,10 @@ impl Schema where R: Record + Send, { - async fn new( + async fn new( option: Arc>, compaction_tx: Sender, - version_set: &VersionSet, + version_set: &VersionSet, record_instance: RecordInstance, manager: &StoreManager, ) -> Result> { @@ -555,9 +561,9 @@ where self.mutable.append(None, key, ts, value).await } - async fn get<'get>( + async fn get<'get, C: CacheReader + 'static>( &'get self, - version: &'get Version, + version: &'get Version, manager: &StoreManager, key: &'get R::Key, ts: Timestamp, @@ -612,9 +618,10 @@ where } /// scan configuration intermediate structure -pub struct Scan<'scan, R> +pub struct Scan<'scan, R, C> where R: Record, + C: CacheReader, { schema: &'scan Schema, manager: &'scan StoreManager, @@ -622,27 +629,28 @@ where upper: Bound<&'scan R::Key>, ts: Timestamp, - version: &'scan Version, + version: &'scan Version, fn_pre_stream: - Box) -> Option> + Send + 'scan>, + Box) -> Option> + Send + 'scan>, limit: Option, projection_indices: Option>, projection: ProjectionMask, } -impl<'scan, R> Scan<'scan, R> +impl<'scan, R, C> Scan<'scan, R, C> where R: Record + Send, + C: CacheReader + 'static, { fn new( schema: &'scan Schema, manager: &'scan StoreManager, (lower, upper): (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, - version: &'scan Version, + version: &'scan Version, fn_pre_stream: Box< - dyn FnOnce(Option) -> Option> + Send + 'scan, + dyn FnOnce(Option) -> Option> + Send + 'scan, >, ) -> Self { Self { @@ -811,6 +819,8 @@ where UlidDecode(#[from] ulid::DecodeError), #[error("write fusio error: {0}")] Fusio(#[from] fusio::Error), + #[error("write cache error: {0}")] + Cache(#[from] CacheError), // #[error("write encode error: {0}")] // Encode(<::Ref as Encode>::Error), #[error("write recover error: {0}")] @@ -848,6 +858,7 @@ pub(crate) mod tests { use once_cell::sync::Lazy; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; use tempfile::TempDir; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use tracing::error; use crate::{ @@ -1086,7 +1097,7 @@ pub(crate) mod tests { option: DbOption, executor: E, ) -> RecordBatch { - let db: DB = DB::new(option.clone(), executor).await.unwrap(); + let db: DB = DB::new(option.clone(), executor).await.unwrap(); let base_fs = db.manager.base_fs(); db.write( @@ -1231,18 +1242,19 @@ pub(crate) mod tests { )) } - pub(crate) async fn build_db( + pub(crate) async fn build_db( option: Arc>, compaction_rx: Receiver, executor: E, schema: crate::Schema, - version: Version, + version: Version, manager: Arc, - ) -> Result, DbError> + ) -> Result, DbError> where R: Record + Send + Sync, R::Columns: Send + Sync, E: Executor + Send + Sync + 'static, + C: CacheReader + 'static, { { let base_fs = manager.base_fs(); @@ -1256,7 +1268,7 @@ pub(crate) mod tests { let (mut cleaner, clean_sender) = Cleaner::::new(option.clone(), manager.clone()); let version_set = build_version_set(version, clean_sender, option.clone(), manager.clone()).await?; - let mut compactor = Compactor::::new( + let mut compactor = Compactor::::new( schema.clone(), option.clone(), version_set.clone(), @@ -1522,7 +1534,8 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for (i, item) in test_items().into_iter().enumerate() { db.write(item, 0.into()).await.unwrap(); @@ -1558,7 +1571,8 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for item in &test_items()[0..10] { db.write(item.clone(), 0.into()).await.unwrap(); @@ -1607,9 +1621,10 @@ pub(crate) mod tests { schema.flush_wal().await.unwrap(); drop(schema); - let db: DB = DB::new(option.as_ref().to_owned(), TokioExecutor::new()) - .await - .unwrap(); + let db: DB = + DB::new(option.as_ref().to_owned(), TokioExecutor::new()) + .await + .unwrap(); let mut sort_items = BTreeMap::new(); for item in test_items() { @@ -1679,7 +1694,7 @@ pub(crate) mod tests { "id".to_owned(), primary_key_index, ); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index) .await .unwrap(); @@ -1719,7 +1734,8 @@ pub(crate) mod tests { option.major_threshold_with_sst_size = 3; option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for (idx, item) in test_items().into_iter().enumerate() { if idx % 2 == 0 { @@ -1761,7 +1777,7 @@ pub(crate) mod tests { option.major_default_oldest_table_num = 1; option.trigger_type = TriggerType::Length(5); - let db: DB = + let db: DB = DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); @@ -1991,7 +2007,7 @@ pub(crate) mod tests { option3.major_default_oldest_table_num = 1; option3.trigger_type = TriggerType::Length(5); - let db1: DB = DB::with_schema( + let db1: DB = DB::with_schema( option, TokioExecutor::new(), cols_desc.clone(), @@ -1999,7 +2015,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db2: DB = DB::with_schema( + let db2: DB = DB::with_schema( option2, TokioExecutor::new(), cols_desc.clone(), @@ -2007,7 +2023,7 @@ pub(crate) mod tests { ) .await .unwrap(); - let db3: DB = + let db3: DB = DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index) .await .unwrap(); diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 5a8ce66d..077f70a1 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -6,10 +6,10 @@ use std::{ }; use arrow::datatypes::Schema; -use fusio_parquet::reader::AsyncReader; use futures_core::{ready, Stream}; use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask}; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ record::Record, @@ -18,9 +18,9 @@ use crate::{ pin_project! { #[derive(Debug)] - pub struct SsTableScan<'scan, R>{ + pub struct SsTableScan<'scan, R, C>{ #[pin] - stream: ParquetRecordBatchStream, + stream: ParquetRecordBatchStream, iter: Option>, projection_mask: ProjectionMask, full_schema: Arc, @@ -28,9 +28,9 @@ pin_project! { } } -impl SsTableScan<'_, R> { - pub fn new( - stream: ParquetRecordBatchStream, +impl SsTableScan<'_, R, C> { + pub(crate) fn new( + stream: ParquetRecordBatchStream, projection_mask: ProjectionMask, full_schema: Arc, ) -> Self { @@ -44,9 +44,10 @@ impl SsTableScan<'_, R> { } } -impl<'scan, R> Stream for SsTableScan<'scan, R> +impl<'scan, R, C> Stream for SsTableScan<'scan, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 2304d6d3..8d0f5cdc 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -7,31 +7,46 @@ use parquet::arrow::{ arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions}, ParquetRecordBatchStreamBuilder, ProjectionMask, }; +use tonbo_ext_reader::CacheReader; use super::{arrows::get_range_filter, scan::SsTableScan}; use crate::{ + fs::FileId, record::Record, stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, TimestampedRef}, }; -pub(crate) struct SsTable +pub(crate) struct SsTable where R: Record, + C: CacheReader, { - reader: AsyncReader, + reader: C, _marker: PhantomData, } -impl SsTable +impl SsTable where R: Record, + C: CacheReader + 'static, { - pub(crate) async fn open(file: Box) -> Result { + pub(crate) async fn open( + file: Box, + gen: FileId, + range_cache: C::RangeCache, + meta_cache: C::MetaCache, + ) -> Result { let size = file.size().await?; + let reader = C::new( + meta_cache, + range_cache, + gen, + AsyncReader::new(file, size).await?, + ); Ok(SsTable { - reader: AsyncReader::new(file, size).await?, + reader, _marker: PhantomData, }) } @@ -40,9 +55,8 @@ where self, limit: Option, projection_mask: ProjectionMask, - ) -> parquet::errors::Result< - ArrowReaderBuilder>, - > { + ) -> parquet::errors::Result>> + { let mut builder = ParquetRecordBatchStreamBuilder::new_with_options( self.reader, ArrowReaderOptions::default().with_page_index(true), @@ -77,7 +91,7 @@ where ts: Timestamp, limit: Option, projection_mask: ProjectionMask, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self .into_parquet_builder(limit, projection_mask.clone()) .await?; @@ -99,12 +113,16 @@ where #[cfg(test)] pub(crate) mod tests { - use std::{borrow::Borrow, fs::File, ops::Bound, sync::Arc}; + use std::{borrow::Borrow, fs::File, marker::PhantomData, ops::Bound, sync::Arc}; use arrow::array::RecordBatch; - use fusio::{dynamic::DynFile, path::Path, DynFs}; + use fusio::{ + dynamic::DynFile, + path::{path_to_local, Path}, + DynFs, + }; use fusio_dispatch::FsOptions; - use fusio_parquet::writer::AsyncWriter; + use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter}; use futures_util::StreamExt; use parquet::{ arrow::{ @@ -114,11 +132,12 @@ pub(crate) mod tests { basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use super::SsTable; use crate::{ executor::tokio::TokioExecutor, - fs::{manager::StoreManager, FileType}, + fs::{manager::StoreManager, FileId, FileType}, record::Record, tests::{get_test_record_batch, Test}, timestamp::Timestamped, @@ -152,33 +171,58 @@ pub(crate) mod tests { Ok(()) } - pub(crate) async fn open_sstable(store: &Arc, path: &Path) -> SsTable + pub(crate) async fn open_sstable( + store: &Arc, + path: Path, + gen: FileId, + option: &DbOption, + ) -> SsTable where R: Record, + C: CacheReader, { - SsTable::open( - store - .open_options(path, FileType::Parquet.open_options(true)) - .await - .unwrap(), + let file = store + .open_options(&path, FileType::Parquet.open_options(true)) + .await + .unwrap(); + let size = file.size().await.unwrap(); + let (meta_cache, range_cache) = C::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, + option.cache_range_capacity, + option.cache_range_shards, ) .await - .unwrap() + .unwrap(); + + SsTable { + reader: C::new( + meta_cache, + range_cache, + gen, + AsyncReader::new(file, size).await.unwrap(), + ), + _marker: PhantomData, + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn projection_query() { let temp_dir = tempfile::tempdir().unwrap(); + let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); - let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), - TokioExecutor::new(), - ) - .await; + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let record_batch = + get_test_record_batch::(option.clone(), TokioExecutor::new()).await; let table_path = temp_dir.path().join("projection_query_test.parquet"); let _ = File::create(&table_path).unwrap(); let table_path = Path::from_filesystem_path(table_path).unwrap(); + let table_gen = FileId::new(); let file = base_fs .open_options(&table_path, FileType::Parquet.open_options(false)) @@ -189,52 +233,55 @@ pub(crate) mod tests { let key = Timestamped::new("hello".to_owned(), 1.into()); { - let test_ref_1 = open_sstable::(base_fs, &table_path) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_1 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_1.get().unwrap().vstring, "hello"); assert_eq!(test_ref_1.get().unwrap().vu32, Some(12)); assert_eq!(test_ref_1.get().unwrap().vbool, None); } { - let test_ref_2 = open_sstable::(base_fs, &table_path) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_2 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_2.get().unwrap().vstring, "hello"); assert_eq!(test_ref_2.get().unwrap().vu32, None); assert_eq!(test_ref_2.get().unwrap().vbool, Some(true)); } { - let test_ref_3 = open_sstable::(base_fs, &table_path) - .await - .get( - key.borrow(), - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap() - .unwrap(); + let test_ref_3 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .get( + key.borrow(), + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap() + .unwrap(); assert_eq!(test_ref_3.get().unwrap().vstring, "hello"); assert_eq!(test_ref_3.get().unwrap().vu32, None); assert_eq!(test_ref_3.get().unwrap().vbool, None); @@ -246,14 +293,14 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let manager = StoreManager::new(FsOptions::Local, vec![]).unwrap(); let base_fs = manager.base_fs(); - let record_batch = get_test_record_batch::( - DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), - TokioExecutor::new(), - ) - .await; + + let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); + let record_batch = + get_test_record_batch::(option.clone(), TokioExecutor::new()).await; let table_path = temp_dir.path().join("projection_scan_test.parquet"); let _ = File::create(&table_path).unwrap(); let table_path = Path::from_filesystem_path(table_path).unwrap(); + let table_gen = FileId::new(); let file = base_fs .open_options(&table_path, FileType::Parquet.open_options(false)) @@ -262,19 +309,20 @@ pub(crate) mod tests { write_record_batch(file, &record_batch).await.unwrap(); { - let mut test_ref_1 = open_sstable::(base_fs, &table_path) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 3], - ), - ) - .await - .unwrap(); + let mut test_ref_1 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 3], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_1.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ -287,19 +335,20 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_2 = open_sstable::(base_fs, &table_path) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2, 4], - ), - ) - .await - .unwrap(); + let mut test_ref_2 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2, 4], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_2.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); @@ -312,19 +361,20 @@ pub(crate) mod tests { assert_eq!(entry_1.get().unwrap().vbool, None); } { - let mut test_ref_3 = open_sstable::(base_fs, &table_path) - .await - .scan( - (Bound::Unbounded, Bound::Unbounded), - 1_u32.into(), - None, - ProjectionMask::roots( - &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), - [0, 1, 2], - ), - ) - .await - .unwrap(); + let mut test_ref_3 = + open_sstable::(base_fs, table_path.clone(), table_gen, &option) + .await + .scan( + (Bound::Unbounded, Bound::Unbounded), + 1_u32.into(), + None, + ProjectionMask::roots( + &arrow_to_parquet_schema(Test::arrow_schema()).unwrap(), + [0, 1, 2], + ), + ) + .await + .unwrap(); let entry_0 = test_ref_3.next().await.unwrap().unwrap(); assert_eq!(entry_0.get().unwrap().vstring, "hello"); diff --git a/src/option.rs b/src/option.rs index fd679398..7a6709fe 100644 --- a/src/option.rs +++ b/src/option.rs @@ -11,6 +11,7 @@ use parquet::{ format::SortingColumn, schema::types::ColumnPath, }; +use tonbo_ext_reader::CacheReader; use crate::{ fs::{FileId, FileType}, @@ -25,6 +26,15 @@ const DEFAULT_WAL_BUFFER_SIZE: usize = 4 * 1024; /// configure the operating parameters of each component in the [`DB`](crate::DB) #[derive(Clone)] pub struct DbOption { + pub(crate) cache_path: Path, + pub(crate) cache_meta_capacity: usize, + pub(crate) cache_meta_shards: usize, + pub(crate) cache_meta_ratio: f64, + pub(crate) cache_range_memory: usize, + pub(crate) cache_range_disk: usize, + pub(crate) cache_range_capacity: usize, + pub(crate) cache_range_shards: usize, + pub(crate) clean_channel_buffer: usize, pub(crate) base_path: Path, pub(crate) base_fs: FsOptions, @@ -54,7 +64,26 @@ where let (column_paths, sorting_columns) = Self::primary_key_path(primary_key_name, primary_key_index); + Self::fn_new(base_path, column_paths, sorting_columns) + } + + fn fn_new( + base_path: Path, + column_paths: ColumnPath, + sorting_columns: Vec, + ) -> Self { + let cache_path = base_path.child("cache"); + let memory = 64 * 1024 * 1024; + DbOption { + cache_path, + cache_meta_capacity: 32, + cache_meta_shards: 4, + cache_meta_ratio: 0.1, + cache_range_memory: memory, + cache_range_disk: 8 * memory, + cache_range_capacity: 1024, + cache_range_shards: 32, immutable_chunk_num: 3, immutable_chunk_max_num: 5, major_threshold_with_sst_size: 4, @@ -103,32 +132,8 @@ where /// build the default configured [`DbOption`] based on the passed path fn from(base_path: Path) -> Self { let (column_paths, sorting_columns) = R::primary_key_path(); - DbOption { - immutable_chunk_num: 3, - immutable_chunk_max_num: 5, - major_threshold_with_sst_size: 4, - level_sst_magnification: 10, - max_sst_file_size: 256 * 1024 * 1024, - clean_channel_buffer: 10, - base_path, - base_fs: FsOptions::Local, - write_parquet_properties: WriterProperties::builder() - .set_compression(Compression::LZ4) - .set_column_statistics_enabled(column_paths.clone(), EnabledStatistics::Page) - .set_column_bloom_filter_enabled(column_paths.clone(), true) - .set_sorting_columns(Some(sorting_columns)) - .set_created_by(concat!("tonbo version ", env!("CARGO_PKG_VERSION")).to_owned()) - .build(), - use_wal: true, - wal_buffer_size: DEFAULT_WAL_BUFFER_SIZE, - major_default_oldest_table_num: 3, - major_l_selection_table_max_num: 4, - trigger_type: TriggerType::SizeOfMem(64 * 1024 * 1024), - _p: Default::default(), - version_log_snapshot_threshold: 200, - level_paths: vec![None; MAX_LEVEL], - } + DbOption::fn_new(base_path, column_paths, sorting_columns) } } @@ -281,8 +286,12 @@ where self.level_paths[level].as_ref().map(|(path, _)| path) } - pub(crate) fn is_threshold_exceeded_major(&self, version: &Version, level: usize) -> bool { - Version::::tables_len(version, level) + pub(crate) fn is_threshold_exceeded_major( + &self, + version: &Version, + level: usize, + ) -> bool { + Version::::tables_len(version, level) >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32)) } } @@ -290,6 +299,12 @@ where impl Debug for DbOption { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DbOption") + .field("cache_path", &self.cache_path) + .field("cache_meta_shards", &self.cache_meta_shards) + .field("cache_meta_capacity", &self.cache_meta_capacity) + .field("cache_meta_ratio", &self.cache_meta_ratio) + .field("cache_range_memory", &self.cache_range_memory) + .field("cache_range_disk", &self.cache_range_disk) .field("clean_channel_buffer", &self.clean_channel_buffer) .field("base_path", &self.base_path) // TODO diff --git a/src/stream/level.rs b/src/stream/level.rs index 2039d52c..b41326ec 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -13,6 +13,8 @@ use fusio::{ }; use futures_core::Stream; use parquet::{arrow::ProjectionMask, errors::ParquetError}; +use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ fs::{FileId, FileType}, @@ -25,44 +27,57 @@ use crate::{ DbOption, }; -enum FutureStatus<'level, R> +enum FutureStatus<'level, R, C> where R: Record, + C: CacheReader, { Init(FileId), - Ready(SsTableScan<'level, R>), + Ready(SsTableScan<'level, R, C>), OpenFile(Pin, Error>> + 'level>>), - OpenSst(Pin, Error>> + Send + 'level>>), + OpenSst(Pin, Error>> + Send + 'level>>), LoadStream( - Pin, ParquetError>> + Send + 'level>>, + Pin< + Box< + dyn Future, ParquetError>> + + Send + + 'level, + >, + >, ), } -pub(crate) struct LevelStream<'level, R> -where - R: Record, -{ - lower: Bound<&'level R::Key>, - upper: Bound<&'level R::Key>, - ts: Timestamp, - level: usize, - option: Arc>, - gens: VecDeque, - limit: Option, - projection_mask: ProjectionMask, - status: FutureStatus<'level, R>, - fs: Arc, - path: Option, +pin_project! { + pub(crate) struct LevelStream<'level, R, C> + where + R: Record, + C: CacheReader, + { + lower: Bound<&'level R::Key>, + upper: Bound<&'level R::Key>, + ts: Timestamp, + level: usize, + option: Arc>, + meta_cache: C::MetaCache, + range_cache: C::RangeCache, + gens: VecDeque, + limit: Option, + projection_mask: ProjectionMask, + status: FutureStatus<'level, R, C>, + fs: Arc, + path: Option<(Path, FileId)>, + } } -impl<'level, R> LevelStream<'level, R> +impl<'level, R, C> LevelStream<'level, R, C> where R: Record, + C: CacheReader, { // Kould: only used by Compaction now, and the start and end of the sstables range are known #[allow(clippy::too_many_arguments)] pub(crate) fn new( - version: &Version, + version: &Version, level: usize, start: usize, end: usize, @@ -86,6 +101,8 @@ where ts, level, option: version.option().clone(), + meta_cache: version.meta_cache(), + range_cache: version.range_cache(), gens, limit, projection_mask, @@ -96,21 +113,24 @@ where } } -impl<'level, R> Stream for LevelStream<'level, R> +impl<'level, R, C> Stream for LevelStream<'level, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, ParquetError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { - return match &mut self.status { + return match &mut this.status { FutureStatus::Init(gen) => { let gen = *gen; - self.path = Some(self.option.table_path(&gen, self.level)); + *this.path = Some((this.option.table_path(&gen, *this.level), gen)); - let reader = self.fs.open_options( - self.path.as_ref().unwrap(), + let reader = this.fs.open_options( + &this.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -125,17 +145,17 @@ where >, >(reader) }; - self.status = FutureStatus::OpenFile(reader); + *this.status = FutureStatus::OpenFile(reader); continue; } FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) { - Poll::Ready(None) => match self.gens.pop_front() { + Poll::Ready(None) => match this.gens.pop_front() { None => Poll::Ready(None), Some(gen) => { - self.path = Some(self.option.table_path(&gen, self.level)); + *this.path = Some((this.option.table_path(&gen, *this.level), gen)); - let reader = self.fs.open_options( - self.path.as_ref().unwrap(), + let reader = this.fs.open_options( + &this.path.as_ref().unwrap().0, FileType::Parquet.open_options(true), ); #[allow(clippy::missing_transmute_annotations)] @@ -151,12 +171,12 @@ where >, >(reader) }; - self.status = FutureStatus::OpenFile(reader); + *this.status = FutureStatus::OpenFile(reader); continue; } }, Poll::Ready(Some(result)) => { - if let Some(limit) = &mut self.limit { + if let Some(limit) = &mut this.limit { *limit -= 1; } Poll::Ready(Some(result)) @@ -165,7 +185,12 @@ where }, FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { Poll::Ready(Ok(file)) => { - self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file))); + let meta_cache = this.meta_cache.clone(); + let range_cache = this.range_cache.clone(); + let (_, gen) = this.path.clone().unwrap(); + let future = + async move { SsTable::open(file, gen, range_cache, meta_cache).await }; + *this.status = FutureStatus::OpenSst(Box::pin(future)); continue; } Poll::Ready(Err(err)) => { @@ -175,11 +200,11 @@ where }, FutureStatus::OpenSst(sst_future) => match Pin::new(sst_future).poll(cx) { Poll::Ready(Ok(sst)) => { - self.status = FutureStatus::LoadStream(Box::pin(sst.scan( - (self.lower, self.upper), - self.ts, - self.limit, - self.projection_mask.clone(), + *this.status = FutureStatus::LoadStream(Box::pin(sst.scan( + (*this.lower, *this.upper), + *this.ts, + *this.limit, + this.projection_mask.clone(), ))); continue; } @@ -190,7 +215,7 @@ where }, FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) { Poll::Ready(Ok(scan)) => { - self.status = FutureStatus::Ready(scan); + *this.status = FutureStatus::Ready(scan); continue; } Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 6671e285..230cc8be 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -7,6 +7,7 @@ use std::{ use futures_core::Stream; use parquet::{arrow::ProjectionMask, errors::ParquetError}; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ record::Record, @@ -14,20 +15,25 @@ use crate::{ }; pin_project! { - pub struct MemProjectionStream<'projection, R> + pub struct MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader { - stream: Box>, + stream: Box>, projection_mask: Arc, } } -impl<'projection, R> MemProjectionStream<'projection, R> +impl<'projection, R, C> MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader, { - pub(crate) fn new(stream: ScanStream<'projection, R>, projection_mask: ProjectionMask) -> Self { + pub(crate) fn new( + stream: ScanStream<'projection, R, C>, + projection_mask: ProjectionMask, + ) -> Self { Self { stream: Box::new(stream), projection_mask: Arc::new(projection_mask), @@ -35,9 +41,10 @@ where } } -impl<'projection, R> Stream for MemProjectionStream<'projection, R> +impl<'projection, R, C> Stream for MemProjectionStream<'projection, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, ParquetError>; @@ -61,6 +68,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask}; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ inmem::mutable::Mutable, record::Record, stream::mem_projection::MemProjectionStream, @@ -121,7 +129,7 @@ mod tests { vec![0, 1, 2, 4], ); - let mut stream = MemProjectionStream::::new( + let mut stream = MemProjectionStream::::new( mutable .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into(), diff --git a/src/stream/merge.rs b/src/stream/merge.rs index f58583c6..f086d80e 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -8,16 +8,18 @@ use std::{ use futures_core::{ready, Stream}; use futures_util::stream::StreamExt; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use super::{Entry, ScanStream}; use crate::{record::Record, timestamp::Timestamp}; pin_project! { - pub struct MergeStream<'merge, R> + pub struct MergeStream<'merge, R, C> where R: Record, + C: CacheReader, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, ts: Timestamp, @@ -25,12 +27,13 @@ pin_project! { } } -impl<'merge, R> MergeStream<'merge, R> +impl<'merge, R, C> MergeStream<'merge, R, C> where R: Record, + C: CacheReader + 'static, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ts: Timestamp, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -62,9 +65,10 @@ where } } -impl<'merge, R> Stream for MergeStream<'merge, R> +impl<'merge, R, C> Stream for MergeStream<'merge, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; @@ -160,6 +164,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; + use tonbo_ext_reader::lru_reader::LruReader; use super::MergeStream; use crate::{ @@ -212,7 +217,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), @@ -291,10 +296,12 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()], 0.into()) - .await - .unwrap(); + let mut merge = MergeStream::::from_vec( + vec![m1.scan(bound, 0.into()).into()], + 0.into(), + ) + .await + .unwrap(); if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { assert_eq!(entry.key().value, "1"); @@ -319,10 +326,12 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = - MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()], 1.into()) - .await - .unwrap(); + let mut merge = MergeStream::::from_vec( + vec![m1.scan(bound, 1.into()).into()], + 1.into(), + ) + .await + .unwrap(); if let Some(Ok(Entry::Mutable(entry))) = merge.next().await { assert_eq!(entry.key().value, "1"); @@ -371,7 +380,7 @@ mod tests { let lower = "1".to_string(); let upper = "3".to_string(); { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], @@ -391,7 +400,7 @@ mod tests { assert!(merge.next().await.is_none()); } { - let mut merge = MergeStream::::from_vec( + let mut merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into()) .into()], diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fa0b5afe..c42db2ae 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -17,6 +17,7 @@ use futures_util::{ready, stream}; use parquet::arrow::ProjectionMask; use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; +use tonbo_ext_reader::CacheReader; use crate::{ inmem::{immutable::ImmutableScan, mutable::MutableScan}, @@ -100,9 +101,10 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R> + pub enum ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { Transaction { #[pin] @@ -118,22 +120,23 @@ pin_project! { }, SsTable { #[pin] - inner: SsTableScan<'scan, R>, + inner: SsTableScan<'scan, R, C>, }, Level { #[pin] - inner: LevelStream<'scan, R>, + inner: LevelStream<'scan, R, C>, }, MemProjection { #[pin] - inner: MemProjectionStream<'scan, R>, + inner: MemProjectionStream<'scan, R, C>, } } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: TransactionScan<'scan, R>) -> Self { ScanStream::Transaction { @@ -142,9 +145,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -153,9 +157,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -164,27 +169,30 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { - fn from(inner: SsTableScan<'scan, R>) -> Self { + fn from(inner: SsTableScan<'scan, R, C>) -> Self { ScanStream::SsTable { inner } } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, C> From> for ScanStream<'scan, R, C> where R: Record, + C: CacheReader, { - fn from(inner: MemProjectionStream<'scan, R>) -> Self { + fn from(inner: MemProjectionStream<'scan, R, C>) -> Self { ScanStream::MemProjection { inner } } } -impl fmt::Debug for ScanStream<'_, R> +impl fmt::Debug for ScanStream<'_, R, C> where R: Record, + C: CacheReader, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -198,9 +206,10 @@ where } } -impl<'scan, R> Stream for ScanStream<'scan, R> +impl<'scan, R, C> Stream for ScanStream<'scan, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/stream/package.rs b/src/stream/package.rs index c8493e8f..5c63f782 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -5,6 +5,7 @@ use std::{ use futures_core::Stream; use pin_project_lite::pin_project; +use tonbo_ext_reader::CacheReader; use crate::{ inmem::immutable::{ArrowArrays, Builder}, @@ -13,25 +14,27 @@ use crate::{ }; pin_project! { - pub struct PackageStream<'package, R> + pub struct PackageStream<'package, R, C> where R: Record, + C: CacheReader, { row_count: usize, batch_size: usize, - inner: MergeStream<'package, R>, + inner: MergeStream<'package, R, C>, builder: ::Builder, projection_indices: Option>, } } -impl<'package, R> PackageStream<'package, R> +impl<'package, R, C> PackageStream<'package, R, C> where R: Record, + C: CacheReader, { pub(crate) fn new( batch_size: usize, - merge: MergeStream<'package, R>, + merge: MergeStream<'package, R, C>, projection_indices: Option>, instance: &RecordInstance, ) -> Self { @@ -45,9 +48,10 @@ where } } -impl<'package, R> Stream for PackageStream<'package, R> +impl<'package, R, C> Stream for PackageStream<'package, R, C> where R: Record, + C: CacheReader + 'static, { type Item = Result; @@ -85,6 +89,7 @@ mod tests { use fusio::{disk::TokioFs, path::Path, DynFs}; use futures_util::StreamExt; use tempfile::TempDir; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ inmem::{ @@ -177,7 +182,7 @@ mod tests { .await .unwrap(); - let merge = MergeStream::::from_vec( + let merge = MergeStream::::from_vec( vec![m1 .scan((Bound::Unbounded, Bound::Unbounded), 6.into()) .into()], diff --git a/src/transaction.rs b/src/transaction.rs index 3a9d38c1..3e9333df 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -13,6 +13,7 @@ use flume::SendError; use lockable::AsyncLimit; use parquet::{arrow::ProjectionMask, errors::ParquetError}; use thiserror::Error; +use tonbo_ext_reader::CacheReader; use crate::{ compaction::CompactTask, @@ -45,24 +46,26 @@ where } /// optimistic ACID transaction, open with /// [`DB::transaction`](crate::DB::transaction) method -pub struct Transaction<'txn, R> +pub struct Transaction<'txn, R, C> where R: Record, + C: CacheReader, { ts: Timestamp, local: BTreeMap>, share: RwLockReadGuard<'txn, Schema>, - version: VersionRef, + version: VersionRef, lock_map: LockMap, manager: Arc, } -impl<'txn, R> Transaction<'txn, R> +impl<'txn, R, C> Transaction<'txn, R, C> where R: Record + Send, + C: CacheReader + 'static, { pub(crate) fn new( - version: VersionRef, + version: VersionRef, share: RwLockReadGuard<'txn, Schema>, lock_map: LockMap, manager: Arc, @@ -104,7 +107,7 @@ where pub fn scan<'scan>( &'scan self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), - ) -> Scan<'scan, R> { + ) -> Scan<'scan, R, C> { Scan::new( &self.share, &self.manager, @@ -257,6 +260,7 @@ mod tests { use fusio_dispatch::FsOptions; use futures_util::StreamExt; use tempfile::TempDir; + use tonbo_ext_reader::lru_reader::LruReader; use crate::{ compaction::tests::build_version, @@ -276,7 +280,7 @@ mod tests { async fn transaction_read_write() { let temp_dir = TempDir::new().unwrap(); - let db = DB::::new( + let db = DB::::new( DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()), TokioExecutor::new(), ) @@ -404,7 +408,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -437,7 +441,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db = DB::::new(option, TokioExecutor::new()) + let db = DB::::new(option, TokioExecutor::new()) .await .unwrap(); @@ -807,7 +811,7 @@ mod tests { "age".to_string(), 0, ); - let db = DB::with_schema(option, TokioExecutor::default(), descs, 0) + let db = DB::<_, _, LruReader>::with_schema(option, TokioExecutor::default(), descs, 0) .await .unwrap(); diff --git a/src/version/mod.rs b/src/version/mod.rs index 37d15c65..7ee5eeab 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod edit; pub(crate) mod set; use std::{ + fmt::{Debug, Formatter}, ops::Bound, sync::{ atomic::{AtomicU32, Ordering}, @@ -14,6 +15,7 @@ use flume::{SendError, Sender}; use fusio::DynFs; use parquet::arrow::ProjectionMask; use thiserror::Error; +use tonbo_ext_reader::{CacheError, CacheReader}; use tracing::error; use crate::{ @@ -30,7 +32,7 @@ use crate::{ pub(crate) const MAX_LEVEL: usize = 7; -pub(crate) type VersionRef = Arc>; +pub(crate) type VersionRef = Arc>; pub(crate) trait TransactionTs { fn load_ts(&self) -> Timestamp; @@ -38,10 +40,10 @@ pub(crate) trait TransactionTs { fn increase_ts(&self) -> Timestamp; } -#[derive(Debug)] -pub(crate) struct Version +pub(crate) struct Version where R: Record, + C: CacheReader, { ts: Timestamp, pub(crate) level_slice: [Vec>; MAX_LEVEL], @@ -49,17 +51,42 @@ where option: Arc>, timestamp: Arc, log_length: u32, + + range_cache: C::RangeCache, + meta_cache: C::MetaCache, } -impl Version +impl Debug for Version where R: Record, + C: CacheReader, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Version") + .field("ts", &self.ts) + .field("level_slice", &self.level_slice) + .field("clean_sender", &format_args!("{:?}", self.clean_sender)) + .field("option", &format_args!("{:?}", self.option)) + .field("timestamp", &format_args!("{:?}", self.timestamp)) + .field("log_length", &format_args!("{}", self.log_length)) + .field("range_cache", &format_args!("{:?}", self.range_cache)) + .field("meta_cache", &format_args!("{:?}", self.meta_cache)) + .finish() + } +} + +impl Version +where + R: Record, + C: CacheReader, { #[cfg(test)] pub(crate) fn new( option: Arc>, clean_sender: Sender, timestamp: Arc, + range_cache: C::RangeCache, + meta_cache: C::MetaCache, ) -> Self { Version { ts: Timestamp::from(0), @@ -68,17 +95,27 @@ where option: option.clone(), timestamp, log_length: 0, + range_cache, + meta_cache, } } pub(crate) fn option(&self) -> &Arc> { &self.option } + + pub(crate) fn meta_cache(&self) -> C::MetaCache { + self.meta_cache.clone() + } + pub(crate) fn range_cache(&self) -> C::RangeCache { + self.range_cache.clone() + } } -impl TransactionTs for Version +impl TransactionTs for Version where R: Record, + C: CacheReader, { fn load_ts(&self) -> Timestamp { self.timestamp.load(Ordering::Acquire).into() @@ -89,9 +126,10 @@ where } } -impl Clone for Version +impl Clone for Version where R: Record, + C: CacheReader, { fn clone(&self) -> Self { let mut level_slice = [const { Vec::new() }; MAX_LEVEL]; @@ -107,13 +145,16 @@ where option: self.option.clone(), timestamp: self.timestamp.clone(), log_length: self.log_length, + range_cache: self.range_cache.clone(), + meta_cache: self.meta_cache.clone(), } } } -impl Version +impl Version where R: Record, + C: CacheReader + 'static, { pub(crate) async fn query( &self, @@ -176,14 +217,12 @@ where gen: &FileId, projection_mask: ProjectionMask, ) -> Result>, VersionError> { + let path = self.option.table_path(gen, level); let file = store - .open_options( - &self.option.table_path(gen, level), - FileType::Parquet.open_options(true), - ) + .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - SsTable::::open(file) + SsTable::::open(file, *gen, self.range_cache(), self.meta_cache()) .await? .get(key, projection_mask) .await @@ -203,7 +242,7 @@ where pub(crate) async fn streams<'streams>( &self, manager: &StoreManager, - streams: &mut Vec>, + streams: &mut Vec>, range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>), ts: Timestamp, limit: Option, @@ -218,14 +257,13 @@ where if !scope.meets_range(range) { continue; } + let path = self.option.table_path(&scope.gen, 0); let file = level_0_fs - .open_options( - &self.option.table_path(&scope.gen, 0), - FileType::Parquet.open_options(true), - ) + .open_options(&path, FileType::Parquet.open_options(true)) .await .map_err(VersionError::Fusio)?; - let table = SsTable::open(file).await?; + let table = + SsTable::open(file, scope.gen, self.range_cache(), self.meta_cache()).await?; streams.push(ScanStream::SsTable { inner: table @@ -294,9 +332,10 @@ where } } -impl Drop for Version +impl Drop for Version where R: Record, + C: CacheReader, { fn drop(&mut self) { if let Err(err) = self.clean_sender.send(CleanTag::Clean { ts: self.ts }) { @@ -318,6 +357,8 @@ where Parquet(#[from] parquet::errors::ParquetError), #[error("version fusio error: {0}")] Fusio(#[from] fusio::Error), + #[error("version cache error: {0}")] + Cache(#[from] CacheError), #[error("version ulid decode error: {0}")] UlidDecode(#[from] ulid::DecodeError), #[error("version send error: {0}")] diff --git a/src/version/set.rs b/src/version/set.rs index 586ca89e..05a9cf89 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -10,8 +10,9 @@ use std::{ use async_lock::RwLock; use flume::Sender; -use fusio::{dynamic::DynFile, fs::FileMeta}; +use fusio::{dynamic::DynFile, fs::FileMeta, path::path_to_local}; use futures_util::StreamExt; +use tonbo_ext_reader::CacheReader; use super::{TransactionTs, MAX_LEVEL}; use crate::{ @@ -45,28 +46,34 @@ impl Ord for CmpMeta { } } -pub(crate) struct VersionSetInner +pub(crate) struct VersionSetInner where R: Record, + C: CacheReader, { - current: VersionRef, + current: VersionRef, log_with_id: (Box, FileId), } -pub(crate) struct VersionSet +pub(crate) struct VersionSet where R: Record, + C: CacheReader, { - inner: Arc>>, + inner: Arc>>, clean_sender: Sender, timestamp: Arc, option: Arc>, manager: Arc, + + range_cache: C::RangeCache, + meta_cache: C::MetaCache, } -impl Clone for VersionSet +impl Clone for VersionSet where R: Record, + C: CacheReader, { fn clone(&self) -> Self { VersionSet { @@ -75,13 +82,16 @@ where timestamp: self.timestamp.clone(), option: self.option.clone(), manager: self.manager.clone(), + range_cache: self.range_cache.clone(), + meta_cache: self.meta_cache.clone(), } } } -impl TransactionTs for VersionSet +impl TransactionTs for VersionSet where R: Record, + C: CacheReader, { fn load_ts(&self) -> Timestamp { self.timestamp.load(Ordering::Acquire).into() @@ -92,9 +102,10 @@ where } } -impl VersionSet +impl VersionSet where R: Record, + C: CacheReader + 'static, { pub(crate) async fn new( clean_sender: Sender, @@ -148,15 +159,30 @@ where let timestamp = Arc::new(AtomicU32::default()); drop(log_stream); - let set = VersionSet:: { + + let (meta_cache, range_cache) = C::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, + option.cache_range_capacity, + option.cache_range_shards, + ) + .await?; + + let set = VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { - current: Arc::new(Version:: { + current: Arc::new(Version:: { ts: Timestamp::from(0), level_slice: [const { Vec::new() }; MAX_LEVEL], clean_sender: clean_sender.clone(), option: option.clone(), timestamp: timestamp.clone(), log_length: 0, + range_cache: range_cache.clone(), + meta_cache: meta_cache.clone(), }), log_with_id: (log, log_id), })), @@ -164,13 +190,15 @@ where timestamp, option, manager, + range_cache, + meta_cache, }; set.apply_edits(edits, None, true).await?; Ok(set) } - pub(crate) async fn current(&self) -> VersionRef { + pub(crate) async fn current(&self) -> VersionRef { self.inner.read().await.current.clone() } @@ -290,10 +318,11 @@ pub(crate) mod tests { use async_lock::RwLock; use flume::{bounded, Sender}; - use fusio::path::Path; + use fusio::path::{path_to_local, Path}; use fusio_dispatch::FsOptions; use futures_util::StreamExt; use tempfile::TempDir; + use tonbo_ext_reader::{lru_reader::LruReader, CacheReader}; use crate::{ fs::{manager::StoreManager, FileId, FileType}, @@ -308,14 +337,15 @@ pub(crate) mod tests { DbOption, }; - pub(crate) async fn build_version_set( - version: Version, + pub(crate) async fn build_version_set( + version: Version, clean_sender: Sender, option: Arc>, manager: Arc, - ) -> Result, VersionError> + ) -> Result, VersionError> where R: Record, + C: CacheReader, { let log_id = FileId::new(); let log = manager @@ -326,8 +356,19 @@ pub(crate) mod tests { ) .await?; let timestamp = version.timestamp.clone(); - - Ok(VersionSet:: { + let (meta_cache, range_cache) = C::build_caches( + path_to_local(&option.cache_path).unwrap(), + option.cache_meta_capacity, + option.cache_meta_shards, + option.cache_meta_ratio, + option.cache_range_memory, + option.cache_range_disk, + option.cache_range_capacity, + option.cache_range_shards, + ) + .await?; + + Ok(VersionSet:: { inner: Arc::new(RwLock::new(VersionSetInner { current: Arc::new(version), log_with_id: (log, log_id), @@ -336,6 +377,8 @@ pub(crate) mod tests { timestamp, option, manager, + range_cache, + meta_cache, }) } @@ -353,7 +396,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager.clone()) .await .unwrap(); @@ -369,7 +412,7 @@ pub(crate) mod tests { drop(version_set); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager) .await .unwrap(); @@ -391,7 +434,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager.clone()) .await .unwrap(); @@ -519,7 +562,7 @@ pub(crate) mod tests { .await .unwrap(); - let version_set: VersionSet = + let version_set: VersionSet = VersionSet::new(sender.clone(), option.clone(), manager) .await .unwrap(); diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs index 239d0d83..0e59c62e 100644 --- a/tests/data_integrity.rs +++ b/tests/data_integrity.rs @@ -6,6 +6,7 @@ mod tests { use futures_util::StreamExt; use tempfile::TempDir; use tonbo::{executor::tokio::TokioExecutor, DbOption, Record, DB}; + use tonbo_ext_reader::lru_reader::LruReader; const WRITE_TIMES: usize = 500_000; const STRING_SIZE: usize = 50; @@ -62,8 +63,8 @@ mod tests { } } - #[tokio::test] #[ignore] + #[tokio::test] async fn test_data_integrity() { let mut rng = fastrand::Rng::with_seed(42); let mut primary_key_count = 0; @@ -72,7 +73,8 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); - let db: DB = DB::new(option, TokioExecutor::new()).await.unwrap(); + let db: DB = + DB::new(option, TokioExecutor::new()).await.unwrap(); for _ in 0..WRITE_TIMES { let customer = gen_record(&mut rng, &mut primary_key_count); diff --git a/tonbo_ext_reader/Cargo.toml b/tonbo_ext_reader/Cargo.toml new file mode 100644 index 00000000..5b9bb4b2 --- /dev/null +++ b/tonbo_ext_reader/Cargo.toml @@ -0,0 +1,37 @@ +[package] +description = "ExtReader for Tonbo." +name = "tonbo_ext_reader" +version = "0.1.0" +edition = "2021" + +[features] +foyer = [] + +[dependencies] +anyhow = "1" +arrow = "53" +bytes = { version = "1.7", features = ["serde"] } +foyer = { version = "0.12" } +futures-core = "0.3" +futures-util = "0.3" +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } +lru = "0.12" +parking_lot = "0.12" +parquet = { version = "53", features = ["async"] } +thiserror = "1" +ulid = { version = "1", features = ["serde"] } + +[dev-dependencies] +fusio = { package = "fusio", version = "0.3.3", features = [ + "aws", + "dyn", + "fs", + "object_store", + "tokio", + "tokio-http", +] } +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ + "tokio", +] } +tempfile = "3" +tokio = { version = "1", features = ["full"] } diff --git a/tonbo_ext_reader/src/foyer_reader.rs b/tonbo_ext_reader/src/foyer_reader.rs new file mode 100644 index 00000000..70090bad --- /dev/null +++ b/tonbo_ext_reader/src/foyer_reader.rs @@ -0,0 +1,134 @@ +use std::{ops::Range, sync::Arc}; + +use bytes::Bytes; +use foyer::{ + Cache, CacheBuilder, DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder, LruConfig, +}; +use fusio_parquet::reader::AsyncReader; +use futures_core::future::BoxFuture; +use futures_util::FutureExt; +use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; +use ulid::Ulid; + +use crate::{CacheError, CacheReader, TonboCache}; + +#[derive(Debug, Clone)] +pub struct FoyerMetaCache(Cache>); +#[derive(Debug, Clone)] +pub struct FoyerRangeCache(HybridCache<(Ulid, Range), Bytes>); + +pub struct FoyerReader { + gen: Ulid, + inner: AsyncReader, + range_cache: FoyerRangeCache, + meta_cache: FoyerMetaCache, +} + +impl TonboCache> for FoyerMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen).map(|entry| entry.value().clone())) + } + + fn insert(&self, gen: Ulid, data: Arc) -> Arc { + self.0.insert(gen, data).value().clone() + } +} + +impl TonboCache<(Ulid, Range), Bytes> for FoyerRangeCache { + async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { + Ok(self.0.get(key).await?.map(|entry| entry.value().clone())) + } + + fn insert(&self, key: (Ulid, Range), bytes: Bytes) -> Bytes { + self.0.insert(key, bytes).value().clone() + } +} + +impl CacheReader for FoyerReader { + type MetaCache = FoyerMetaCache; + type RangeCache = FoyerRangeCache; + + fn new( + meta_cache: Self::MetaCache, + range_cache: Self::RangeCache, + gen: Ulid, + inner: AsyncReader, + ) -> Self { + Self { + gen, + inner, + range_cache, + meta_cache, + } + } + + async fn build_caches( + cache_path: impl AsRef + Send, + cache_meta_capacity: usize, + cache_meta_shards: usize, + cache_meta_ratio: f64, + cache_range_memory: usize, + cache_range_disk: usize, + _: usize, + _: usize, + ) -> Result<(Self::MetaCache, Self::RangeCache), CacheError> { + let meta_cache = CacheBuilder::new(cache_meta_capacity) + .with_shards(cache_meta_shards) + .with_eviction_config(LruConfig { + high_priority_pool_ratio: cache_meta_ratio, + }) + .build(); + let range_cache = HybridCacheBuilder::new() + .memory(cache_range_memory) + .storage(Engine::Large) + .with_device_options( + DirectFsDeviceOptions::new(cache_path).with_capacity(cache_range_disk), + ) + .build() + .await + .map_err(CacheError::from)?; + Ok((FoyerMetaCache(meta_cache), FoyerRangeCache(range_cache))) + } +} + +impl AsyncFileReader for FoyerReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + async move { + let key = (self.gen, range); + if let Some(bytes) = self + .range_cache + .get(&key) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { + return Ok(bytes); + } + + let bytes = self.inner.get_bytes(key.1.clone()).await?; + Ok(self.range_cache.insert(key, bytes)) + } + .boxed() + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { + return Ok(meta); + } + + let meta = self + .inner + .get_metadata() + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?; + + Ok(self.meta_cache.insert(self.gen, meta)) + } + .boxed() + } +} diff --git a/tonbo_ext_reader/src/lib.rs b/tonbo_ext_reader/src/lib.rs new file mode 100644 index 00000000..81fdfbd5 --- /dev/null +++ b/tonbo_ext_reader/src/lib.rs @@ -0,0 +1,215 @@ +use std::{fmt::Debug, io, ops::Range, sync::Arc}; + +use bytes::Bytes; +use fusio_parquet::reader::AsyncReader; +use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; +use thiserror::Error; +use ulid::Ulid; + +#[cfg(feature = "foyer")] +pub mod foyer_reader; +pub mod lru_reader; + +pub trait TonboCache: Sync + Send + Clone + Debug { + fn get( + &self, + key: &K, + ) -> impl std::future::Future, CacheError>> + Send; + + fn insert(&self, key: K, value: V) -> V; +} + +pub trait CacheReader: AsyncFileReader + Unpin { + type MetaCache: TonboCache>; + type RangeCache: TonboCache<(Ulid, Range), Bytes>; + + fn new( + meta_cache: Self::MetaCache, + range_cache: Self::RangeCache, + gen: Ulid, + inner: AsyncReader, + ) -> Self; + + #[allow(clippy::too_many_arguments)] + fn build_caches( + path: impl AsRef + Send, + meta_capacity: usize, + meta_shards: usize, + meta_ratio: f64, + range_memory: usize, + range_disk: usize, + range_capacity: usize, + range_shards: usize, + ) -> impl std::future::Future> + Send; +} + +#[derive(Debug, Error)] +pub enum CacheError { + #[error("cache io error: {0}")] + Io(#[from] io::Error), + #[error("foyer error: {0}")] + Foyer(#[from] anyhow::Error), +} + +#[cfg(test)] +pub(crate) mod tests { + use std::{ + fs::File, + ops::Range, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, + }; + + use arrow::{ + array::{BooleanArray, RecordBatch, StringArray, UInt32Array}, + datatypes::{DataType, Field, Schema}, + }; + use fusio::{ + dynamic::DynFile, fs::OpenOptions, path::Path, Error, IoBuf, IoBufMut, Read, Write, + }; + use fusio_dispatch::FsOptions; + use fusio_parquet::{reader::AsyncReader, writer::AsyncWriter}; + use parquet::arrow::{async_reader::AsyncFileReader, AsyncArrowWriter}; + use tempfile::TempDir; + use ulid::Ulid; + + use crate::{lru_reader::LruReader, CacheReader}; + + struct CountFile { + inner: Box, + read_count: Arc, + } + + impl Read for CountFile { + async fn read_exact_at(&mut self, buf: B, pos: u64) -> (Result<(), Error>, B) { + self.read_count.fetch_add(1, SeqCst); + self.inner.read_exact_at(buf, pos).await + } + + async fn read_to_end_at(&mut self, buf: Vec, pos: u64) -> (Result<(), Error>, Vec) { + self.read_count.fetch_add(1, SeqCst); + self.inner.read_to_end_at(buf, pos).await + } + + async fn size(&self) -> Result { + self.inner.size().await + } + } + + impl Write for CountFile { + async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { + self.inner.write_all(buf).await + } + + async fn flush(&mut self) -> Result<(), Error> { + self.inner.flush().await + } + + async fn close(&mut self) -> Result<(), Error> { + self.inner.close().await + } + } + + async fn inner_test_cache_read() { + let temp_dir = TempDir::new().unwrap(); + + let parquet_path = { + let path = temp_dir.path().join("test.parquet"); + let _ = File::create(&path).unwrap(); + + Path::from_filesystem_path(&path).unwrap() + }; + let fs = FsOptions::Local.parse().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("_null", DataType::Boolean, false), + Field::new("_ts", DataType::UInt32, false), + Field::new("vstring", DataType::Utf8, false), + Field::new("vu32", DataType::UInt32, false), + Field::new("vbool", DataType::Boolean, true), + ])); + let mut writer = AsyncArrowWriter::try_new( + AsyncWriter::new( + fs.open_options( + &parquet_path, + OpenOptions::default().read(true).write(true).create(true), + ) + .await + .unwrap(), + ), + schema.clone(), + None, + ) + .unwrap(); + writer + .write( + &RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(BooleanArray::from(vec![false, false, false])), + Arc::new(UInt32Array::from(vec![0, 1, 2])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + Arc::new(UInt32Array::from(vec![0, 1, 2])), + Arc::new(BooleanArray::from(vec![true, true, true])), + ], + ) + .unwrap(), + ) + .await + .unwrap(); + writer.close().await.unwrap(); + + let read_count = Arc::new(AtomicUsize::new(0)); + let (meta_cache, range_cache) = R::build_caches( + temp_dir.path().join("cache"), + 32, + 4, + 0.1, + 64 * 1024 * 1024, + 254 * 1024 * 1024, + 128, + 16, + ) + .await + .unwrap(); + + let gen = Ulid::new(); + for _ in 0..1000 { + let file = fs + .open_options(&parquet_path, OpenOptions::default().read(true)) + .await + .unwrap(); + let content_len = file.size().await.unwrap(); + + let mut reader = R::new( + meta_cache.clone(), + range_cache.clone(), + gen, + AsyncReader::new( + Box::new(CountFile { + inner: file, + read_count: read_count.clone(), + }), + content_len, + ) + .await + .unwrap(), + ); + + let _ = AsyncFileReader::get_metadata(&mut reader).await.unwrap(); + let _ = AsyncFileReader::get_bytes(&mut reader, Range { start: 0, end: 10 }) + .await + .unwrap(); + } + + assert_eq!(read_count.load(SeqCst), 2); + } + + #[tokio::test] + async fn test_cache_read() { + #[cfg(feature = "foyer")] + inner_test_cache_read::().await; + inner_test_cache_read::().await; + } +} diff --git a/tonbo_ext_reader/src/lru_reader.rs b/tonbo_ext_reader/src/lru_reader.rs new file mode 100644 index 00000000..4a2cbc77 --- /dev/null +++ b/tonbo_ext_reader/src/lru_reader.rs @@ -0,0 +1,204 @@ +use std::{ + hash::{BuildHasher, Hash, RandomState}, + num::NonZeroUsize, + ops::Range, + path::Path, + sync::Arc, +}; + +use bytes::Bytes; +use fusio_parquet::reader::AsyncReader; +use futures_core::future::BoxFuture; +use futures_util::FutureExt; +use lru::LruCache; +use parking_lot::Mutex; +use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; +use ulid::Ulid; + +use crate::{CacheError, CacheReader, TonboCache}; + +pub(crate) trait SharedKey: Hash + PartialEq + Eq { + fn shared(&self, hash_builder: &S, shared: usize) -> usize; +} + +#[derive(Debug)] +pub(crate) struct SharedLruCache +where + K: SharedKey + Clone, + V: Clone, + S: BuildHasher, +{ + caches: Vec>>, + hasher: S, +} + +impl SharedLruCache +where + K: SharedKey + Clone, + V: Clone, + S: BuildHasher, +{ + pub fn new(cap: usize, shared: usize, hasher: S) -> Self { + assert_eq!(cap % shared, 0); + + let mut caches = Vec::with_capacity(shared); + + for _ in 0..shared { + caches.push(Mutex::new(LruCache::new(NonZeroUsize::new(cap).unwrap()))); + } + + SharedLruCache { caches, hasher } + } + + #[inline] + pub fn get) -> Option>(&self, key: &K, fn_value: F) -> Option { + let mut guard = self.shard(key).lock(); + fn_value(guard.get(key)) + } + + #[inline] + pub fn put(&self, key: K, value: V) -> Option { + self.shard(&key).lock().put(key, value) + } + + fn sharding_size(&self) -> usize { + self.caches.len() + } + + fn shard(&self, key: &K) -> &Mutex> { + let pos = key.shared(&self.hasher, self.sharding_size()); + &self.caches[pos] + } +} + +impl SharedKey for Ulid { + fn shared(&self, hash_builder: &S, shared: usize) -> usize { + hash_builder.hash_one(self) as usize % shared + } +} + +// let the Range of the same Gen be sharded to the same LRU +impl SharedKey for (Ulid, Range) { + fn shared(&self, hash_builder: &S, shared: usize) -> usize { + self.0.shared(hash_builder, shared) + } +} + +#[derive(Debug, Clone)] +pub struct LruMetaCache(Arc>>); +#[derive(Debug, Clone)] +pub struct LruRangeCache(Arc), Bytes>>); + +pub struct LruReader { + gen: Ulid, + inner: AsyncReader, + range_cache: LruRangeCache, + meta_cache: LruMetaCache, +} + +impl TonboCache> for LruMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen, |v| v.map(Arc::clone))) + } + + fn insert(&self, gen: Ulid, data: Arc) -> Arc { + let _ = self.0.put(gen, data.clone()); + data + } +} + +impl TonboCache<(Ulid, Range), Bytes> for LruRangeCache { + async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { + Ok(self.0.get(key, |v| v.cloned())) + } + + fn insert(&self, key: (Ulid, Range), bytes: Bytes) -> Bytes { + let _ = self.0.put(key, bytes.clone()); + bytes + } +} + +impl AsyncFileReader for LruReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + async move { + let key = (self.gen, range); + if let Some(bytes) = self + .range_cache + .get(&key) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { + return Ok(bytes); + } + + let bytes = self.inner.get_bytes(key.1.clone()).await?; + Ok(self.range_cache.insert(key, bytes)) + } + .boxed() + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { + return Ok(meta); + } + + let meta = self + .inner + .get_metadata() + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?; + + Ok(self.meta_cache.insert(self.gen, meta)) + } + .boxed() + } +} + +impl CacheReader for LruReader { + type MetaCache = LruMetaCache; + type RangeCache = LruRangeCache; + + fn new( + meta_cache: Self::MetaCache, + range_cache: Self::RangeCache, + gen: Ulid, + inner: AsyncReader, + ) -> Self { + LruReader { + gen, + inner, + range_cache, + meta_cache, + } + } + + async fn build_caches( + _: impl AsRef + Send, + cache_meta_capacity: usize, + cache_meta_shards: usize, + _: f64, + _: usize, + _: usize, + cache_range_capacity: usize, + cache_range_shards: usize, + ) -> Result<(Self::MetaCache, Self::RangeCache), CacheError> { + let meta_cache = LruMetaCache(Arc::new(SharedLruCache::new( + cache_meta_capacity, + cache_meta_shards, + RandomState::default(), + ))); + let range_cache = LruRangeCache(Arc::new(SharedLruCache::new( + cache_range_capacity, + cache_range_shards, + RandomState::default(), + ))); + + Ok((meta_cache, range_cache)) + } +}