Skip to content

Commit

Permalink
chore: remove foyer and tonbo coupling
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Oct 31, 2024
1 parent 3091cec commit 7f73d70
Show file tree
Hide file tree
Showing 23 changed files with 593 additions and 396 deletions.
1 change: 1 addition & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ pyo3-asyncio = { package = "pyo3-asyncio-0-21", version = "0.21", features = [
] }
tokio = { version = "1", features = ["rt-multi-thread"] }
tonbo = { version = ">=0", path = "../../" }
tonbo_ext_reader = { path = "../../tonbo_ext_reader" }
5 changes: 2 additions & 3 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::sync::Arc;

use pyo3::{
prelude::*,
pyclass, pymethods,
Expand All @@ -12,7 +11,7 @@ use tonbo::{
record::{ColumnDesc, DynRecord},
DB,
};

use tonbo_ext_reader::foyer_reader::FoyerReader;
use crate::{
column::Column,
error::{CommitError, DbError},
Expand All @@ -28,7 +27,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor>>,
db: Arc<DB<DynRecord, PyExecutor, FoyerReader>>,
}

#[pymethods]
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl From<DbError> for PyErr {
tonbo::DbError::Fusio(err) => InnerError::new_err(err.to_string()),
tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()),
tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()),
tonbo::DbError::Cache(err) => InnerError::new_err(err.to_string()),
tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"),
}
}
Expand Down
18 changes: 9 additions & 9 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};

use tonbo_ext_reader::foyer_reader::FoyerReader;
use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
Expand All @@ -18,14 +18,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
txn: Option<transaction::Transaction<'static, DynRecord>>,
txn: Option<transaction::Transaction<'static, DynRecord, FoyerReader>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
txn: transaction::Transaction<'txn, DynRecord>,
txn: transaction::Transaction<'txn, DynRecord, FoyerReader>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
Expand All @@ -37,8 +37,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
transaction::Transaction<'txn, DynRecord>,
transaction::Transaction<'static, DynRecord>,
transaction::Transaction<'txn, DynRecord, FoyerReader>,
transaction::Transaction<'static, DynRecord, FoyerReader>,
>(txn)
}),
desc,
Expand Down Expand Up @@ -84,8 +84,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
>(txn)
};

Expand Down Expand Up @@ -169,8 +169,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
&transaction::Transaction<'_, DynRecord, FoyerReader>,
&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
Expand Down
10 changes: 7 additions & 3 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
};
use tonbo_ext_reader::foyer_reader::FoyerReader;
use tonbo_macros::Record;

#[derive(Record, Debug)]
Expand All @@ -41,12 +42,12 @@ pub struct Music {
}

struct MusicProvider {
db: Arc<DB<Music, TokioExecutor>>,
db: Arc<DB<Music, TokioExecutor, FoyerReader>>,
}

