Skip to content

Commit

Permalink
chore: optimization with review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 11, 2024
1 parent 3b5e11a commit 89f5d59
Show file tree
Hide file tree
Showing 18 changed files with 121 additions and 111 deletions.
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ resolver = "2"
version = "0.2.0"

[package.metadata]
msrv = "1.81.0"
msrv = "1.82.0"

[features]
bench = ["redb", "rocksdb", "sled"]
bench = ["redb", "rocksdb", "sled", "foyer"]
bytes = ["dep:bytes"]
datafusion = ["dep:async-trait", "dep:datafusion"]
default = ["bytes", "tokio"]
foyer = ["tonbo_ext_reader/foyer"]
load_tbl = []
redb = ["dep:redb"]
rocksdb = ["dep:rocksdb"]
Expand Down Expand Up @@ -58,19 +59,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [
fusio = { package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"aws",
"tokio",
] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" }
fusio-parquet = { package = "fusio-parquet", version = "0.2.2" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
fusio = { package = "fusio", version = "0.3.3", features = ["aws", "tokio"] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"aws",
"tokio",
] }
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tonbo::{
record::{ColumnDesc, DynRecord},
DB,
};
use tonbo_ext_reader::foyer_reader::FoyerReader;
use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{CommitError, DbError},
Expand All @@ -27,7 +27,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor, FoyerReader>>,
db: Arc<DB<DynRecord, PyExecutor, LruReader>>,
}

#[pymethods]
Expand Down
18 changes: 9 additions & 9 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo_ext_reader::foyer_reader::FoyerReader;
use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
Expand All @@ -18,14 +18,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
txn: Option<transaction::Transaction<'static, DynRecord, FoyerReader>>,
txn: Option<transaction::Transaction<'static, DynRecord, LruReader>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
txn: transaction::Transaction<'txn, DynRecord, FoyerReader>,
txn: transaction::Transaction<'txn, DynRecord, LruReader>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
Expand All @@ -37,8 +37,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
transaction::Transaction<'txn, DynRecord, FoyerReader>,
transaction::Transaction<'static, DynRecord, FoyerReader>,
transaction::Transaction<'txn, DynRecord, LruReader>,
transaction::Transaction<'static, DynRecord, LruReader>,
>(txn)
}),
desc,
Expand Down Expand Up @@ -84,8 +84,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
&transaction::Transaction<'_, DynRecord, LruReader>,
&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};

Expand Down Expand Up @@ -169,8 +169,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
&transaction::Transaction<'_, DynRecord, LruReader>,
&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.81.0"
channel = "1.82.0"
components = ["clippy", "rust-analyzer", "rustfmt"]
22 changes: 11 additions & 11 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ pub(crate) mod tests {
use fusio_parquet::writer::AsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use tempfile::TempDir;
use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};
use tonbo_ext_reader::{lru_reader::LruReader, CacheReader};

