Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 214 additions & 14 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::physical_expr::physical_exprs_bag_equal;
use arrow::array::*;
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute::kernels::boolean::{not, or_kleene};
use arrow::compute::kernels::cmp::eq as arrow_eq;
use arrow::compute::{SortOptions, take};
use arrow::datatypes::*;
use arrow::util::bit_iterator::BitIndexIterator;
Expand Down Expand Up @@ -771,32 +772,43 @@ impl PhysicalExpr for InListExpr {
}
}
None => {
// No static filter: iterate through each expression, compare, and OR results
// No static filter: iterate through each expression, compare, and OR results.
// Use Arrow's vectorized eq kernel for primitive/string/binary types,
// falling back to row-by-row comparator for nested types (Struct, List, etc.)
// where eq semantics are ambiguous.
let value = value.into_array(num_rows)?;
let use_arrow_eq = !value.data_type().is_nested();

@neilconway neilconway Feb 25, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know for sure that Arrow's eq kernel will work for all non-nested types? Perhaps that would be a bit fragile if we add new types in the future. I wonder if we should explicitly whitelist the types we know that work?

Digging around a bit, it seems we panic if we try to pass RunEndEncoded types to eq, but REE types aren't considered nested.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the great catch! I've replaced the !is_nested() check with an explicit whitelist (supports_arrow_eq) that only enables Arrow's eq kernel for known-supported types. Please take another look when you have a chance.

let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
|result, expr| -> Result<BooleanArray> {
let rhs = match expr? {
ColumnarValue::Array(array) => {
let cmp = make_comparator(
value.as_ref(),
array.as_ref(),
SortOptions::default(),
)?;
(0..num_rows)
.map(|i| {
if value.is_null(i) || array.is_null(i) {
return None;
}
Some(cmp(i, i).is_eq())
})
.collect::<BooleanArray>()
if use_arrow_eq {
arrow_eq(&value, &array)?
} else {
let cmp = make_comparator(
value.as_ref(),
array.as_ref(),
SortOptions::default(),
)?;
(0..num_rows)
.map(|i| {
if value.is_null(i) || array.is_null(i) {
return None;
}
Some(cmp(i, i).is_eq())
})
.collect::<BooleanArray>()
}
}
ColumnarValue::Scalar(scalar) => {
// Check if scalar is null once, before the loop
if scalar.is_null() {
// If scalar is null, all comparisons return null
BooleanArray::from(vec![None; num_rows])
} else if use_arrow_eq {
let scalar_datum = scalar.to_scalar()?;
arrow_eq(&value, &scalar_datum)?
} else {
// Convert scalar to 1-element array
let array = scalar.to_array()?;
Expand Down Expand Up @@ -3507,4 +3519,192 @@ mod tests {

Ok(())
}

/// Helper: creates an InListExpr with `static_filter = None`
/// to force the column-reference evaluation path.
fn make_in_list_with_columns(
expr: Arc<dyn PhysicalExpr>,
list: Vec<Arc<dyn PhysicalExpr>>,
negated: bool,
) -> Arc<InListExpr> {
Arc::new(InListExpr::new(expr, list, negated, None))
}

#[test]
fn test_in_list_with_columns_int32_scalars() -> Result<()> {
// Column-reference path with scalar literals (bypassing static filter)
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(3),
None,
]))],
)?;

let list = vec![
lit(ScalarValue::Int32(Some(1))),
lit(ScalarValue::Int32(Some(3))),
];
let expr = make_in_list_with_columns(col_a, list, false);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
assert_eq!(
result,
&BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
);
Ok(())
}

#[test]
fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
// IN list with column references
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])),
Arc::new(Int32Array::from(vec![
Some(1),
Some(99),
Some(99),
Some(99),
])),
Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None])),
],
)?;

let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?, col("c", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 IN (1, 99) → true
// row 1: 2 IN (99, 99) → false
// row 2: 3 IN (99, 3) → true
// row 3: NULL IN (99, NULL) → NULL
assert_eq!(
result,
&BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
);
Ok(())
}

#[test]
fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
// IN list with Utf8 column references
let schema = Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(StringArray::from(vec!["x", "y", "z"])),
Arc::new(StringArray::from(vec!["x", "x", "z"])),
],
)?;

let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: "x" IN ("x") → true
// row 1: "y" IN ("x") → false
// row 2: "z" IN ("z") → true
assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
Ok(())
}

#[test]
fn test_in_list_with_columns_negated() -> Result<()> {
// NOT IN with column references
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![1, 99, 3])),
],
)?;

let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, true);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 NOT IN (1) → false
// row 1: 2 NOT IN (99) → true
// row 2: 3 NOT IN (3) → false
assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
Ok(())
}

#[test]
fn test_in_list_with_columns_null_in_list() -> Result<()> {
// IN list with NULL scalar (column-reference path)
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)?;

let list = vec![
lit(ScalarValue::Int32(None)),
lit(ScalarValue::Int32(Some(1))),
];
let expr = make_in_list_with_columns(col_a, list, false);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: 1 IN (NULL, 1) → true (true OR null = true)
// row 1: 2 IN (NULL, 1) → NULL (false OR null = null)
assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
Ok(())
}

#[test]
fn test_in_list_with_columns_float_nan() -> Result<()> {
// Verify NaN == NaN is true in the column-reference path
// (consistent with Arrow's totalOrder semantics)
let schema = Schema::new(vec![
Field::new("a", DataType::Float64, false),
Field::new("b", DataType::Float64, false),
]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
],
)?;

let col_a = col("a", &schema)?;
let list = vec![col("b", &schema)?];
let expr = make_in_list_with_columns(col_a, list, false);

let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
let result = as_boolean_array(&result);
// row 0: NaN IN (NaN) → true
// row 1: 1.0 IN (2.0) → false
// row 2: NaN IN (0.0) → false
assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
Ok(())
}
}
Loading