From 3a983fdbd695e6de8fcc210c643dc7122d142413 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 14:41:10 +0100 Subject: [PATCH 01/15] Datafusion solution --- .gitignore | 2 ++ run.conf | 2 +- run.sh | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 3f072cfb..c77f2bc9 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,5 @@ run.out clickhouse/etc_sudoers.bak workdir/ timeout-exit-codes.out +*/target +*.lock diff --git a/run.conf b/run.conf index de58a433..c2fa9160 100644 --- a/run.conf +++ b/run.conf @@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars" +export RUN_SOLUTIONS="datafusion" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=true diff --git a/run.sh b/run.sh index c1c45e07..4ea0853c 100755 --- a/run.sh +++ b/run.sh @@ -62,6 +62,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/upg-h2o. if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi; +if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./datafusion/ver-datafusion.sh; fi; # run if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi; From b1f613ee44e26aeab9d7a81367914c43513ee52f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 14:42:01 +0100 Subject: [PATCH 02/15] Datafusion solution --- datafusion/Cargo.toml | 13 ++++++ datafusion/setup-datafusion.sh | 5 +++ datafusion/src/main.rs | 78 ++++++++++++++++++++++++++++++++++ datafusion/upg-datafusion.sh | 8 ++++ datafusion/ver-datafusion.sh | 1 + 5 files changed, 105 insertions(+) create mode 100644 datafusion/Cargo.toml create mode 100755 datafusion/setup-datafusion.sh create mode 100644 datafusion/src/main.rs create mode 100755 datafusion/upg-datafusion.sh create mode 100755 datafusion/ver-datafusion.sh diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml new file mode 100644 index 00000000..e2da7664 --- /dev/null +++ b/datafusion/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "db-benchmark" +version = "0.1.0" +edition = "2018" + +[dependencies] +datafusion = { git = "https://github.com/apache/arrow.git", features = ["simd"] } +arrow = { git = "https://github.com/apache/arrow.git", features = ["simd"] } +tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } +snmalloc-rs = "0.2" + +[profile.release] +lto = true \ No newline at end of file diff --git a/datafusion/setup-datafusion.sh b/datafusion/setup-datafusion.sh new file mode 100755 index 00000000..c8422bc4 --- /dev/null +++ b/datafusion/setup-datafusion.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -e + +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs new file mode 100644 index 00000000..650d065c --- /dev/null +++ b/datafusion/src/main.rs @@ -0,0 +1,78 @@ +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::datasource::{CsvFile, MemTable}; +use datafusion::error::Result; +use datafusion::prelude::*; +use std::env; +use std::time::Instant; + +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + +#[tokio::main] +async fn main() -> Result<()> { + let mut ctx = ExecutionContext::new(); + let data = format!("../data/{}.csv", env::var("SRC_DATANAME").unwrap()); + + let schema = Schema::new(vec![ + Field::new("id1", DataType::Utf8, false), + Field::new("id2", DataType::Utf8, false), + Field::new("id3", DataType::Utf8, false), + Field::new("id4", DataType::Int32, false), + Field::new("id5", DataType::Int32, false), + Field::new("id6", DataType::Int32, false), + Field::new("v1", DataType::Int32, false), + Field::new("v2", DataType::Int32, false), + Field::new("v3", DataType::Float64, false), + ]); + let options = CsvReadOptions::new().schema(&schema).has_header(true); + + let csv = CsvFile::try_new(&data, options).unwrap(); + let batch_size = 16384; + let memtable = MemTable::load(&csv, batch_size).await?; + ctx.register_table("t", Box::new(memtable)); + + // "q1" + let start = Instant::now(); + let df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM t GROUP BY id1")?; + + let _results = df.collect().await?; + + println!("q1 took {} ms", start.elapsed().as_millis()); + + // "q2" + let start = Instant::now(); + let df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM t GROUP BY id1, id2")?; + + let _results = df.collect().await?; + + println!("q2 took {} ms", start.elapsed().as_millis()); + + // "q3" + let start = Instant::now(); + let df = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM t GROUP BY id3")?; + + let _results = df.collect().await?; + + println!("q3 took {} ms", start.elapsed().as_millis()); + + // "q4" + let start = Instant::now(); + let df = + ctx.sql("SELECT id1, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM t GROUP BY id4")?; + + let _results = df.collect().await?; + + println!("q4 took {} ms", start.elapsed().as_millis()); + + // "q5" + let start = Instant::now(); + let df = + ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM t GROUP BY id6")?; + + let _results = df.collect().await?; + + println!("q5 took {} ms", start.elapsed().as_millis()); + + Ok(()) +} diff --git a/datafusion/upg-datafusion.sh b/datafusion/upg-datafusion.sh new file mode 100755 index 00000000..c0bb8469 --- /dev/null +++ b/datafusion/upg-datafusion.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +cd datafusion + +cargo update + +cd ../ \ No newline at end of file diff --git a/datafusion/ver-datafusion.sh b/datafusion/ver-datafusion.sh new file mode 100755 index 00000000..13aaea6f --- /dev/null +++ b/datafusion/ver-datafusion.sh @@ -0,0 +1 @@ +cargo tree | grep "├── datafusion (.*)" \ No newline at end of file From 51ce127a7e315fb3984aaec0a048b9d66d33f109 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 14:49:39 +0100 Subject: [PATCH 03/15] Query fix --- datafusion/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index 650d065c..dd002728 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -59,7 +59,7 @@ async fn main() -> Result<()> { // "q4" let start = Instant::now(); let df = - ctx.sql("SELECT id1, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM t GROUP BY id4")?; + ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM t GROUP BY id4")?; let _results = df.collect().await?; From 3343428c858689b3cca0682e24100ba1e29629ee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 15:22:33 +0100 Subject: [PATCH 04/15] Undo change --- run.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.conf b/run.conf index c2fa9160..37d08736 100644 --- a/run.conf +++ b/run.conf @@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="datafusion" +export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars datafusion" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=true From d1e7ff36a846c23b0e18c0d23d18c11f2386d0f7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 17 Jan 2021 17:39:26 +0100 Subject: [PATCH 05/15] Increase batch size --- datafusion/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index dd002728..d85912aa 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -28,7 +28,7 @@ async fn main() -> Result<()> { let options = CsvReadOptions::new().schema(&schema).has_header(true); let csv = CsvFile::try_new(&data, options).unwrap(); - let batch_size = 16384; + let batch_size = 65536; let memtable = MemTable::load(&csv, batch_size).await?; ctx.register_table("t", Box::new(memtable)); From 58be012d8e24e79634ec8527549640f4879f68d9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 08:47:01 +0100 Subject: [PATCH 06/15] Rename to ans --- datafusion/src/main.rs | 39 +++++++++++++++++++++------------------ run.sh | 2 +- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index d85912aa..933208e6 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -30,47 +30,50 @@ async fn main() -> Result<()> { let csv = CsvFile::try_new(&data, options).unwrap(); let batch_size = 65536; let memtable = MemTable::load(&csv, batch_size).await?; - ctx.register_table("t", Box::new(memtable)); + ctx.register_table("tbl", Box::new(memtable)); // "q1" let start = Instant::now(); - let df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM t GROUP BY id1")?; - - let _results = df.collect().await?; + let ans = ctx + .sql("SELECT id1, SUM(v1) AS v1 FROM tbl GROUP BY id1")? + .collect() + .await?; println!("q1 took {} ms", start.elapsed().as_millis()); // "q2" let start = Instant::now(); - let df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM t GROUP BY id1, id2")?; - - let _results = df.collect().await?; + let ans = ctx + .sql("SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2")? + .collect() + .await?; println!("q2 took {} ms", start.elapsed().as_millis()); // "q3" let start = Instant::now(); - let df = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM t GROUP BY id3")?; - - let _results = df.collect().await?; + let ans = ctx + .sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM tbl GROUP BY id3")? + .collect() + .await?; println!("q3 took {} ms", start.elapsed().as_millis()); // "q4" let start = Instant::now(); - let df = - ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM t GROUP BY id4")?; - - let _results = df.collect().await?; + let ans = ctx + .sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM tbl GROUP BY id4")? + .collect() + .await?; println!("q4 took {} ms", start.elapsed().as_millis()); // "q5" let start = Instant::now(); - let df = - ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM t GROUP BY id6")?; - - let _results = df.collect().await?; + let ans = ctx + .sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM tbl GROUP BY id6")? + .collect() + .await?; println!("q5 took {} ms", start.elapsed().as_millis()); diff --git a/run.sh b/run.sh index 4ea0853c..e7c30c02 100755 --- a/run.sh +++ b/run.sh @@ -63,7 +63,7 @@ if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi; if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi; if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi; -if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./datafusion/ver-datafusion.sh; fi; +if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi; # run if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi; From d87c92dbb08439b8c2ec208b0136bba0c16701a1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 08:49:07 +0100 Subject: [PATCH 07/15] Fix --- datafusion/ver-datafusion.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/ver-datafusion.sh b/datafusion/ver-datafusion.sh index 13aaea6f..d2a75cef 100755 --- a/datafusion/ver-datafusion.sh +++ b/datafusion/ver-datafusion.sh @@ -1 +1 @@ -cargo tree | grep "├── datafusion (.*)" \ No newline at end of file +cargo tree | grep "├── datafusion" From 2b67e2a506900a7dd019f97b8ecdf11c878aa2c6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 17:22:45 +0100 Subject: [PATCH 08/15] Add q7/q10 --- datafusion/src/main.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index 933208e6..d9b54004 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -77,5 +77,23 @@ async fn main() -> Result<()> { println!("q5 took {} ms", start.elapsed().as_millis()); + // "q7" + let start = Instant::now(); + let ans = ctx + .sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM tbl GROUP BY id3")? + .collect() + .await?; + + println!("q7 took {} ms", start.elapsed().as_millis()); + + // "q10" + let start = Instant::now(); + let ans = ctx + .sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v5, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6")? + .collect() + .await?; + + println!("q10 took {} ms", start.elapsed().as_millis()); + Ok(()) } From d217e3713e0caedc2667d4e11809cfd63d552552 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 22:53:15 +0100 Subject: [PATCH 09/15] Use multiple threads better --- datafusion/Cargo.toml | 5 +++-- datafusion/src/main.rs | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index e2da7664..e6075567 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -4,10 +4,11 @@ version = "0.1.0" edition = "2018" [dependencies] -datafusion = { git = "https://github.com/apache/arrow.git", features = ["simd"] } -arrow = { git = "https://github.com/apache/arrow.git", features = ["simd"] } +datafusion = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition" } +arrow = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition"} tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } snmalloc-rs = "0.2" +num_cpus="1.0" [profile.release] lto = true \ No newline at end of file diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index d9b54004..9b8dcd2f 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -29,7 +29,9 @@ async fn main() -> Result<()> { let csv = CsvFile::try_new(&data, options).unwrap(); let batch_size = 65536; - let memtable = MemTable::load(&csv, batch_size).await?; + let partition_size = num_cpus::get() * 2; + + let memtable = MemTable::load(&csv, batch_size, Some(partition_size)).await?; ctx.register_table("tbl", Box::new(memtable)); // "q1" From 5a3e5ec115b621f9237debe686dd2dfafd43d803 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 22:53:36 +0100 Subject: [PATCH 10/15] Add exec script --- datafusion/exec.sh | 4 ++++ 1 file changed, 4 insertions(+) create mode 100755 datafusion/exec.sh diff --git a/datafusion/exec.sh b/datafusion/exec.sh new file mode 100755 index 00000000..2709a0ad --- /dev/null +++ b/datafusion/exec.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +RUSTFLAGS='-C target-cpu=native' cargo run --release \ No newline at end of file From f83905048cfe28456806f72dc10fcec13acbd064 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 23:14:38 +0100 Subject: [PATCH 11/15] Some cleanup --- datafusion/src/main.rs | 110 ++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 62 deletions(-) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index 9b8dcd2f..4f9d412a 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -9,6 +9,17 @@ use std::time::Instant; #[global_allocator] static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; +async fn exec_query(ctx: &mut ExecutionContext, query: &str, name: &str) -> Result<()> { + let start = Instant::now(); + + let ans = ctx.sql(query)?.collect().await?; + + // TODO: print details + + println!("{} took {} ms", name, start.elapsed().as_millis()); + + Ok(()) +} #[tokio::main] async fn main() -> Result<()> { let mut ctx = ExecutionContext::new(); @@ -34,68 +45,43 @@ async fn main() -> Result<()> { let memtable = MemTable::load(&csv, batch_size, Some(partition_size)).await?; ctx.register_table("tbl", Box::new(memtable)); - // "q1" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id1, SUM(v1) AS v1 FROM tbl GROUP BY id1")? - .collect() - .await?; - - println!("q1 took {} ms", start.elapsed().as_millis()); - - // "q2" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2")? - .collect() - .await?; - - println!("q2 took {} ms", start.elapsed().as_millis()); - - // "q3" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM tbl GROUP BY id3")? - .collect() - .await?; - - println!("q3 took {} ms", start.elapsed().as_millis()); - - // "q4" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM tbl GROUP BY id4")? - .collect() - .await?; - - println!("q4 took {} ms", start.elapsed().as_millis()); - - // "q5" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM tbl GROUP BY id6")? - .collect() - .await?; - - println!("q5 took {} ms", start.elapsed().as_millis()); - - // "q7" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM tbl GROUP BY id3")? - .collect() - .await?; - - println!("q7 took {} ms", start.elapsed().as_millis()); - - // "q10" - let start = Instant::now(); - let ans = ctx - .sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v5, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6")? - .collect() - .await?; - - println!("q10 took {} ms", start.elapsed().as_millis()); + exec_query( + &mut ctx, + "SELECT id1, SUM(v1) AS v1 FROM tbl GROUP BY id1", + "q1", + ) + .await?; + exec_query( + &mut ctx, + "SELECT id1, id2, SUM(v1) AS v1 FROM tbl GROUP BY id1, id2", + "q2", + ) + .await?; + exec_query( + &mut ctx, + "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM tbl GROUP BY id3", + "q3", + ) + .await?; + exec_query( + &mut ctx, + "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM tbl GROUP BY id4", + "q4", + ) + .await?; + exec_query( + &mut ctx, + "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM tbl GROUP BY id6", + "q5", + ) + .await?; + exec_query( + &mut ctx, + "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM tbl GROUP BY id3", + "q7", + ) + .await?; + exec_query(&mut ctx, "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v5, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6", "q10").await?; Ok(()) } From 6cb14f5364d141ff8e6b86e470a8c0b1fc35f04b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 23:17:52 +0100 Subject: [PATCH 12/15] Rename --- datafusion/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index 4f9d412a..2bf6474a 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -81,7 +81,8 @@ async fn main() -> Result<()> { "q7", ) .await?; - exec_query(&mut ctx, "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v5, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6", "q10").await?; + + exec_query(&mut ctx, "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM tbl GROUP BY id1, id2, id3, id4, id5, id6", "q10").await?; Ok(()) } From 63fe38b7170fd18f22cc0553c21abbb76bf0f745 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 19 Jan 2021 08:21:11 +0100 Subject: [PATCH 13/15] Fix disabled snmalloc --- datafusion/Cargo.toml | 5 +++-- datafusion/src/main.rs | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index e6075567..a6586e5d 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -7,8 +7,9 @@ edition = "2018" datafusion = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition" } arrow = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition"} tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } -snmalloc-rs = "0.2" +snmalloc-rs = {version = "0.2", features= ["cache-friendly"]} num_cpus="1.0" [profile.release] -lto = true \ No newline at end of file +lto = true +codegen-units = 1 diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index 2bf6474a..bb407c96 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -5,7 +5,6 @@ use datafusion::prelude::*; use std::env; use std::time::Instant; -#[cfg(feature = "snmalloc")] #[global_allocator] static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; @@ -40,7 +39,7 @@ async fn main() -> Result<()> { let csv = CsvFile::try_new(&data, options).unwrap(); let batch_size = 65536; - let partition_size = num_cpus::get() * 2; + let partition_size = num_cpus::get(); let memtable = MemTable::load(&csv, batch_size, Some(partition_size)).await?; ctx.register_table("tbl", Box::new(memtable)); From cbecfbcaf44fc4da591b77fee65000d730c3371b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 19 Jan 2021 20:51:10 +0100 Subject: [PATCH 14/15] Use arrow master again --- datafusion/Cargo.toml | 6 +++--- datafusion/ver-datafusion.sh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index a6586e5d..b5ae9b38 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -4,11 +4,11 @@ version = "0.1.0" edition = "2018" [dependencies] -datafusion = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition" } -arrow = { git = "https://github.com/Dandandan/arrow.git", features = ["simd"], branch="mem_table_repartition"} +datafusion = { git = "https://github.com/apache/arrow.git", features = ["simd"]} +arrow = { git = "https://github.com/apache/arrow.git", features = ["simd"]} tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } snmalloc-rs = {version = "0.2", features= ["cache-friendly"]} -num_cpus="1.0" +num_cpus = "1.0" [profile.release] lto = true diff --git a/datafusion/ver-datafusion.sh b/datafusion/ver-datafusion.sh index d2a75cef..05e4f20c 100755 --- a/datafusion/ver-datafusion.sh +++ b/datafusion/ver-datafusion.sh @@ -1 +1 @@ -cargo tree | grep "├── datafusion" +cargo tree | grep "├── datafusion" | cut -d ' ' -f3 \ No newline at end of file From 88ba39171e455a55557c3613d08886c228335f2d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Wed, 19 May 2021 22:53:59 +0200 Subject: [PATCH 15/15] Update benchmark code --- datafusion/Cargo.toml | 5 ++--- datafusion/exec.sh | 2 +- datafusion/src/main.rs | 9 ++++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index b5ae9b38..16e3632c 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -4,9 +4,8 @@ version = "0.1.0" edition = "2018" [dependencies] -datafusion = { git = "https://github.com/apache/arrow.git", features = ["simd"]} -arrow = { git = "https://github.com/apache/arrow.git", features = ["simd"]} -tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", features = ["simd"]} +tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] } snmalloc-rs = {version = "0.2", features= ["cache-friendly"]} num_cpus = "1.0" diff --git a/datafusion/exec.sh b/datafusion/exec.sh index 2709a0ad..649afefa 100755 --- a/datafusion/exec.sh +++ b/datafusion/exec.sh @@ -1,4 +1,4 @@ #!/bin/bash set -e -RUSTFLAGS='-C target-cpu=native' cargo run --release \ No newline at end of file +RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release \ No newline at end of file diff --git a/datafusion/src/main.rs b/datafusion/src/main.rs index bb407c96..949fddee 100644 --- a/datafusion/src/main.rs +++ b/datafusion/src/main.rs @@ -1,8 +1,7 @@ -use arrow::datatypes::{DataType, Field, Schema}; -use datafusion::datasource::{CsvFile, MemTable}; +use datafusion::{arrow::datatypes::{DataType, Field, Schema}, datasource::{CsvFile, MemTable}}; use datafusion::error::Result; use datafusion::prelude::*; -use std::env; +use std::{env, sync::Arc}; use std::time::Instant; #[global_allocator] @@ -41,8 +40,8 @@ async fn main() -> Result<()> { let batch_size = 65536; let partition_size = num_cpus::get(); - let memtable = MemTable::load(&csv, batch_size, Some(partition_size)).await?; - ctx.register_table("tbl", Box::new(memtable)); + let memtable = MemTable::load(Arc::new(csv), batch_size, Some(partition_size)).await?; + ctx.register_table("tbl", Arc::new(memtable)); exec_query( &mut ctx,