Skip to content

Commit

Permalink
bench: add tonbo_s3
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 1, 2024
1 parent 054296b commit 65c0d0f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 11 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio", version = "0.2.1", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio", version = "0.3.1", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-dispatch", version = "0.2.0", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-dispatch", version = "0.2.1", features = [
"aws",
"tokio",
] }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
92 changes: 92 additions & 0 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use async_stream::stream;
use fusio_dispatch::FsOptions;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet::data_type::AsBytes;
Expand Down Expand Up @@ -185,6 +186,97 @@ pub trait BenchReader {
) -> impl Stream<Item = ProjectionResult> + 'a;
}

pub struct TonboS3BenchDataBase {
db: tonbo::DB<Customer, TokioExecutor, FoyerReader>,
}

impl TonboS3BenchDataBase {
#[allow(dead_code)]
pub fn new(db: tonbo::DB<Customer, TokioExecutor, FoyerReader>) -> Self {
TonboS3BenchDataBase { db }
}
}

impl BenchDatabase for TonboS3BenchDataBase {
type W<'db>
= TonboBenchWriteTransaction<'db>
where
Self: 'db;
type R<'db>
= TonboBenchReadTransaction<'db>
where
Self: 'db;

fn db_type_name() -> &'static str {
"tonbo on s3"
}

async fn write_transaction(&self) -> Self::W<'_> {
TonboBenchWriteTransaction {
txn: self.db.transaction().await,
}
}

async fn read_transaction(&self) -> Self::R<'_> {
TonboBenchReadTransaction {
txn: self.db.transaction().await,
}
}

async fn build(path: impl AsRef<Path>) -> Self {
create_dir_all(path.as_ref()).await.unwrap();

let fs_options = FsOptions::S3 {
bucket: "data".to_string(),
credential: Some(fusio::remotes::aws::credential::AwsCredential {
key_id: "user".to_string(),
secret_key: "password".to_string(),
token: None,
}),
endpoint: Some("http://localhost:9000".to_string()),
sign_payload: None,
checksum: None,
region: None,
};

let path = fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap();
let option = DbOption::from(path.clone())
.level_path(
0,
fusio::path::Path::from_url_path("/l0").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
1,
fusio::path::Path::from_url_path("/l1").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
2,
fusio::path::Path::from_url_path("/l2").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
3,
fusio::path::Path::from_url_path("/l3").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
4,
fusio::path::Path::from_url_path("/l4").unwrap(),
fs_options.clone(),
)
.unwrap()
.disable_wal();

TonboS3BenchDataBase::new(tonbo::DB::new(option, TokioExecutor::new()).await.unwrap())
}
}

pub struct TonboBenchDataBase {
db: tonbo::DB<Customer, TokioExecutor, FoyerReader>,
}
Expand Down
14 changes: 11 additions & 3 deletions benches/read_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use tokio::{fs, io::AsyncWriteExt};

use crate::common::{
read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase,
RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, ITERATIONS, NUM_SCAN, READ_TIMES,
RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, TonboS3BenchDataBase, ITERATIONS,
NUM_SCAN, READ_TIMES,
};

async fn benchmark<T: BenchDatabase + Send + Sync>(
Expand Down Expand Up @@ -152,26 +153,33 @@ async fn main() {

load::<TonboBenchDataBase>(&tbl_path, data_dir.join("tonbo")).await;
load::<RocksdbBenchDatabase>(&tbl_path, data_dir.join("rocksdb")).await;
load::<TonboS3BenchDataBase>(&tbl_path, data_dir.join("tonbo_s3")).await;
}

let tonbo_latency_results = { benchmark::<TonboBenchDataBase>(data_dir.join("tonbo")).await };
let rocksdb_results = { benchmark::<RocksdbBenchDatabase>(data_dir.join("rocksdb")).await };
let tonbo_s3_latency_results =
{ benchmark::<TonboS3BenchDataBase>(data_dir.join("tonbo_s3")).await };

let mut rows: Vec<Vec<String>> = Vec::new();

for (benchmark, _duration) in &tonbo_latency_results {
rows.push(vec![benchmark.to_string()]);
}

for results in [tonbo_latency_results, rocksdb_results] {
for results in [
tonbo_latency_results,
rocksdb_results,
tonbo_s3_latency_results,
] {
for (i, (_benchmark, duration)) in results.iter().enumerate() {
rows[i].push(format!("{}ms", duration.as_millis()));
}
}

let mut table = comfy_table::Table::new();
table.set_width(100);
table.set_header(["", "tonbo", "rocksdb"]);
table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]);
for row in rows {
table.add_row(row);
}
Expand Down
12 changes: 10 additions & 2 deletions benches/write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ async fn main() {
let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap();
benchmark::<RocksdbBenchDatabase>(tmp_file.path()).await
};
let tonbo_s3_latency_results = {
let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap();
benchmark::<TonboS3BenchDataBase>(tmp_file.path()).await
};

let _ = fs::remove_dir_all(&tmpdir);

Expand All @@ -210,15 +214,19 @@ async fn main() {
rows.push(vec![benchmark.to_string()]);
}

for results in [tonbo_latency_results, rocksdb_results] {
for results in [
tonbo_latency_results,
rocksdb_results,
tonbo_s3_latency_results,
] {
for (i, (_benchmark, duration)) in results.iter().enumerate() {
rows[i].push(format!("{}ms", duration.as_millis()));
}
}

let mut table = comfy_table::Table::new();
table.set_width(100);
table.set_header(["", "tonbo", "rocksdb"]);
table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]);
for row in rows {
table.add_row(row);
}
Expand Down
6 changes: 3 additions & 3 deletions tonbo_ext_reader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ bytes = { version = "1.7", features = ["serde"] }
foyer = { version = "0.12" }
futures-core = "0.3"
futures-util = "0.3"
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-parquet", version = "0.2.0" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-parquet", version = "0.2.1" }
parquet = { version = "53", features = ["async"] }
thiserror = "1"
ulid = { version = "1", features = ["serde"] }

[dev-dependencies]
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio", version = "0.2.1", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio", version = "0.3.1", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "d2a5e7c0d17d173b1f67af25b7ff133e93244160", package = "fusio-dispatch", version = "0.2.0", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a97407aaea1a82fde1cff997e4db13be0f24b517", package = "fusio-dispatch", version = "0.2.1", features = [
"tokio",
] }
tempfile = "3"
Expand Down

0 comments on commit 65c0d0f

Please sign in to comment.