use crate::{
compaction::Compactor,
Expand Down Expand Up @@ -684,7 +684,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<Test, FoyerReader>::minor_compaction(
let scope = Compactor::<Test, LruReader>::minor_compaction(
&option,
None,
&vec![
Expand Down Expand Up @@ -748,7 +748,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<DynRecord, FoyerReader>::minor_compaction(
let scope = Compactor::<DynRecord, LruReader>::minor_compaction(
&option,
None,
&vec![
Expand Down Expand Up @@ -813,7 +813,7 @@ pub(crate) mod tests {
let max = 5.to_string();
let mut version_edits = Vec::new();

Compactor::<Test, FoyerReader>::major_compaction(
Compactor::<Test, LruReader>::major_compaction(
&version,
&option,
&min,
Expand Down Expand Up @@ -859,7 +859,7 @@ pub(crate) mod tests {
manager: &StoreManager,
) -> (
(FileId, FileId, FileId, FileId, FileId),
Version<Test, FoyerReader>,
Version<Test, LruReader>,
) {
let level_0_fs = option
.level_fs_path(0)
Expand Down Expand Up @@ -1070,7 +1070,7 @@ pub(crate) mod tests {
.unwrap();

let (sender, _) = bounded(1);
let (meta_cache, range_cache) = FoyerReader::build_caches(
let (meta_cache, range_cache) = LruReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
Expand All @@ -1082,7 +1082,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test, FoyerReader>::new(
let mut version = Version::<Test, LruReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
Expand Down Expand Up @@ -1205,7 +1205,7 @@ pub(crate) mod tests {

let option = Arc::new(option);
let (sender, _) = bounded(1);
let (meta_cache, range_cache) = FoyerReader::build_caches(
let (meta_cache, range_cache) = LruReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
Expand All @@ -1217,7 +1217,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test, FoyerReader>::new(
let mut version = Version::<Test, LruReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
Expand All @@ -1241,7 +1241,7 @@ pub(crate) mod tests {
let min = 6.to_string();
let max = 9.to_string();

Compactor::<Test, FoyerReader>::major_compaction(
Compactor::<Test, LruReader>::major_compaction(
&version,
&option,
&min,
Expand Down Expand Up @@ -1270,7 +1270,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for i in 5..9 {
Expand Down
6 changes: 3 additions & 3 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs};
use fusio::{buffered::BufWriter, DynFs, DynWrite};
use ulid::Ulid;

use crate::{
Expand Down Expand Up @@ -37,7 +37,7 @@ where
R: Record,
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynFile>, R>>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
}

Expand All @@ -61,7 +61,7 @@ where
)
.await?,
option.wal_buffer_size,
)) as Box<dyn DynFile>;
)) as Box<dyn DynWrite>;

wal = Some(Mutex::new(WalFile::new(file, file_id)));
};
Expand Down
26 changes: 13 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! use tokio::fs;
//! use tokio_util::bytes::Bytes;
//! use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB};
//! use tonbo_ext_reader::foyer_reader::FoyerReader;
//! use tonbo_ext_reader::lru_reader::LruReader;
//!
//! // use macro to define schema of column family just like ORM
//! // it provides type safety read & write API
Expand All @@ -57,7 +57,7 @@
//!
//! let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
//! // pluggable async runtime and I/O
//! let db: DB<User, TokioExecutor, FoyerReader> =
//! let db: DB<User, TokioExecutor, LruReader> =
//! DB::new(options, TokioExecutor::default()).await.unwrap();
//! // insert with owned value
//! db.insert(User {
Expand Down Expand Up @@ -858,7 +858,7 @@ pub(crate) mod tests {
use once_cell::sync::Lazy;
use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
use tempfile::TempDir;
use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};
use tonbo_ext_reader::{lru_reader::LruReader, CacheReader};
use tracing::error;

use crate::{
Expand Down Expand Up @@ -1097,7 +1097,7 @@ pub(crate) mod tests {
option: DbOption<Test>,
executor: E,
) -> RecordBatch {
let db: DB<Test, E, FoyerReader> = DB::new(option.clone(), executor).await.unwrap();
let db: DB<Test, E, LruReader> = DB::new(option.clone(), executor).await.unwrap();
let base_fs = db.manager.base_fs();

db.write(
Expand Down Expand Up @@ -1534,7 +1534,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for (i, item) in test_items().into_iter().enumerate() {
Expand Down Expand Up @@ -1571,7 +1571,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for item in &test_items()[0..10] {
Expand Down Expand Up @@ -1621,7 +1621,7 @@ pub(crate) mod tests {
schema.flush_wal().await.unwrap();
drop(schema);

let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option.as_ref().to_owned(), TokioExecutor::new())
.await
.unwrap();
Expand Down Expand Up @@ -1694,7 +1694,7 @@ pub(crate) mod tests {
"id".to_owned(),
primary_key_index,
);
let db: DB<DynRecord, TokioExecutor, FoyerReader> =
let db: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index)
.await
.unwrap();
Expand Down Expand Up @@ -1734,7 +1734,7 @@ pub(crate) mod tests {
option.major_threshold_with_sst_size = 3;
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);
let db: DB<Test, TokioExecutor, FoyerReader> =
let db: DB<Test, TokioExecutor, LruReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for (idx, item) in test_items().into_iter().enumerate() {
Expand Down Expand Up @@ -1777,7 +1777,7 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<DynRecord, TokioExecutor, FoyerReader> =
let db: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
Expand Down Expand Up @@ -2007,23 +2007,23 @@ pub(crate) mod tests {
option3.major_default_oldest_table_num = 1;
option3.trigger_type = TriggerType::Length(5);

let db1: DB<DynRecord, TokioExecutor, FoyerReader> = DB::with_schema(
let db1: DB<DynRecord, TokioExecutor, LruReader> = DB::with_schema(
option,
TokioExecutor::new(),
cols_desc.clone(),
primary_key_index,
)
.await
.unwrap();
let db2: DB<DynRecord, TokioExecutor, FoyerReader> = DB::with_schema(
let db2: DB<DynRecord, TokioExecutor, LruReader> = DB::with_schema(
option2,
TokioExecutor::new(),
cols_desc.clone(),
primary_key_index,
)
.await
.unwrap();
let db3: DB<DynRecord, TokioExecutor, FoyerReader> =
let db3: DB<DynRecord, TokioExecutor, LruReader> =
DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
Expand Down
Loading

0 comments on commit 89f5d59

Please sign in to comment.