diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index a46c9c75094c6..57505c59493af 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -18,18 +18,21 @@ //! [`ScalarUDFImpl`] definitions for array_element, array_slice, array_pop_front, array_pop_back, and array_any_value functions. use arrow::array::{ - Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array, + Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array, MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait, }; -use arrow::buffer::OffsetBuffer; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::DataType; use arrow::datatypes::{ - DataType::{FixedSizeList, LargeList, List, Null}, + DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null}, Field, }; -use datafusion_common::cast::as_int64_array; use datafusion_common::cast::as_large_list_array; use datafusion_common::cast::as_list_array; +use datafusion_common::cast::{ + as_int64_array, as_large_list_view_array, as_list_view_array, +}; +use datafusion_common::internal_err; use datafusion_common::utils::ListCoercion; use datafusion_common::{ exec_datafusion_err, exec_err, internal_datafusion_err, plan_err, @@ -449,10 +452,162 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result { let array = as_large_list_array(&args[0])?; general_array_slice::(array, from_array, to_array, stride) } + ListView(_) => { + let array = as_list_view_array(&args[0])?; + general_list_view_array_slice::(array, from_array, to_array, stride) + } + LargeListView(_) => { + let array = as_large_list_view_array(&args[0])?; + general_list_view_array_slice::(array, from_array, to_array, stride) + } _ => exec_err!("array_slice does not support type: {}", array_data_type), } } +fn adjusted_from_index(index: i64, len: O) -> Result> +where + i64: TryInto, +{ + // 0 ~ len - 1 + let adjusted_zero_index = if index < 0 { + if let Ok(index) = index.try_into() { + // When index < 0 and -index > length, index is clamped to the beginning of the list. + // Otherwise, when index < 0, the index is counted from the end of the list. + // + // Note, we actually test the contrapositive, index < -length, because negating a + // negative will panic if the negative is equal to the smallest representable value + // while negating a positive is always safe. + if index < (O::zero() - O::one()) * len { + O::zero() + } else { + index + len + } + } else { + return exec_err!("array_slice got invalid index: {}", index); + } + } else { + // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to) + if let Ok(index) = index.try_into() { + std::cmp::max(index - O::usize_as(1), O::usize_as(0)) + } else { + return exec_err!("array_slice got invalid index: {}", index); + } + }; + + if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { + Ok(Some(adjusted_zero_index)) + } else { + // Out of bounds + Ok(None) + } +} + +fn adjusted_to_index(index: i64, len: O) -> Result> +where + i64: TryInto, +{ + // 0 ~ len - 1 + let adjusted_zero_index = if index < 0 { + // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive + if let Ok(index) = index.try_into() { + index + len + } else { + return exec_err!("array_slice got invalid index: {}", index); + } + } else { + // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len) + if let Ok(index) = index.try_into() { + std::cmp::min(index - O::usize_as(1), len - O::usize_as(1)) + } else { + return exec_err!("array_slice got invalid index: {}", index); + } + }; + + if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { + Ok(Some(adjusted_zero_index)) + } else { + // Out of bounds + Ok(None) + } +} + +/// Internal plan describing how to materialize a single row's slice after +/// the slice bounds/stride have been normalized. Both list layouts consume +/// this to drive their copy logic. +enum SlicePlan { + /// No values should be produced. + Empty, + /// A contiguous run starting at `start` (relative to the row) with `len` + /// elements can be copied in one go. + Contiguous { start: O, len: O }, + /// Arbitrary positions (already relative to the row) must be copied in + /// sequence. + Indices(Vec), +} + +/// Produces a [`SlicePlan`] for the given logical slice parameters. +fn compute_slice_plan( + len: O, + from_raw: i64, + to_raw: i64, + stride_raw: Option, +) -> Result> +where + i64: TryInto, +{ + if len == O::usize_as(0) { + return Ok(SlicePlan::Empty); + } + + let from_index = adjusted_from_index::(from_raw, len)?; + let to_index = adjusted_to_index::(to_raw, len)?; + + let (Some(from), Some(to)) = (from_index, to_index) else { + return Ok(SlicePlan::Empty); + }; + + let stride_value = stride_raw.unwrap_or(1); + if stride_value == 0 { + return exec_err!( + "array_slice got invalid stride: {:?}, it cannot be 0", + stride_value + ); + } + + if (from < to && stride_value.is_negative()) + || (from > to && stride_value.is_positive()) + { + return Ok(SlicePlan::Empty); + } + + let stride: O = stride_value.try_into().map_err(|_| { + internal_datafusion_err!("array_slice got invalid stride: {}", stride_value) + })?; + + if from <= to && stride_value.is_positive() { + if stride_value == 1 { + let len = to - from + O::usize_as(1); + Ok(SlicePlan::Contiguous { start: from, len }) + } else { + let mut indices = Vec::new(); + let mut index = from; + while index <= to { + indices.push(index); + index += stride; + } + Ok(SlicePlan::Indices(indices)) + } + } else { + let mut indices = Vec::new(); + let mut index = from; + while index >= to { + indices.push(index); + index += stride; + } + Ok(SlicePlan::Indices(indices)) + } +} + fn general_array_slice( array: &GenericListArray, from_array: &Int64Array, @@ -472,73 +627,6 @@ where // We have the slice syntax compatible with DuckDB v0.8.1. // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb. - fn adjusted_from_index(index: i64, len: O) -> Result> - where - i64: TryInto, - { - // 0 ~ len - 1 - let adjusted_zero_index = if index < 0 { - if let Ok(index) = index.try_into() { - // When index < 0 and -index > length, index is clamped to the beginning of the list. - // Otherwise, when index < 0, the index is counted from the end of the list. - // - // Note, we actually test the contrapositive, index < -length, because negating a - // negative will panic if the negative is equal to the smallest representable value - // while negating a positive is always safe. - if index < (O::zero() - O::one()) * len { - O::zero() - } else { - index + len - } - } else { - return exec_err!("array_slice got invalid index: {}", index); - } - } else { - // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to) - if let Ok(index) = index.try_into() { - std::cmp::max(index - O::usize_as(1), O::usize_as(0)) - } else { - return exec_err!("array_slice got invalid index: {}", index); - } - }; - - if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { - Ok(Some(adjusted_zero_index)) - } else { - // Out of bounds - Ok(None) - } - } - - fn adjusted_to_index(index: i64, len: O) -> Result> - where - i64: TryInto, - { - // 0 ~ len - 1 - let adjusted_zero_index = if index < 0 { - // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive - if let Ok(index) = index.try_into() { - index + len - } else { - return exec_err!("array_slice got invalid index: {}", index); - } - } else { - // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len) - if let Ok(index) = index.try_into() { - std::cmp::min(index - O::usize_as(1), len - O::usize_as(1)) - } else { - return exec_err!("array_slice got invalid index: {}", index); - } - }; - - if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len { - Ok(Some(adjusted_zero_index)) - } else { - // Out of bounds - Ok(None) - } - } - let mut offsets = vec![O::usize_as(0)]; let mut null_builder = NullBufferBuilder::new(array.len()); @@ -551,6 +639,7 @@ where if array.is_null(row_index) || from_array.is_null(row_index) || to_array.is_null(row_index) + || stride.is_some_and(|s| s.is_null(row_index)) { mutable.extend_nulls(1); offsets.push(offsets[row_index] + O::usize_as(1)); @@ -565,72 +654,32 @@ where continue; } - let from_index = adjusted_from_index::(from_array.value(row_index), len)?; - let to_index = adjusted_to_index::(to_array.value(row_index), len)?; - - if let (Some(from), Some(to)) = (from_index, to_index) { - let stride = stride.map(|s| s.value(row_index)); - // Default stride is 1 if not provided - let stride = stride.unwrap_or(1); - if stride.is_zero() { - return exec_err!( - "array_slice got invalid stride: {:?}, it cannot be 0", - stride - ); - } else if (from < to && stride.is_negative()) - || (from > to && stride.is_positive()) - { - // return empty array - offsets.push(offsets[row_index]); - continue; + let slice_plan = compute_slice_plan::( + len, + from_array.value(row_index), + to_array.value(row_index), + stride.map(|s| s.value(row_index)), + )?; + + match slice_plan { + SlicePlan::Empty => offsets.push(offsets[row_index]), + SlicePlan::Contiguous { + start: rel_start, + len: slice_len, + } => { + let start_index = (start + rel_start).to_usize().unwrap(); + let end_index = (start + rel_start + slice_len).to_usize().unwrap(); + mutable.extend(0, start_index, end_index); + offsets.push(offsets[row_index] + slice_len); } - - let stride: O = stride.try_into().map_err(|_| { - internal_datafusion_err!("array_slice got invalid stride: {}", stride) - })?; - - if from <= to && stride > O::zero() { - assert!(start + to <= end); - if stride.eq(&O::one()) { - // stride is default to 1 - mutable.extend( - 0, - (start + from).to_usize().unwrap(), - (start + to + O::usize_as(1)).to_usize().unwrap(), - ); - offsets.push(offsets[row_index] + (to - from + O::usize_as(1))); - continue; - } - let mut index = start + from; - let mut cnt = 0; - while index <= start + to { - mutable.extend( - 0, - index.to_usize().unwrap(), - index.to_usize().unwrap() + 1, - ); - index += stride; - cnt += 1; + SlicePlan::Indices(indices) => { + let count = indices.len(); + for rel_index in indices { + let absolute_index = (start + rel_index).to_usize().unwrap(); + mutable.extend(0, absolute_index, absolute_index + 1); } - offsets.push(offsets[row_index] + O::usize_as(cnt)); - } else { - let mut index = start + from; - let mut cnt = 0; - while index >= start + to { - mutable.extend( - 0, - index.to_usize().unwrap(), - index.to_usize().unwrap() + 1, - ); - index += stride; - cnt += 1; - } - // invalid range, return empty array - offsets.push(offsets[row_index] + O::usize_as(cnt)); + offsets.push(offsets[row_index] + O::usize_as(count)); } - } else { - // invalid range, return empty array - offsets.push(offsets[row_index]); } } @@ -644,6 +693,107 @@ where )?)) } +fn general_list_view_array_slice( + array: &GenericListViewArray, + from_array: &Int64Array, + to_array: &Int64Array, + stride: Option<&Int64Array>, +) -> Result +where + i64: TryInto, +{ + let values = array.values(); + let original_data = values.to_data(); + let capacity = Capacities::Array(original_data.len()); + let field = match array.data_type() { + ListView(field) | LargeListView(field) => Arc::clone(field), + other => { + return internal_err!("array_slice got unexpected data type: {}", other); + } + }; + + let mut mutable = + MutableArrayData::with_capacities(vec![&original_data], true, capacity); + + // We must build `offsets` and `sizes` buffers manually as ListView does not enforce + // monotonically increasing offsets. + let mut offsets = Vec::with_capacity(array.len()); + let mut sizes = Vec::with_capacity(array.len()); + let mut current_offset = O::usize_as(0); + let mut null_builder = NullBufferBuilder::new(array.len()); + + for row_index in 0..array.len() { + // Propagate NULL semantics: any NULL input yields a NULL output slot. + if array.is_null(row_index) + || from_array.is_null(row_index) + || to_array.is_null(row_index) + || stride.is_some_and(|s| s.is_null(row_index)) + { + null_builder.append_null(); + offsets.push(current_offset); + sizes.push(O::usize_as(0)); + continue; + } + null_builder.append_non_null(); + + let len = array.value_size(row_index); + + // Empty arrays always return an empty array. + if len == O::usize_as(0) { + offsets.push(current_offset); + sizes.push(O::usize_as(0)); + continue; + } + + let slice_plan = compute_slice_plan::( + len, + from_array.value(row_index), + to_array.value(row_index), + stride.map(|s| s.value(row_index)), + )?; + + let start = array.value_offset(row_index); + match slice_plan { + SlicePlan::Empty => { + offsets.push(current_offset); + sizes.push(O::usize_as(0)); + } + SlicePlan::Contiguous { + start: rel_start, + len: slice_len, + } => { + let start_index = (start + rel_start).to_usize().unwrap(); + let end_index = (start + rel_start + slice_len).to_usize().unwrap(); + mutable.extend(0, start_index, end_index); + offsets.push(current_offset); + sizes.push(slice_len); + current_offset += slice_len; + } + SlicePlan::Indices(indices) => { + let count = indices.len(); + for rel_index in indices { + let absolute_index = (start + rel_index).to_usize().unwrap(); + mutable.extend(0, absolute_index, absolute_index + 1); + } + let length = O::usize_as(count); + offsets.push(current_offset); + sizes.push(length); + current_offset += length; + } + } + } + + let data = mutable.freeze(); + + Ok(Arc::new(GenericListViewArray::::try_new( + field, + ScalarBuffer::from(offsets), + ScalarBuffer::from(sizes), + arrow::array::make_array(data), + null_builder.finish(), + )?)) +} + #[user_doc( doc_section(label = "Array Functions"), description = "Returns the array without the first element.", @@ -977,12 +1127,28 @@ where #[cfg(test)] mod tests { - use super::array_element_udf; + use super::{array_element_udf, general_list_view_array_slice}; + use arrow::array::{ + cast::AsArray, Array, ArrayRef, GenericListViewArray, Int32Array, Int64Array, + ListViewArray, + }; + use arrow::buffer::ScalarBuffer; use arrow::datatypes::{DataType, Field}; - use datafusion_common::{Column, DFSchema}; + use datafusion_common::{Column, DFSchema, Result}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{Expr, ExprSchemable}; use std::collections::HashMap; + use std::sync::Arc; + + fn list_view_values(array: &GenericListViewArray) -> Vec> { + (0..array.len()) + .map(|i| { + let child = array.value(i); + let values = child.as_any().downcast_ref::().unwrap(); + values.iter().map(|v| v.unwrap()).collect() + }) + .collect() + } // Regression test for https://github.com/apache/datafusion/issues/13755 #[test] @@ -1028,4 +1194,164 @@ mod tests { fixed_size_list_type ); } + + #[test] + fn test_array_slice_list_view_basic() -> Result<()> { + let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let offsets = ScalarBuffer::from(vec![0, 3]); + let sizes = ScalarBuffer::from(vec![3, 2]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let array = ListViewArray::new(field, offsets, sizes, values, None); + + let from = Int64Array::from(vec![2, 1]); + let to = Int64Array::from(vec![3, 2]); + + let result = general_list_view_array_slice::( + &array, + &from, + &to, + None::<&Int64Array>, + )?; + let result = result.as_ref().as_list_view::(); + + assert_eq!(list_view_values(result), vec![vec![2, 3], vec![4, 5]]); + Ok(()) + } + + #[test] + fn test_array_slice_list_view_non_monotonic_offsets() -> Result<()> { + // First list references the tail of the values buffer, second list references the head. + let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let offsets = ScalarBuffer::from(vec![3, 0]); + let sizes = ScalarBuffer::from(vec![2, 3]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let array = ListViewArray::new(field, offsets, sizes, values, None); + + let from = Int64Array::from(vec![1, 1]); + let to = Int64Array::from(vec![2, 2]); + + let result = general_list_view_array_slice::( + &array, + &from, + &to, + None::<&Int64Array>, + )?; + let result = result.as_ref().as_list_view::(); + + assert_eq!(list_view_values(result), vec![vec![4, 5], vec![1, 2]]); + Ok(()) + } + + #[test] + fn test_array_slice_list_view_negative_stride() -> Result<()> { + let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let offsets = ScalarBuffer::from(vec![0, 3]); + let sizes = ScalarBuffer::from(vec![3, 2]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let array = ListViewArray::new(field, offsets, sizes, values, None); + + let from = Int64Array::from(vec![3, 2]); + let to = Int64Array::from(vec![1, 1]); + let stride = Int64Array::from(vec![-1, -1]); + + let result = + general_list_view_array_slice::(&array, &from, &to, Some(&stride))?; + let result = result.as_ref().as_list_view::(); + + assert_eq!(list_view_values(result), vec![vec![3, 2, 1], vec![5, 4]]); + Ok(()) + } + + #[test] + fn test_array_slice_list_view_out_of_order() -> Result<()> { + let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let offsets = ScalarBuffer::from(vec![3, 1, 0]); + let sizes = ScalarBuffer::from(vec![2, 2, 1]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let array = ListViewArray::new(field, offsets, sizes, values, None); + assert_eq!( + list_view_values(&array), + vec![vec![4, 5], vec![2, 3], vec![1]] + ); + + let from = Int64Array::from(vec![2, 2, 2]); + let to = Int64Array::from(vec![1, 1, 1]); + let stride = Int64Array::from(vec![-1, -1, -1]); + + let result = + general_list_view_array_slice::(&array, &from, &to, Some(&stride))?; + let result = result.as_ref().as_list_view::(); + + assert_eq!( + list_view_values(result), + vec![vec![5, 4], vec![3, 2], vec![]] + ); + Ok(()) + } + + #[test] + fn test_array_slice_list_view_with_nulls() -> Result<()> { + let values: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(3), + Some(4), + Some(5), + ])); + let offsets = ScalarBuffer::from(vec![0, 2, 5]); + let sizes = ScalarBuffer::from(vec![2, 3, 0]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let array = ListViewArray::new(field, offsets, sizes, values, None); + + let from = Int64Array::from(vec![1, 1, 1]); + let to = Int64Array::from(vec![2, 2, 1]); + + let result = general_list_view_array_slice::(&array, &from, &to, None)?; + let result = result.as_ref().as_list_view::(); + + let actual: Vec>> = (0..result.len()) + .map(|i| { + result + .value(i) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect() + }) + .collect(); + + assert_eq!( + actual, + vec![vec![Some(1), None], vec![Some(3), Some(4)], Vec::new(),] + ); + + // Test with NULL stride - should return NULL for rows with NULL stride + let stride_with_null = Int64Array::from(vec![Some(1), None, Some(1)]); + let result = general_list_view_array_slice::( + &array, + &from, + &to, + Some(&stride_with_null), + )?; + let result = result.as_ref().as_list_view::(); + + // First row: stride = 1, should return [1, None] + // Second row: stride = NULL, should return NULL + // Third row: stride = 1, empty array should return empty + assert!(!result.is_null(0)); // First row should not be null + assert!(result.is_null(1)); // Second row should be null (stride is NULL) + assert!(!result.is_null(2)); // Third row should not be null + + let first_row: Vec> = result + .value(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(); + assert_eq!(first_row, vec![Some(1), None]); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 00629c392df48..7aa267a4dc6d7 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -1943,6 +1943,19 @@ select array_slice(make_array(1, 2, 3, 4, 5), 5, 1, -2), array_slice(make_array( ---- [5, 3, 1] [o, l, h] +# Test NULL stride +query ?? +select array_slice(make_array(1, 2, 3, 4, 5), 1, 5, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 1, 5, NULL); +---- +NULL NULL + +# Test NULL stride +query ?? +select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 1, 5, NULL), + array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), 1, 5, NULL); +---- +NULL NULL + query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 2, 4), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), 1, 2); ---- @@ -1965,6 +1978,14 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, ---- [1, 2, 3, 4, 5] [h, e, l, l, o] +# TODO: Enable once arrow_cast supports ListView types. +# Expected output (once supported): +# ---- +# [1, 2, 3, 4, 5] [h, e, l, l, o] +query error DataFusion error: Execution error: Unsupported type 'ListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: ListView +select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'ListView(Int64)'), 0, 6), + array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'ListView(Utf8)'), 0, 5); + query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'FixedSizeList(5, Int64)'), 0, 6), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'FixedSizeList(5, Utf8)'), 0, 5); @@ -2004,6 +2025,14 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 2, ---- [2, 3, 4, 5] [l, l, o] +# TODO: Enable once arrow_cast supports LargeListView types. +# Expected output (once supported): +# ---- +# [2, 3, 4, 5] [l, l, o] +query error DataFusion error: Execution error: Unsupported type 'LargeListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: LargeListView +select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeListView(Int64)'), 2, 6), + array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeListView(Utf8)'), 3, 7); + # array_slice scalar function #6 (with positive indexes; nested array) query ? select array_slice(make_array(make_array(1, 2, 3, 4, 5), make_array(6, 7, 8, 9, 10)), 1, 1);