diff --git a/Cargo.toml b/Cargo.toml index 68686ae..e1fd332 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio", version = "0.2.1", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio", version = "0.3.1", features = [ "aws", "dyn", "fs", @@ -66,11 +66,11 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-dispatch", version = "0.2.0", features = [ +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-dispatch", version = "0.2.1", features = [ "aws", "tokio", ] } -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" } +fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-parquet", version = "0.2.1" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/benches/common.rs b/benches/common.rs index 537da11..5289425 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -7,6 +7,7 @@ use std::{ }; use async_stream::stream; +use fusio_dispatch::FsOptions; use futures_core::Stream; use futures_util::StreamExt; use parquet::data_type::AsBytes; @@ -185,6 +186,97 @@ pub trait BenchReader { ) -> impl Stream + 'a; } +pub struct TonboS3BenchDataBase { + db: tonbo::DB, +} + +impl TonboS3BenchDataBase { + #[allow(dead_code)] + pub fn new(db: tonbo::DB) -> Self { + TonboS3BenchDataBase { db } + } +} + +impl BenchDatabase for TonboS3BenchDataBase { + type W<'db> + = TonboBenchWriteTransaction<'db> + where + Self: 'db; + type R<'db> + = TonboBenchReadTransaction<'db> + where + Self: 'db; + + fn db_type_name() -> &'static str { + "tonbo on s3" + } + + async fn write_transaction(&self) -> Self::W<'_> { + TonboBenchWriteTransaction { + txn: self.db.transaction().await, + } + } + + async fn read_transaction(&self) -> Self::R<'_> { + TonboBenchReadTransaction { + txn: self.db.transaction().await, + } + } + + async fn build(path: impl AsRef) -> Self { + create_dir_all(path.as_ref()).await.unwrap(); + + let fs_options = FsOptions::S3 { + bucket: "data".to_string(), + credential: Some(fusio::remotes::aws::credential::AwsCredential { + key_id: "user".to_string(), + secret_key: "password".to_string(), + token: None, + }), + endpoint: Some("http://localhost:9000".to_string()), + sign_payload: None, + checksum: None, + region: None, + }; + + let path = fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap(); + let option = DbOption::from(path.clone()) + .level_path( + 0, + fusio::path::Path::from_url_path("/l0").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 1, + fusio::path::Path::from_url_path("/l1").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 2, + fusio::path::Path::from_url_path("/l2").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 3, + fusio::path::Path::from_url_path("/l3").unwrap(), + fs_options.clone(), + ) + .unwrap() + .level_path( + 4, + fusio::path::Path::from_url_path("/l4").unwrap(), + fs_options.clone(), + ) + .unwrap() + .disable_wal(); + + TonboS3BenchDataBase::new(tonbo::DB::new(option, TokioExecutor::new()).await.unwrap()) + } +} + pub struct TonboBenchDataBase { db: tonbo::DB, } diff --git a/benches/read_bench.rs b/benches/read_bench.rs index c849fcb..f209945 100644 --- a/benches/read_bench.rs +++ b/benches/read_bench.rs @@ -12,7 +12,8 @@ use tokio::{fs, io::AsyncWriteExt}; use crate::common::{ read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase, - RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, ITERATIONS, NUM_SCAN, READ_TIMES, + RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, TonboS3BenchDataBase, ITERATIONS, + NUM_SCAN, READ_TIMES, }; async fn benchmark( @@ -152,10 +153,13 @@ async fn main() { load::(&tbl_path, data_dir.join("tonbo")).await; load::(&tbl_path, data_dir.join("rocksdb")).await; + load::(&tbl_path, data_dir.join("tonbo_s3")).await; } let tonbo_latency_results = { benchmark::(data_dir.join("tonbo")).await }; let rocksdb_results = { benchmark::(data_dir.join("rocksdb")).await }; + let tonbo_s3_latency_results = + { benchmark::(data_dir.join("tonbo_s3")).await }; let mut rows: Vec> = Vec::new(); @@ -163,7 +167,11 @@ async fn main() { rows.push(vec![benchmark.to_string()]); } - for results in [tonbo_latency_results, rocksdb_results] { + for results in [ + tonbo_latency_results, + rocksdb_results, + tonbo_s3_latency_results, + ] { for (i, (_benchmark, duration)) in results.iter().enumerate() { rows[i].push(format!("{}ms", duration.as_millis())); } @@ -171,7 +179,7 @@ async fn main() { let mut table = comfy_table::Table::new(); table.set_width(100); - table.set_header(["", "tonbo", "rocksdb"]); + table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]); for row in rows { table.add_row(row); } diff --git a/benches/write_bench.rs b/benches/write_bench.rs index f2f4335..2d2a3af 100644 --- a/benches/write_bench.rs +++ b/benches/write_bench.rs @@ -201,6 +201,10 @@ async fn main() { let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap(); benchmark::(tmp_file.path()).await }; + let tonbo_s3_latency_results = { + let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap(); + benchmark::(tmp_file.path()).await + }; let _ = fs::remove_dir_all(&tmpdir); @@ -210,7 +214,11 @@ async fn main() { rows.push(vec![benchmark.to_string()]); } - for results in [tonbo_latency_results, rocksdb_results] { + for results in [ + tonbo_latency_results, + rocksdb_results, + tonbo_s3_latency_results, + ] { for (i, (_benchmark, duration)) in results.iter().enumerate() { rows[i].push(format!("{}ms", duration.as_millis())); } @@ -218,7 +226,7 @@ async fn main() { let mut table = comfy_table::Table::new(); table.set_width(100); - table.set_header(["", "tonbo", "rocksdb"]); + table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]); for row in rows { table.add_row(row); } diff --git a/tonbo_ext_reader/Cargo.toml b/tonbo_ext_reader/Cargo.toml index 1e792d3..dcb011e 100644 --- a/tonbo_ext_reader/Cargo.toml +++ b/tonbo_ext_reader/Cargo.toml @@ -11,13 +11,13 @@ bytes = { version = "1.7", features = ["serde"] } foyer = { version = "0.12" } futures-core = "0.3" futures-util = "0.3" -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" } +fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-parquet", version = "0.2.1" } parquet = { version = "53", features = ["async"] } thiserror = "1" ulid = { version = "1", features = ["serde"] } [dev-dependencies] -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio", version = "0.2.1", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio", version = "0.3.1", features = [ "aws", "dyn", "fs", @@ -25,7 +25,7 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-dispatch", version = "0.2.0", features = [ +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-dispatch", version = "0.2.1", features = [ "tokio", ] } tempfile = "3"