struct MusicExec {
cache: PlanProperties,
db: Arc<DB<Music, TokioExecutor>>,
db: Arc<DB<Music, TokioExecutor, FoyerReader>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
Expand Down Expand Up @@ -95,7 +96,10 @@ impl TableProvider for MusicProvider {
}

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
fn new(
db: Arc<DB<Music, TokioExecutor, FoyerReader>>,
projection: Option<&Vec<usize>>,
) -> Self {
let schema = Music::arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
Expand Down
5 changes: 4 additions & 1 deletion examples/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use fusio::path::Path;
use futures_util::stream::StreamExt;
use tokio::fs;
use tonbo::{executor::tokio::TokioExecutor, DbOption, Projection, Record, DB};
use tonbo_ext_reader::foyer_reader::FoyerReader;

/// Use macro to define schema of column family just like ORM
/// It provides type-safe read & write API
Expand All @@ -24,7 +25,9 @@ async fn main() {

let options = DbOption::from(Path::from_filesystem_path("./db_path/users").unwrap());
// pluggable async runtime and I/O
let db = DB::new(options, TokioExecutor::default()).await.unwrap();
let db = DB::<_, _, FoyerReader>::new(options, TokioExecutor::default())
.await
.unwrap();

// insert with owned value
db.insert(User {
Expand Down
56 changes: 31 additions & 25 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures_util::StreamExt;
use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
use thiserror::Error;
use tokio::sync::oneshot;
use tonbo_ext_reader::CacheError;
use tonbo_ext_reader::{CacheError, CacheReader};
use ulid::Ulid;

use crate::{
Expand All @@ -33,27 +33,29 @@ pub enum CompactTask {
Flush(Option<oneshot::Sender<()>>),
}

pub(crate) struct Compactor<R>
pub(crate) struct Compactor<R, C>
where
R: Record,
C: CacheReader,
{
pub(crate) option: Arc<DbOption<R>>,
pub(crate) schema: Arc<RwLock<Schema<R>>>,
pub(crate) version_set: VersionSet<R>,
pub(crate) version_set: VersionSet<R, C>,
pub(crate) manager: Arc<StoreManager>,
}

impl<R> Compactor<R>
impl<R, C> Compactor<R, C>
where
R: Record,
C: CacheReader + 'static,
{
pub(crate) fn new(
schema: Arc<RwLock<Schema<R>>>,
option: Arc<DbOption<R>>,
version_set: VersionSet<R>,
version_set: VersionSet<R, C>,
manager: Arc<StoreManager>,
) -> Self {
Compactor::<R> {
Compactor::<R, C> {
option,
schema,
version_set,
Expand Down Expand Up @@ -188,7 +190,7 @@ where

#[allow(clippy::too_many_arguments)]
pub(crate) async fn major_compaction(
version: &Version<R>,
version: &Version<R, C>,
option: &DbOption<R>,
mut min: &R::Key,
mut max: &R::Key,
Expand Down Expand Up @@ -305,7 +307,7 @@ where
}

fn next_level_scopes<'a>(
version: &'a Version<R>,
version: &'a Version<R, C>,
min: &mut &'a <R as Record>::Key,
max: &mut &'a <R as Record>::Key,
level: usize,
Expand All @@ -328,8 +330,8 @@ where
.max()
.ok_or(CompactionError::EmptyLevel)?;

start_ll = Version::<R>::scope_search(min, &version.level_slice[level + 1]);
end_ll = Version::<R>::scope_search(max, &version.level_slice[level + 1]);
start_ll = Version::<R, C>::scope_search(min, &version.level_slice[level + 1]);
end_ll = Version::<R, C>::scope_search(max, &version.level_slice[level + 1]);

let next_level_len = version.level_slice[level + 1].len();
for scope in version.level_slice[level + 1]
Expand All @@ -345,13 +347,13 @@ where
}

fn this_level_scopes<'a>(
version: &'a Version<R>,
version: &'a Version<R, C>,
min: &<R as Record>::Key,
max: &<R as Record>::Key,
level: usize,
) -> (Vec<&'a Scope<<R as Record>::Key>>, usize, usize) {
let mut meet_scopes_l = Vec::new();
let mut start_l = Version::<R>::scope_search(min, &version.level_slice[level]);
let mut start_l = Version::<R, C>::scope_search(min, &version.level_slice[level]);
let mut end_l = start_l;
let option = version.option();

Expand Down Expand Up @@ -386,11 +388,11 @@ where
option: &DbOption<R>,
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
level: usize,
streams: Vec<ScanStream<'scan, R>>,
streams: Vec<ScanStream<'scan, R, C>>,
instance: &RecordInstance,
fs: &Arc<dyn DynFs>,
) -> Result<(), CompactionError<R>> {
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;

// Kould: is the capacity parameter necessary?
let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
Expand Down Expand Up @@ -523,7 +525,7 @@ pub(crate) mod tests {
use fusio_parquet::writer::AsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use tempfile::TempDir;
use tonbo_ext_reader::foyer_reader::build_cache;
use tonbo_ext_reader::{foyer_reader::FoyerReader, CacheReader};

use crate::{
compaction::Compactor,
Expand Down Expand Up @@ -682,7 +684,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<Test>::minor_compaction(
let scope = Compactor::<Test, FoyerReader>::minor_compaction(
&option,
None,
&vec![
Expand Down Expand Up @@ -746,7 +748,7 @@ pub(crate) mod tests {
.await
.unwrap();

let scope = Compactor::<DynRecord>::minor_compaction(
let scope = Compactor::<DynRecord, FoyerReader>::minor_compaction(
&option,
None,
&vec![
Expand Down Expand Up @@ -811,7 +813,7 @@ pub(crate) mod tests {
let max = 5.to_string();
let mut version_edits = Vec::new();

Compactor::<Test>::major_compaction(
Compactor::<Test, FoyerReader>::major_compaction(
&version,
&option,
&min,
Expand Down Expand Up @@ -855,7 +857,10 @@ pub(crate) mod tests {
pub(crate) async fn build_version(
option: &Arc<DbOption<Test>>,
manager: &StoreManager,
) -> ((FileId, FileId, FileId, FileId, FileId), Version<Test>) {
) -> (
(FileId, FileId, FileId, FileId, FileId),
Version<Test, FoyerReader>,
) {
let level_0_fs = option
.level_fs_path(0)
.map(|path| manager.get_fs(path))
Expand Down Expand Up @@ -1065,7 +1070,7 @@ pub(crate) mod tests {
.unwrap();

let (sender, _) = bounded(1);
let (meta_cache, range_cache) = build_cache(
let (meta_cache, range_cache) = FoyerReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
Expand All @@ -1075,7 +1080,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test>::new(
let mut version = Version::<Test, FoyerReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
Expand Down Expand Up @@ -1198,7 +1203,7 @@ pub(crate) mod tests {

let option = Arc::new(option);
let (sender, _) = bounded(1);
let (meta_cache, range_cache) = build_cache(
let (meta_cache, range_cache) = FoyerReader::build_caches(
path_to_local(&option.cache_path).unwrap(),
option.cache_meta_capacity,
option.cache_meta_shards,
Expand All @@ -1208,7 +1213,7 @@ pub(crate) mod tests {
)
.await
.unwrap();
let mut version = Version::<Test>::new(
let mut version = Version::<Test, FoyerReader>::new(
option.clone(),
sender,
Arc::new(AtomicU32::default()),
Expand All @@ -1232,7 +1237,7 @@ pub(crate) mod tests {
let min = 6.to_string();
let max = 9.to_string();

Compactor::<Test>::major_compaction(
Compactor::<Test, FoyerReader>::major_compaction(
&version,
&option,
&min,
Expand Down Expand Up @@ -1261,7 +1266,8 @@ pub(crate) mod tests {
option.major_default_oldest_table_num = 1;
option.trigger_type = TriggerType::Length(5);

let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
let db: DB<Test, TokioExecutor, FoyerReader> =
DB::new(option, TokioExecutor::new()).await.unwrap();

for i in 5..9 {
let item = Test {
Expand Down
Loading

0 comments on commit 7f73d70

Please sign in to comment.