6 changes: 6 additions & 0 deletions datafusion/common/src/lib.rs
@@ -108,6 +108,12 @@ pub use error::{
// The HashMap and HashSet implementations that should be used as the uniform defaults
pub type HashMap<K, V, S = DefaultHashBuilder> = hashbrown::HashMap<K, V, S>;
pub type HashSet<T, S = DefaultHashBuilder> = hashbrown::HashSet<T, S>;
pub mod hash_map {
    pub use hashbrown::hash_map::Entry;
}
pub mod hash_set {
    pub use hashbrown::hash_set::Entry;
}
Comment on lines +111 to +116
Contributor
Is this to avoid adding an explicit hashbrown dependency to functions-aggregate?

Contributor Author
Yes, the datafusion_common::HashMap::entry API was not usable without adding an explicit dependency on the hashbrown crate, at which point there is little benefit over using hashbrown::HashMap directly

Contributor
Yeah, I guess it makes sense given we already export hashbrown::HashMap here 👍
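
For reference, here is a minimal sketch (illustrative only, not code from this PR) of the downstream usage the re-export enables: the entry API of datafusion_common::HashMap becomes reachable without declaring hashbrown in functions-aggregate's Cargo.toml. The word-count helper is hypothetical.

```rust
// A hypothetical downstream use of the new re-export: match on the
// entry API of datafusion_common::HashMap with no direct hashbrown dep.
use datafusion_common::hash_map::Entry;
use datafusion_common::HashMap;

fn count_words(words: &[&str]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for word in words {
        // `entry` returns hashbrown's Entry, now reachable via the re-export
        match counts.entry(word.to_string()) {
            Entry::Occupied(mut entry) => *entry.get_mut() += 1,
            Entry::Vacant(entry) => {
                entry.insert(1);
            }
        }
    }
    counts
}
```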


/// Downcast an Arrow Array to a concrete type, return a `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
@@ -68,3 +68,7 @@ harness = false
[[bench]]
name = "array_agg"
harness = false

[[bench]]
name = "min_max_bytes"
harness = false
92 changes: 92 additions & 0 deletions datafusion/functions-aggregate/benches/min_max_bytes.rs
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// A minimal benchmark of the min_max accumulator for byte-like data types.
//
// The benchmark simulates the insertion of NUM_BATCHES batches into an aggregation,
// where every row belongs to a distinct group. The data is generated beforehand so
// that the measurement (mostly) reflects the cost of the update_batch method.
//
// The throughput value describes the rows per second that are ingested.

use std::sync::Arc;

use arrow::{
    array::{ArrayRef, StringArray},
    datatypes::{DataType, Field, Schema},
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use datafusion_expr::{function::AccumulatorArgs, GroupsAccumulator};
use datafusion_functions_aggregate::min_max;
use datafusion_physical_expr::expressions::col;

const BATCH_SIZE: usize = 8192;

fn create_max_bytes_accumulator() -> Box<dyn GroupsAccumulator> {
    let input_schema =
        Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]));

    let max = min_max::max_udaf();
    max.create_groups_accumulator(AccumulatorArgs {
        return_field: Arc::new(Field::new("value", DataType::Utf8, true)),
        schema: &input_schema,
        ignore_nulls: true,
        order_bys: &[],
        is_reversed: false,
        name: "max_utf8",
        is_distinct: true,
        exprs: &[col("value", &input_schema).unwrap()],
    })
    .unwrap()
}

fn bench_min_max_bytes(c: &mut Criterion) {
    let mut group = c.benchmark_group("min_max_bytes");

    for num_batches in [10, 20, 50, 100, 150, 200, 300, 400, 500] {
        let id = BenchmarkId::from_parameter(num_batches);
        group.throughput(Throughput::Elements((num_batches * BATCH_SIZE) as u64));
        group.bench_with_input(id, &num_batches, |bencher, num_batches| {
            bencher.iter_with_large_drop(|| {
                let mut accumulator = create_max_bytes_accumulator();
                let mut group_indices = Vec::with_capacity(BATCH_SIZE);
                let strings: ArrayRef = Arc::new(StringArray::from_iter_values(
                    (0..BATCH_SIZE).map(|i| i.to_string()),
                ));

                for batch_idx in 0..*num_batches {
                    group_indices.clear();
                    group_indices
                        .extend((batch_idx * BATCH_SIZE)..(batch_idx + 1) * BATCH_SIZE);
                    let total_num_groups = (batch_idx + 1) * BATCH_SIZE;

                    accumulator
                        .update_batch(
                            &[Arc::clone(&strings)],
                            &group_indices,
                            None,
                            total_num_groups,
                        )
                        .unwrap()
                }
            });
        });
    }
}

criterion_group!(benches, bench_min_max_bytes);
criterion_main!(benches);
50 changes: 20 additions & 30 deletions datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
@@ -20,7 +20,8 @@ use arrow::array::{
    LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
};
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, Result};
use datafusion_common::hash_map::Entry;
use datafusion_common::{internal_err, HashMap, Result};
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
use std::mem::size_of;
@@ -391,14 +392,6 @@ struct MinMaxBytesState {
    total_data_bytes: usize,
}

#[derive(Debug, Clone, Copy)]
enum MinMaxLocation<'a> {
    /// the min/max value is stored in the existing `min_max` array
    ExistingMinMax,
    /// the min/max value is stored in the input array at the given index
    Input(&'a [u8]),
}

/// Implement the MinMaxBytesAccumulator with a comparison function
/// for comparing strings
impl MinMaxBytesState {
@@ -450,7 +443,7 @@ impl MinMaxBytesState {
        // Minimize value copies by calculating the new min/maxes for each group
        // in this batch (either the existing min/max or the new input value)
        // and updating the owned values in `self.min_maxes` at most once
        let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups];
        let mut locations = HashMap::<usize, &[u8]>::with_capacity(group_indices.len());
Contributor
I tried a few different things to avoid having to allocate a HashMap -- ideally we could at least reuse the allocation from invocation to invocation

However, as it is set up now it holds a slice into the input array, so any structure ends up with a lifetime tied to the input, which I couldn't figure out how to reuse across calls with different inputs 🤔
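
To illustrate the lifetime problem described above, here is a compiling sketch (hypothetical names, not the PR's code): the map's values borrow from the current input batch, so the map's type carries that input's lifetime and cannot be stored on the accumulator for reuse.

```rust
use datafusion_common::HashMap;

// Hypothetical stand-in for one call to update_batch: `locations` borrows
// from `input`, so its type carries the input's lifetime and the map must
// be created (and dropped) within the call rather than cached on `self`.
fn process_batch<'a>(input: &'a [&'a [u8]], group_indices: &[usize]) {
    let mut locations: HashMap<usize, &'a [u8]> =
        HashMap::with_capacity(group_indices.len());
    for (&group_index, &value) in group_indices.iter().zip(input.iter()) {
        // keep the first value seen for each group (comparison elided)
        locations.entry(group_index).or_insert(value);
    }
    // ... use `locations` to update owned state ...
} // `locations` is dropped here, releasing the borrow of `input`; a struct
  // field holding it would have to outlive every input batch, which the
  // borrow checker rejects.
```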


        // Figure out the new min value for each group
        for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
@@ -459,32 +452,29 @@ impl MinMaxBytesState {
                continue; // skip nulls
            };

            let existing_val = match locations[group_index] {
                // previous input value was the min/max, so compare it
                MinMaxLocation::Input(existing_val) => existing_val,
                MinMaxLocation::ExistingMinMax => {
                    let Some(existing_val) = self.min_max[group_index].as_ref() else {
                        // no existing min/max, so this is the new min/max
                        locations[group_index] = MinMaxLocation::Input(new_val);
                        continue;
                    };
                    existing_val.as_ref()
                }
            };
            match locations.entry(group_index) {
                Entry::Occupied(mut occupied_entry) => {
                    if cmp(new_val, occupied_entry.get()) {
                        occupied_entry.insert(new_val);
                    }
                }
                Entry::Vacant(vacant_entry) => {
                    if let Some(old_val) = self.min_max[group_index].as_ref() {
                        if cmp(new_val, old_val) {
                            vacant_entry.insert(new_val);
                        }
                    } else {
                        vacant_entry.insert(new_val);
                    }
                }
            };

            // Compare the new value to the existing value, replacing if necessary
            if cmp(new_val, existing_val) {
                locations[group_index] = MinMaxLocation::Input(new_val);
            }
        }

        // Update self.min_max with any new min/max values we found in the input
        for (group_index, location) in locations.iter().enumerate() {
            match location {
                MinMaxLocation::ExistingMinMax => {}
                MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val),
            }
        for (group_index, location) in locations.iter() {
            self.set_value(*group_index, location);
        }

        Ok(())
    }

Expand Down