Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ name = "spill_io"
harness = false
name = "sort_preserving_merge"

[[bench]]
harness = false
name = "sort_merge_join"
required-features = ["test_utils"]

[[bench]]
harness = false
name = "aggregate_vectorized"
Expand Down
204 changes: 204 additions & 0 deletions datafusion/physical-plan/benches/sort_merge_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Criterion benchmarks for Sort Merge Join
//!
//! These benchmarks measure the join kernel in isolation by feeding
//! pre-sorted RecordBatches directly into SortMergeJoinExec, avoiding
//! sort / scan overhead.

use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion_common::NullEquality;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::joins::{SortMergeJoinExec, utils::JoinOn};
use datafusion_physical_plan::test::TestMemoryExec;
use tokio::runtime::Runtime;

/// Produce benchmark input rows ordered by `(key, data)`, cut into batches
/// of at most 8192 rows.
///
/// Schema: (key: Int64, data: Int64, payload: Utf8)
///
/// Row `i` is generated as `key = i % key_mod`, `data = i`,
/// `payload = "val_{i}"`, so `key_mod` bounds the number of distinct keys.
/// `key_mod` must be non-zero (the remainder operation panics otherwise).
fn build_sorted_batches(
    num_rows: usize,
    key_mod: usize,
    schema: &SchemaRef,
) -> Vec<RecordBatch> {
    const BATCH_SIZE: usize = 8192;

    // Generate (key, data) pairs and order them by key, then by data.
    let mut pairs: Vec<(i64, i64)> = (0..num_rows)
        .map(|i| ((i % key_mod) as i64, i as i64))
        .collect();
    pairs.sort();

    // Split the sorted pairs into the two integer columns.
    let (keys, data): (Vec<i64>, Vec<i64>) = pairs.into_iter().unzip();
    let payload: Vec<String> = data.iter().map(|d| format!("val_{d}")).collect();

    let full = RecordBatch::try_new(
        Arc::clone(schema),
        vec![
            Arc::new(Int64Array::from(keys)),
            Arc::new(Int64Array::from(data)),
            Arc::new(StringArray::from(payload)),
        ],
    )
    .unwrap();

    // Slice the single large batch into consecutive chunks; only the final
    // chunk may be shorter than BATCH_SIZE.
    let total = full.num_rows();
    (0..total)
        .step_by(BATCH_SIZE)
        .map(|offset| full.slice(offset, BATCH_SIZE.min(total - offset)))
        .collect()
}

/// Create a fresh in-memory source plan over a copy of `batches`.
///
/// The batches are handed to `TestMemoryExec::try_new_exec` as the only
/// element of the partition list, with no projection argument (`None`).
/// Panics if the exec cannot be constructed.
fn make_exec(
    batches: &[RecordBatch],
    schema: &SchemaRef,
) -> Arc<dyn datafusion_physical_plan::ExecutionPlan> {
    let partitions = [batches.to_vec()];
    TestMemoryExec::try_new_exec(&partitions, Arc::clone(schema), None).unwrap()
}

/// Schema shared by both join inputs: three non-nullable columns
/// `key: Int64`, `data: Int64` and `payload: Utf8`.
fn schema() -> SchemaRef {
    let fields = vec![
        Field::new("key", DataType::Int64, false),
        Field::new("data", DataType::Int64, false),
        Field::new("payload", DataType::Utf8, false),
    ];
    Arc::new(Schema::new(fields))
}

fn do_join(
left: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
right: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
join_type: datafusion_common::JoinType,
rt: &Runtime,
) -> usize {
let on: JoinOn = vec![(
col("key", &left.schema()).unwrap(),
col("key", &right.schema()).unwrap(),
)];
let join = SortMergeJoinExec::try_new(
left,
right,
on,
None,
join_type,
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
)
.unwrap();

let task_ctx = Arc::new(TaskContext::default());
rt.block_on(async {
let batches = collect(Arc::new(join), task_ctx).await.unwrap();
batches.iter().map(|b| b.num_rows()).sum()
})
}

fn bench_smj(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let s = schema();

let mut group = c.benchmark_group("sort_merge_join");

// 1:1 Inner Join — 100K rows each, unique keys
// Best case for contiguous-range optimization: every index array is [0,1,2,...].
{
let n = 100_000;
let left_batches = build_sorted_batches(n, n, &s);
let right_batches = build_sorted_batches(n, n, &s);
group.bench_function(BenchmarkId::new("inner_1to1", n), |b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_join(left, right, datafusion_common::JoinType::Inner, &rt)
})
});
}

