Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 96 additions & 4 deletions datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ impl ColumnarValue {
}
}

/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
/// number of rows. [`Self::Scalar`] is converted by repeating the same
/// scalar multiple times which is not as efficient as handling the scalar
/// directly.
/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
/// number of rows by repeating the same scalar multiple times,
/// which is not as efficient as handling the scalar directly.
/// [`Self::Array`] will just be returned as is.
///
/// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
///
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
/// arrays of the same length.
Expand All @@ -135,6 +137,38 @@ impl ColumnarValue {
/// number of rows. [`Self::Scalar`] is converted by repeating the same
/// scalar multiple times which is not as efficient as handling the scalar
/// directly.
/// This validates that if this is [`Self::Array`], it has the expected length.
///
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
/// arrays of the same length.
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
/// if the array length does not match the expected length
pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
match self {
ColumnarValue::Array(array) => {
if array.len() == num_rows {
Ok(array)
} else {
internal_err!(
"Array length {} does not match expected length {}",
array.len(),
num_rows
)
}
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}

/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
/// number of rows by repeating the same scalar multiple times,
/// which is not as efficient as handling the scalar directly.
/// [`Self::Array`] will just be returned as is.
///
/// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
///
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
/// arrays of the same length.
Expand All @@ -149,6 +183,36 @@ impl ColumnarValue {
})
}

