From 11bf46e48acf8d15415e00ace2fb79089e7b88a8 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Mon, 5 May 2025 22:02:32 +0900 Subject: [PATCH 1/8] PERF : modify SMJ shuffle file reader to skip validation --- datafusion/physical-plan/src/joins/sort_merge_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 6817bd9b76dd..751388507456 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -2313,7 +2313,7 @@ fn fetch_right_columns_from_batch_by_idxs( Vec::with_capacity(buffered_indices.len()); let file = BufReader::new(File::open(spill_file.path())?); - let reader = StreamReader::try_new(file, None)?; + let reader = unsafe {StreamReader::try_new(file, None)?.with_skip_validation(true)}; for batch in reader { batch?.columns().iter().for_each(|column| { From 4c19675cc3425086a84d20dc182e5df1799a743a Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Tue, 6 May 2025 17:11:45 +0900 Subject: [PATCH 2/8] add bench --- datafusion/physical-plan/Cargo.toml | 4 + .../physical-plan/benches/sort_merge_join.rs | 97 +++++++++++++++++++ datafusion/physical-plan/src/lib.rs | 1 - 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 datafusion/physical-plan/benches/sort_merge_join.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 5210ee26755c..8eebaa4a628d 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -86,3 +86,7 @@ name = "partial_ordering" [[bench]] harness = false name = "spill_io" + +[[bench]] +harness = false +name = "sort_merge_join" \ No newline at end of file diff --git a/datafusion/physical-plan/benches/sort_merge_join.rs b/datafusion/physical-plan/benches/sort_merge_join.rs new file mode 100644 index 000000000000..e77b40074162 --- /dev/null +++ b/datafusion/physical-plan/benches/sort_merge_join.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::SortOptions; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion_common::JoinType::Inner; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::disk_manager::DiskManagerConfig; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_plan::common::collect; +use datafusion_physical_plan::joins::SortMergeJoinExec; +use datafusion_physical_plan::test::{build_table_i32, TestMemoryExec}; +use datafusion_physical_plan::ExecutionPlan; +use std::sync::Arc; +use tokio::runtime::Runtime; + +fn create_test_data() -> SortMergeJoinExec { + let left_batch = build_table_i32( + ("a1", &vec![0, 1, 2, 3, 4, 5]), + ("b1", &vec![1, 2, 3, 4, 5, 6]), + ("c1", &vec![4, 5, 6, 7, 8, 9]), + ); + let left_schema = left_batch.schema(); + let left = + TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None).unwrap(); + let right_batch = build_table_i32( + ("a2", &vec![0, 10, 20, 30, 40]), + ("b2", &vec![1, 3, 4, 6, 8]), + ("c2", &vec![50, 60, 70, 80, 90]), + ); + let right_schema = right_batch.schema(); + let right = + TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None).unwrap(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + )]; + let sort_options = vec![SortOptions::default(); on.len()]; + + SortMergeJoinExec::try_new(left, right, on, None, Inner, sort_options, false) + .unwrap() +} + +// `cargo bench --bench sort_merge_join` +fn bench_spill(c: &mut Criterion) { + let sort_merge_join_exec = create_test_data(); + + let mut group = c.benchmark_group("sort_merge_join_spill"); + let rt = Runtime::new().unwrap(); + + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) // Set memory limit to 100 bytes + .with_disk_manager(DiskManagerConfig::NewOs) // Enable DiskManager to allow spilling + .build_arc() + .unwrap(); + let session_config = SessionConfig::default(); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(session_config.clone()) + .with_runtime(Arc::clone(&runtime)), + ); + + group.bench_function("SortMergeJoinExec_spill", |b| { + b.iter(|| { + criterion::black_box( + rt.block_on(async { + let stream = sort_merge_join_exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + collect(stream).await.unwrap() + }) + ) + }) + }); + group.finish(); + + assert!(sort_merge_join_exec.metrics().unwrap().spill_count().unwrap() > 0); + assert!(sort_merge_join_exec.metrics().unwrap().spilled_bytes().unwrap() > 0); + assert!(sort_merge_join_exec.metrics().unwrap().spilled_rows().unwrap() > 0); +} + +criterion_group!(benches, bench_spill); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index a1862554b303..414044978a88 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -92,5 +92,4 @@ pub mod udaf { } pub mod coalesce; -#[cfg(test)] pub mod test; From ee2a6d6c2a1a896bfdea8a3fac350cf8546fcb3d Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Tue, 6 May 2025 17:16:01 +0900 Subject: [PATCH 3/8] cargo fmt --- .../physical-plan/benches/sort_merge_join.rs | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/benches/sort_merge_join.rs b/datafusion/physical-plan/benches/sort_merge_join.rs index e77b40074162..848bcb38acf9 100644 --- a/datafusion/physical-plan/benches/sort_merge_join.rs +++ b/datafusion/physical-plan/benches/sort_merge_join.rs @@ -53,8 +53,7 @@ fn create_test_data() -> SortMergeJoinExec { )]; let sort_options = vec![SortOptions::default(); on.len()]; - SortMergeJoinExec::try_new(left, right, on, None, Inner, sort_options, false) - .unwrap() + SortMergeJoinExec::try_new(left, right, on, None, Inner, sort_options, false).unwrap() } // `cargo bench --bench sort_merge_join` @@ -78,19 +77,40 @@ fn bench_spill(c: &mut Criterion) { group.bench_function("SortMergeJoinExec_spill", |b| { b.iter(|| { - criterion::black_box( - rt.block_on(async { - let stream = sort_merge_join_exec.execute(0, Arc::clone(&task_ctx)).unwrap(); - collect(stream).await.unwrap() - }) - ) + criterion::black_box(rt.block_on(async { + let stream = sort_merge_join_exec + .execute(0, Arc::clone(&task_ctx)) + .unwrap(); + collect(stream).await.unwrap() + })) }) }); group.finish(); - assert!(sort_merge_join_exec.metrics().unwrap().spill_count().unwrap() > 0); - assert!(sort_merge_join_exec.metrics().unwrap().spilled_bytes().unwrap() > 0); - assert!(sort_merge_join_exec.metrics().unwrap().spilled_rows().unwrap() > 0); + assert!( + sort_merge_join_exec + .metrics() + .unwrap() + .spill_count() + .unwrap() + > 0 + ); + assert!( + sort_merge_join_exec + .metrics() + .unwrap() + .spilled_bytes() + .unwrap() + > 0 + ); + assert!( + sort_merge_join_exec + .metrics() + .unwrap() + .spilled_rows() + .unwrap() + > 0 + ); } criterion_group!(benches, bench_spill); From a5bda29905286638f9983ddb37c893d7ff5e7463 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Tue, 6 May 2025 17:36:45 +0900 Subject: [PATCH 4/8] cargo fmt --- datafusion/physical-plan/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 8eebaa4a628d..093d12e1b557 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -89,4 +89,4 @@ name = "spill_io" [[bench]] harness = false -name = "sort_merge_join" \ No newline at end of file +name = "sort_merge_join" From b17f5fdefb2f891649680ea08d6feec23dd3b9f6 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Tue, 6 May 2025 23:59:13 +0900 Subject: [PATCH 5/8] move bench --- datafusion/core/Cargo.toml | 4 ++++ datafusion/{physical-plan => core}/benches/sort_merge_join.rs | 1 - datafusion/physical-plan/Cargo.toml | 4 ---- datafusion/physical-plan/src/lib.rs | 1 + 4 files changed, 5 insertions(+), 5 deletions(-) rename datafusion/{physical-plan => core}/benches/sort_merge_join.rs (98%) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 4b6d8f274932..0ce67ee35625 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -244,3 +244,7 @@ name = "dataframe" [[bench]] harness = false name = "spm" + +[[bench]] +harness = false +name = "sort_merge_join" diff --git a/datafusion/physical-plan/benches/sort_merge_join.rs b/datafusion/core/benches/sort_merge_join.rs similarity index 98% rename from datafusion/physical-plan/benches/sort_merge_join.rs rename to datafusion/core/benches/sort_merge_join.rs index 848bcb38acf9..174e92fe439e 100644 --- a/datafusion/physical-plan/benches/sort_merge_join.rs +++ b/datafusion/core/benches/sort_merge_join.rs @@ -25,7 +25,6 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::common::collect; use datafusion_physical_plan::joins::SortMergeJoinExec; -use datafusion_physical_plan::test::{build_table_i32, TestMemoryExec}; use datafusion_physical_plan::ExecutionPlan; use std::sync::Arc; use tokio::runtime::Runtime; diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 093d12e1b557..5210ee26755c 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -86,7 +86,3 @@ name = "partial_ordering" [[bench]] harness = false name = "spill_io" - -[[bench]] -harness = false -name = "sort_merge_join" diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 414044978a88..a1862554b303 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -92,4 +92,5 @@ pub mod udaf { } pub mod coalesce; +#[cfg(test)] pub mod test; From 28839e3164f0e0bff42311a5c9627d40b6ffcdce Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 8 May 2025 00:42:37 +0900 Subject: [PATCH 6/8] increase sample data size --- datafusion/core/benches/sort_merge_join.rs | 134 ++++++++++----------- 1 file changed, 63 insertions(+), 71 deletions(-) diff --git a/datafusion/core/benches/sort_merge_join.rs b/datafusion/core/benches/sort_merge_join.rs index 174e92fe439e..5d3c7de80c62 100644 --- a/datafusion/core/benches/sort_merge_join.rs +++ b/datafusion/core/benches/sort_merge_join.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -use arrow_schema::SortOptions; +use arrow::array::{Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SortOptions}; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion_common::JoinType::Inner; +use datafusion_datasource::memory::MemorySourceConfig; use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::DiskManagerConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; @@ -29,88 +31,78 @@ use datafusion_physical_plan::ExecutionPlan; use std::sync::Arc; use tokio::runtime::Runtime; -fn create_test_data() -> SortMergeJoinExec { - let left_batch = build_table_i32( - ("a1", &vec![0, 1, 2, 3, 4, 5]), - ("b1", &vec![1, 2, 3, 4, 5, 6]), - ("c1", &vec![4, 5, 6, 7, 8, 9]), - ); - let left_schema = left_batch.schema(); - let left = - TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None).unwrap(); - let right_batch = build_table_i32( - ("a2", &vec![0, 10, 20, 30, 40]), - ("b2", &vec![1, 3, 4, 6, 8]), - ("c2", &vec![50, 60, 70, 80, 90]), - ); - let right_schema = right_batch.schema(); - let right = - TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None).unwrap(); +fn create_smj_exec(array_len: usize, batch_size: usize) -> SortMergeJoinExec { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, false), + Field::new("b1", DataType::Int32, false), + Field::new("c1", DataType::Int32, false), + ])); + // define data. + let batches = (0..array_len / batch_size) + .map(|i| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![i as i32; batch_size])), + Arc::new(Int32Array::from(vec![i as i32; batch_size])), + Arc::new(Int32Array::from(vec![i as i32; batch_size])), + ], + ) + .unwrap() + }) + .collect::>(); + let datasource_exec = + MemorySourceConfig::try_new_exec(&vec![batches], Arc::clone(&schema), None) + .unwrap(); + let on = vec![( - Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &schema).unwrap()) as _, + Arc::new(Column::new_with_schema("b1", &schema).unwrap()) as _, )]; let sort_options = vec![SortOptions::default(); on.len()]; - - SortMergeJoinExec::try_new(left, right, on, None, Inner, sort_options, false).unwrap() + SortMergeJoinExec::try_new( + datasource_exec.clone(), + datasource_exec.clone(), + on, + None, + Inner, + sort_options, + false, + ) + .unwrap() } // `cargo bench --bench sort_merge_join` fn bench_spill(c: &mut Criterion) { - let sort_merge_join_exec = create_test_data(); - - let mut group = c.benchmark_group("sort_merge_join_spill"); let rt = Runtime::new().unwrap(); + c.bench_function("SortMergeJoinExec_spill", |b| { + let join_exec = create_smj_exec(1_048_576, 4096); - let runtime = RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) // Set memory limit to 100 bytes - .with_disk_manager(DiskManagerConfig::NewOs) // Enable DiskManager to allow spilling - .build_arc() - .unwrap(); - let session_config = SessionConfig::default(); - let task_ctx = Arc::new( - TaskContext::default() - .with_session_config(session_config.clone()) - .with_runtime(Arc::clone(&runtime)), - ); + // create a session context. enable spilling + let runtime_env = RuntimeEnvBuilder::new() + .with_memory_limit(1024 * 1, 1.0) // Set memory limit to 1MB + .with_disk_manager(DiskManagerConfig::NewOs) // Enable DiskManager to allow spilling + .build_arc() + .unwrap(); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(SessionConfig::new()) + .with_runtime(Arc::clone(&runtime_env)), + ); - group.bench_function("SortMergeJoinExec_spill", |b| { b.iter(|| { - criterion::black_box(rt.block_on(async { - let stream = sort_merge_join_exec - .execute(0, Arc::clone(&task_ctx)) - .unwrap(); - collect(stream).await.unwrap() - })) - }) + let stream = join_exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + criterion::black_box(rt.block_on(collect(stream))).unwrap(); + }); + // check if spilling happened + assert!(join_exec.metrics().unwrap().spilled_rows().unwrap() > 0); }); - group.finish(); - - assert!( - sort_merge_join_exec - .metrics() - .unwrap() - .spill_count() - .unwrap() - > 0 - ); - assert!( - sort_merge_join_exec - .metrics() - .unwrap() - .spilled_bytes() - .unwrap() - > 0 - ); - assert!( - sort_merge_join_exec - .metrics() - .unwrap() - .spilled_rows() - .unwrap() - > 0 - ); } -criterion_group!(benches, bench_spill); +criterion_group!( + name = benches; + config = Criterion::default().sample_size(10); + targets = bench_spill +); criterion_main!(benches); From dab92e12e94bdbd88fe55e8289744a0249a00d97 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 8 May 2025 01:00:10 +0900 Subject: [PATCH 7/8] clippy --- datafusion/core/benches/sort_merge_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/sort_merge_join.rs b/datafusion/core/benches/sort_merge_join.rs index 5d3c7de80c62..a18e7e9e94d0 100644 --- a/datafusion/core/benches/sort_merge_join.rs +++ b/datafusion/core/benches/sort_merge_join.rs @@ -53,7 +53,7 @@ fn create_smj_exec(array_len: usize, batch_size: usize) -> SortMergeJoinExec { }) .collect::>(); let datasource_exec = - MemorySourceConfig::try_new_exec(&vec![batches], Arc::clone(&schema), None) + MemorySourceConfig::try_new_exec(&[batches], Arc::clone(&schema), None) .unwrap(); let on = vec![( @@ -81,7 +81,7 @@ fn bench_spill(c: &mut Criterion) { // create a session context. enable spilling let runtime_env = RuntimeEnvBuilder::new() - .with_memory_limit(1024 * 1, 1.0) // Set memory limit to 1MB + .with_memory_limit(1024, 1.0) // Set memory limit to 1KB .with_disk_manager(DiskManagerConfig::NewOs) // Enable DiskManager to allow spilling .build_arc() .unwrap(); From 0423d1928a380381b71a65e1c739a6e384989721 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 8 May 2025 19:10:58 +0900 Subject: [PATCH 8/8] fmt --- datafusion/core/benches/sort_merge_join.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/benches/sort_merge_join.rs b/datafusion/core/benches/sort_merge_join.rs index a18e7e9e94d0..be891f9c0b7d 100644 --- a/datafusion/core/benches/sort_merge_join.rs +++ b/datafusion/core/benches/sort_merge_join.rs @@ -53,8 +53,7 @@ fn create_smj_exec(array_len: usize, batch_size: usize) -> SortMergeJoinExec { }) .collect::>(); let datasource_exec = - MemorySourceConfig::try_new_exec(&[batches], Arc::clone(&schema), None) - .unwrap(); + MemorySourceConfig::try_new_exec(&[batches], Arc::clone(&schema), None).unwrap(); let on = vec![( Arc::new(Column::new_with_schema("b1", &schema).unwrap()) as _,