// 1:10 Inner Join — 100K left, 100K right, 10K distinct keys
{
let n = 100_000;
let key_mod = 10_000;
let left_batches = build_sorted_batches(n, key_mod, &s);
let right_batches = build_sorted_batches(n, key_mod, &s);
group.bench_function(BenchmarkId::new("inner_1to10", n), |b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_join(left, right, datafusion_common::JoinType::Inner, &rt)
})
});
}

// Left Join — 100K each, ~5% unmatched on left
{
let n = 100_000;
let left_batches = build_sorted_batches(n, n + n / 20, &s);
let right_batches = build_sorted_batches(n, n, &s);
group.bench_function(BenchmarkId::new("left_1to1_unmatched", n), |b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_join(left, right, datafusion_common::JoinType::Left, &rt)
})
});
}

// Left Semi Join — 100K left, 100K right, 10K keys
{
let n = 100_000;
let key_mod = 10_000;
let left_batches = build_sorted_batches(n, key_mod, &s);
let right_batches = build_sorted_batches(n, key_mod, &s);
group.bench_function(BenchmarkId::new("left_semi_1to10", n), |b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_join(left, right, datafusion_common::JoinType::LeftSemi, &rt)
})
});
}

// Left Anti Join — 100K left, 100K right, partial match
{
let n = 100_000;
let left_batches = build_sorted_batches(n, n + n / 5, &s);
let right_batches = build_sorted_batches(n, n, &s);
group.bench_function(BenchmarkId::new("left_anti_partial", n), |b| {
b.iter(|| {
let left = make_exec(&left_batches, &s);
let right = make_exec(&right_batches, &s);
do_join(left, right, datafusion_common::JoinType::LeftAnti, &rt)
})
});
}

group.finish();
}

criterion_group!(benches, bench_smj);
criterion_main!(benches);
70 changes: 57 additions & 13 deletions datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1490,13 +1490,24 @@ impl SortMergeJoinStream {
continue;
}

let mut left_columns = self
.streamed_batch
.batch
.columns()
.iter()
.map(|column| take(column, &left_indices, None))
.collect::<Result<Vec<_>, ArrowError>>()?;
let mut left_columns = if let Some(range) = is_contiguous_range(&left_indices)
{
// When indices form a contiguous range (common for the streamed
// side which advances sequentially), use zero-copy slice instead
// of the O(n) take kernel.
self.streamed_batch
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea - it is something that could be done for probe side of hash join as well.

.batch
.slice(range.start, range.len())
.columns()
.to_vec()
} else {
self.streamed_batch
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.batch
.columns()
.iter()
.map(|column| take(column, &left_indices, None))
.collect::<Result<Vec<_>, ArrowError>>()?
};

// The row indices of joined buffered batch
let right_indices: UInt64Array = chunk.buffered_indices.finish();
Expand Down Expand Up @@ -1972,6 +1983,30 @@ fn produce_buffered_null_batch(
)?))
}

/// Checks if a `UInt64Array` contains a contiguous ascending range (e.g. [3,4,5,6]).
/// Returns `Some(start..start+len)` if so, `None` otherwise.
/// This allows replacing an O(n) `take` with an O(1) `slice`.
#[inline]
fn is_contiguous_range(indices: &UInt64Array) -> Option<Range<usize>> {
if indices.is_empty() || indices.null_count() > 0 {
return None;
}
let start = indices.value(0);
let len = indices.len() as u64;
// Quick rejection: if last element doesn't match expected, not contiguous
if indices.value(indices.len() - 1) != start + len - 1 {
return None;
}
// Verify every element is sequential (handles duplicates and gaps)
let values = indices.values();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could move the let values = indices.values() to the top, so you can use values[i] indexing everywhere.

for i in 1..values.len() {
if values[i] != start + i as u64 {
return None;
}
}
Some(start as usize..(start + len) as usize)
}

/// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]` by specific column indices
#[inline(always)]
fn fetch_right_columns_by_idxs(
Expand All @@ -1992,12 +2027,21 @@ fn fetch_right_columns_from_batch_by_idxs(
) -> Result<Vec<ArrayRef>> {
match &buffered_batch.batch {
// In memory batch
BufferedBatchState::InMemory(batch) => Ok(batch
.columns()
.iter()
.map(|column| take(column, &buffered_indices, None))
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?),
// In memory batch
BufferedBatchState::InMemory(batch) => {
// When indices form a contiguous range (common in SMJ since the
// buffered side is scanned sequentially), use zero-copy slice.
if let Some(range) = is_contiguous_range(buffered_indices) {
Ok(batch.slice(range.start, range.len()).columns().to_vec())
} else {
Ok(batch
.columns()
.iter()
.map(|column| take(column, buffered_indices, None))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?)
}
}
// If the batch was spilled to disk, less likely
BufferedBatchState::Spilled(spill_file) => {
let mut buffered_cols: Vec<ArrayRef> =
Expand Down
Loading