/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
/// number of rows. [`Self::Scalar`] is converted by repeating the same
/// scalar multiple times which is not as efficient as handling the scalar
/// directly.
/// This validates that if this is [`Self::Array`], it has the expected length.
///
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
/// arrays of the same length.
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
/// if the array length does not match the expected length
pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
match self {
ColumnarValue::Array(array) => {
if array.len() == num_rows {
Ok(Arc::clone(array))
} else {
internal_err!(
"Array length {} does not match expected length {}",
array.len(),
num_rows
)
}
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}

/// Null columnar values are implemented as a null array in order to pass batch
/// num_rows
pub fn create_null_array(num_rows: usize) -> Self {
Expand Down Expand Up @@ -249,6 +313,34 @@ mod tests {
use super::*;
use arrow::array::Int32Array;

#[test]
fn into_array_of_size() {
// Array case
let arr = make_array(1, 3);
let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

// Scalar case
let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
let expected_array = make_array(42, 100);
assert_eq!(
&scalar_columnar_value.into_array_of_size(100).unwrap(),
&expected_array
);

// Array case with wrong size
let arr = make_array(1, 3);
let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
let result = arr_columnar_value.into_array_of_size(5);
let err = result.unwrap_err();
assert!(
err.to_string().starts_with(
"Internal error: Array length 3 does not match expected length 5"
),
"Found: {err}"
);
}

#[test]
fn values_to_arrays() {
// (input, expected)
Expand Down
21 changes: 21 additions & 0 deletions datafusion/physical-expr-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::tree_node::ExprContext;

use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::sort_properties::ExprProperties;

Expand Down Expand Up @@ -91,6 +92,26 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
Ok(make_array(data))
}

/// Evaluates expressions against a record batch.
/// This will convert the resulting ColumnarValues to ArrayRefs,
/// duplicating any ScalarValues that may have been returned,
/// and validating that the returned arrays all have the same
/// number of rows as the input batch.
#[inline]
pub fn evaluate_expressions_to_arrays<'a>(
exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
let num_rows = batch.num_rows();
exprs
.into_iter()
.map(|e| {
e.evaluate(batch)
.and_then(|col| col.into_array_of_size(num_rows))
})
.collect::<Result<Vec<ArrayRef>>>()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::{LimitEffect, PartitionEvaluator};

use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -57,13 +58,7 @@ pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug {
///
/// Typically, the resulting vector is a single element vector.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
.map(|e| {
e.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
})
.collect()
evaluate_expressions_to_arrays(&self.expressions(), batch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍

}

/// Create a [`PartitionEvaluator`] for evaluating the function on
Expand Down
9 changes: 2 additions & 7 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_expr::window_state::{
use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use indexmap::IndexMap;

/// Common trait for [window function] implementations
Expand Down Expand Up @@ -90,13 +91,7 @@ pub trait WindowExpr: Send + Sync + Debug {
/// Evaluate the window function arguments against the batch and return
/// array ref, normally the resulting `Vec` is a single element one.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
.map(|e| {
e.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
})
.collect()
evaluate_expressions_to_arrays(&self.expressions(), batch)
}

/// Evaluate the window function values against the batch
Expand Down
45 changes: 13 additions & 32 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::{
};

use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use itertools::Itertools;

pub mod group_values;
Expand Down Expand Up @@ -1434,25 +1435,14 @@ pub fn finalize_aggregation(
}
}

/// Evaluates expressions against a record batch.
fn evaluate(
expr: &[Arc<dyn PhysicalExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
expr.iter()
.map(|expr| {
expr.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
})
.collect()
}

/// Evaluates expressions against a record batch.
/// Evaluates groups of expressions against a record batch.
pub fn evaluate_many(
expr: &[Vec<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
expr.iter().map(|expr| evaluate(expr, batch)).collect()
expr.iter()
.map(|expr| evaluate_expressions_to_arrays(expr, batch))
.collect()
}

fn evaluate_optional(
Expand Down Expand Up @@ -1506,23 +1496,14 @@ pub fn evaluate_group_by(
group_by: &PhysicalGroupBy,
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
let exprs: Vec<ArrayRef> = group_by
.expr
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;

let null_exprs: Vec<ArrayRef> = group_by
.null_expr
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
value.into_array(batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
let exprs = evaluate_expressions_to_arrays(
group_by.expr.iter().map(|(expr, _)| expr),
batch,
)?;
let null_exprs = evaluate_expressions_to_arrays(
group_by.null_expr.iter().map(|(expr, _)| expr),
batch,
)?;

group_by
.groups
Expand Down
11 changes: 3 additions & 8 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ use std::borrow::Cow;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::AggregateExec;
use crate::filter::batch_filter;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::stream::{Stream, StreamExt};

use super::AggregateExec;

/// stream struct for aggregation without grouping columns
pub(crate) struct AggregateStream {
stream: BoxStream<'static, Result<RecordBatch>>,
Expand Down Expand Up @@ -219,13 +219,8 @@ fn aggregate_batch(
None => Cow::Borrowed(&batch),
};

let n_rows = batch.num_rows();

// 1.3
let values = expr
.iter()
.map(|e| e.evaluate(&batch).and_then(|v| v.into_array(n_rows)))
.collect::<Result<Vec<_>>>()?;
let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?;

// 1.4
let size_pre = accum.size();
Expand Down
9 changes: 2 additions & 7 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};

use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;

Expand Down Expand Up @@ -1465,13 +1466,7 @@ async fn collect_left_input(
BooleanBufferBuilder::new(0)
};

let left_values = on_left
.iter()
.map(|c| {
c.evaluate(&single_batch)?
.into_array(single_batch.num_rows())
})
.collect::<Result<Vec<_>>>()?;
let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;

// Compute bounds for dynamic filter if enabled
let bounds = match bounds_accumulators {
Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_common::{
use datafusion_physical_expr::PhysicalExprRef;

use ahash::RandomState;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::{ready, Stream, StreamExt};

/// Represents build-side of hash join.
Expand Down Expand Up @@ -447,11 +448,7 @@ impl HashJoinStream {
}
Some(Ok(batch)) => {
// Precalculate hash values for fetched batch
let keys_values = self
.on_right
.iter()
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;

self.hashes_buffer.clear();
self.hashes_buffer.resize(batch.num_rows(), 0);
Expand Down
11 changes: 3 additions & 8 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};

use ahash::RandomState;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::{ready, Stream, StreamExt};
use parking_lot::Mutex;

Expand Down Expand Up @@ -1065,14 +1066,8 @@ fn lookup_join_hashmap(
hashes_buffer: &mut Vec<u64>,
deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let build_join_values = build_on
.iter()
.map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;

hashes_buffer.clear();
hashes_buffer.resize(probe_batch.num_rows(), 0);
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use datafusion_physical_expr::{
};

use datafusion_physical_expr_common::datum::compare_op_for_nested;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use parking_lot::Mutex;
Expand Down Expand Up @@ -1668,10 +1669,7 @@ pub fn update_hash(
fifo_hashmap: bool,
) -> Result<()> {
// evaluate the keys
let keys_values = on
.iter()
.map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;
let keys_values = evaluate_expressions_to_arrays(on, batch)?;

// calculate the hash values
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
Expand Down
Loading