diff --git a/parquet-variant-compute/src/shred_variant.rs b/parquet-variant-compute/src/shred_variant.rs index 45e7fc95c9f9..7f253d249dfb 100644 --- a/parquet-variant-compute/src/shred_variant.rs +++ b/parquet-variant-compute/src/shred_variant.rs @@ -19,19 +19,17 @@ use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder}; use crate::variant_to_arrow::{ - PrimitiveVariantToArrowRowBuilder, make_primitive_variant_to_arrow_row_builder, + ArrayVariantToArrowRowBuilder, PrimitiveVariantToArrowRowBuilder, + make_primitive_variant_to_arrow_row_builder, }; use crate::{VariantArray, VariantValueArrayBuilder}; -use arrow::array::{ - ArrayRef, BinaryViewArray, GenericListArray, GenericListViewArray, NullBufferBuilder, - OffsetSizeTrait, -}; -use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder}; +use arrow::buffer::NullBuffer; use arrow::compute::CastOptions; -use arrow::datatypes::{ArrowNativeTypeOp, DataType, Field, FieldRef, Fields, TimeUnit}; +use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit}; use arrow::error::{ArrowError, Result}; use indexmap::IndexMap; -use parquet_variant::{Variant, VariantBuilderExt, VariantList, VariantPath, VariantPathElement}; +use parquet_variant::{Variant, VariantBuilderExt, VariantPath, VariantPathElement}; use std::collections::BTreeMap; use std::sync::Arc; @@ -123,7 +121,8 @@ pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>( DataType::List(_) | DataType::LargeList(_) | DataType::ListView(_) - | DataType::LargeListView(_) => { + | DataType::LargeListView(_) + | DataType::FixedSizeList(..) => { let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new( data_type, cast_options, @@ -131,11 +130,6 @@ pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>( )?; VariantToShreddedVariantRowBuilder::Array(typed_value_builder) } - DataType::FixedSizeList(..) => { - return Err(ArrowError::NotYetImplemented( - "Shredding variant array values as fixed-size lists".to_string(), - )); - } // Supported shredded primitive types, see Variant shredding spec: // https://github.com/apache/parquet-format/blob/master/VariantShredding.md#shredded-value-types DataType::Boolean @@ -312,171 +306,6 @@ impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> { } } -enum ArrayVariantToArrowRowBuilder<'a> { - List(VariantToListArrowRowBuilder<'a, i32, false>), - LargeList(VariantToListArrowRowBuilder<'a, i64, false>), - ListView(VariantToListArrowRowBuilder<'a, i32, true>), - LargeListView(VariantToListArrowRowBuilder<'a, i64, true>), -} - -impl<'a> ArrayVariantToArrowRowBuilder<'a> { - fn try_new( - data_type: &'a DataType, - cast_options: &'a CastOptions, - capacity: usize, - ) -> Result { - use ArrayVariantToArrowRowBuilder::*; - - // Make List/ListView builders without repeating the constructor boilerplate. - macro_rules! make_list_builder { - ($variant:ident, $offset:ty, $is_view:expr, $field:ident) => { - $variant(VariantToListArrowRowBuilder::<$offset, $is_view>::try_new( - $field.clone(), - $field.data_type(), - cast_options, - capacity, - )?) - }; - } - - let builder = match data_type { - DataType::List(field) => make_list_builder!(List, i32, false, field), - DataType::LargeList(field) => make_list_builder!(LargeList, i64, false, field), - DataType::ListView(field) => make_list_builder!(ListView, i32, true, field), - DataType::LargeListView(field) => make_list_builder!(LargeListView, i64, true, field), - other => { - return Err(ArrowError::InvalidArgumentError(format!( - "Casting to {other:?} is not applicable for array Variant types" - ))); - } - }; - Ok(builder) - } - - fn append_null(&mut self) { - match self { - Self::List(builder) => builder.append_null(), - Self::LargeList(builder) => builder.append_null(), - Self::ListView(builder) => builder.append_null(), - Self::LargeListView(builder) => builder.append_null(), - } - } - - fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { - match self { - Self::List(builder) => builder.append_value(list), - Self::LargeList(builder) => builder.append_value(list), - Self::ListView(builder) => builder.append_value(list), - Self::LargeListView(builder) => builder.append_value(list), - } - } - - fn finish(self) -> Result { - match self { - Self::List(builder) => builder.finish(), - Self::LargeList(builder) => builder.finish(), - Self::ListView(builder) => builder.finish(), - Self::LargeListView(builder) => builder.finish(), - } - } -} - -struct VariantToListArrowRowBuilder<'a, O, const IS_VIEW: bool> -where - O: OffsetSizeTrait + ArrowNativeTypeOp, -{ - field: FieldRef, - offsets: Vec, - element_builder: Box>, - nulls: NullBufferBuilder, - current_offset: O, -} - -impl<'a, O, const IS_VIEW: bool> VariantToListArrowRowBuilder<'a, O, IS_VIEW> -where - O: OffsetSizeTrait + ArrowNativeTypeOp, -{ - fn try_new( - field: FieldRef, - element_data_type: &'a DataType, - cast_options: &'a CastOptions, - capacity: usize, - ) -> Result { - if capacity >= isize::MAX as usize { - return Err(ArrowError::ComputeError( - "Capacity exceeds isize::MAX when reserving list offsets".to_string(), - )); - } - let mut offsets = Vec::with_capacity(capacity + 1); - offsets.push(O::ZERO); - let element_builder = make_variant_to_shredded_variant_arrow_row_builder( - element_data_type, - cast_options, - capacity, - false, - )?; - Ok(Self { - field, - offsets, - element_builder: Box::new(element_builder), - nulls: NullBufferBuilder::new(capacity), - current_offset: O::ZERO, - }) - } - - fn append_null(&mut self) { - self.offsets.push(self.current_offset); - self.nulls.append_null(); - } - - fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { - for element in list.iter() { - self.element_builder.append_value(element)?; - self.current_offset = self.current_offset.add_checked(O::ONE)?; - } - self.offsets.push(self.current_offset); - self.nulls.append_non_null(); - Ok(()) - } - - fn finish(mut self) -> Result { - let (value, typed_value, nulls) = self.element_builder.finish()?; - let element_array = - ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls); - let field = Arc::new( - self.field - .as_ref() - .clone() - .with_data_type(element_array.data_type().clone()), - ); - - if IS_VIEW { - // NOTE: `offsets` is never empty (constructor pushes an entry) - let mut sizes = Vec::with_capacity(self.offsets.len() - 1); - for i in 1..self.offsets.len() { - sizes.push(self.offsets[i] - self.offsets[i - 1]); - } - self.offsets.pop(); - let list_view_array = GenericListViewArray::::new( - field, - ScalarBuffer::from(self.offsets), - ScalarBuffer::from(sizes), - ArrayRef::from(element_array), - self.nulls.finish(), - ); - Ok(Arc::new(list_view_array)) - } else { - let list_array = GenericListArray::::new( - field, - OffsetBuffer::::new(ScalarBuffer::from(self.offsets)), - ArrayRef::from(element_array), - self.nulls.finish(), - ); - Ok(Arc::new(list_array)) - } - } -} - pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> { value_builder: VariantValueArrayBuilder, typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>, @@ -1513,6 +1342,22 @@ mod tests { ); } + #[test] + fn test_array_shredding_as_fixed_size_list() { + let input = build_variant_array(vec![VariantRow::List(vec![ + VariantValue::from(1i64), + VariantValue::from(2i64), + VariantValue::from(3i64), + ])]); + let list_schema = + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 2); + let err = shred_variant(&input, &list_schema).unwrap_err(); + assert_eq!( + err.to_string(), + "Not yet implemented: Converting unshredded variant arrays to arrow fixed-size lists" + ); + } + #[test] fn test_array_shredding_with_array_elements() { let input = build_variant_array(vec![ diff --git a/parquet-variant-compute/src/variant_to_arrow.rs b/parquet-variant-compute/src/variant_to_arrow.rs index 57d9944bb527..172bd4811bc3 100644 --- a/parquet-variant-compute/src/variant_to_arrow.rs +++ b/parquet-variant-compute/src/variant_to_arrow.rs @@ -15,23 +15,117 @@ // specific language governing permissions and limitations // under the License. +use crate::shred_variant::{ + VariantToShreddedVariantRowBuilder, make_variant_to_shredded_variant_arrow_row_builder, +}; +use crate::type_conversion::{ + PrimitiveFromVariant, TimestampFromVariant, variant_to_unscaled_decimal, +}; +use crate::variant_array::ShreddedVariantFieldArray; +use crate::{VariantArray, VariantValueArrayBuilder}; use arrow::array::{ - ArrayRef, BinaryBuilder, BinaryLikeArrayBuilder, BinaryViewArray, BinaryViewBuilder, - BooleanBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder, LargeStringBuilder, NullArray, - NullBufferBuilder, PrimitiveBuilder, StringBuilder, StringLikeArrayBuilder, StringViewBuilder, + ArrayRef, ArrowNativeTypeOp, BinaryBuilder, BinaryLikeArrayBuilder, BinaryViewArray, + BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, GenericListArray, + GenericListViewArray, LargeBinaryBuilder, LargeStringBuilder, NullArray, NullBufferBuilder, + OffsetSizeTrait, PrimitiveBuilder, StringBuilder, StringLikeArrayBuilder, StringViewBuilder, }; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::compute::{CastOptions, DecimalCast}; use arrow::datatypes::{self, DataType, DecimalType}; use arrow::error::{ArrowError, Result}; -use parquet_variant::{Variant, VariantPath}; +use arrow_schema::{FieldRef, TimeUnit}; +use parquet_variant::{Variant, VariantList, VariantPath}; +use std::sync::Arc; -use crate::type_conversion::{ - PrimitiveFromVariant, TimestampFromVariant, variant_to_unscaled_decimal, -}; -use crate::{VariantArray, VariantValueArrayBuilder}; +/// Builder for converting variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +pub(crate) enum VariantToArrowRowBuilder<'a> { + Primitive(PrimitiveVariantToArrowRowBuilder<'a>), + BinaryVariant(VariantToBinaryVariantArrowRowBuilder), -use arrow_schema::TimeUnit; -use std::sync::Arc; + // Path extraction wrapper - contains a boxed enum for any of the above + WithPath(VariantPathRowBuilder<'a>), +} + +impl<'a> VariantToArrowRowBuilder<'a> { + pub fn append_null(&mut self) -> Result<()> { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.append_null(), + BinaryVariant(b) => b.append_null(), + WithPath(path_builder) => path_builder.append_null(), + } + } + + pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.append_value(&value), + BinaryVariant(b) => b.append_value(value), + WithPath(path_builder) => path_builder.append_value(value), + } + } + + pub fn finish(self) -> Result { + use VariantToArrowRowBuilder::*; + match self { + Primitive(b) => b.finish(), + BinaryVariant(b) => b.finish(), + WithPath(path_builder) => path_builder.finish(), + } + } +} + +pub(crate) fn make_variant_to_arrow_row_builder<'a>( + metadata: &BinaryViewArray, + path: VariantPath<'a>, + data_type: Option<&'a DataType>, + cast_options: &'a CastOptions, + capacity: usize, +) -> Result> { + use VariantToArrowRowBuilder::*; + + let mut builder = match data_type { + // If no data type was requested, build an unshredded VariantArray. + None => BinaryVariant(VariantToBinaryVariantArrowRowBuilder::new( + metadata.clone(), + capacity, + )), + Some(DataType::Struct(_)) => { + return Err(ArrowError::NotYetImplemented( + "Converting unshredded variant objects to arrow structs".to_string(), + )); + } + Some( + DataType::List(_) + | DataType::LargeList(_) + | DataType::ListView(_) + | DataType::LargeListView(_) + | DataType::FixedSizeList(..), + ) => { + return Err(ArrowError::NotYetImplemented( + "Converting unshredded variant arrays to arrow lists".to_string(), + )); + } + Some(data_type) => { + let builder = + make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?; + Primitive(builder) + } + }; + + // Wrap with path extraction if needed + if !path.is_empty() { + builder = WithPath(VariantPathRowBuilder { + builder: Box::new(builder), + path, + }) + }; + + Ok(builder) +} /// Builder for converting primitive variant values to Arrow arrays. It is used by both /// `VariantToArrowRowBuilder` (below) and `VariantToShreddedPrimitiveVariantRowBuilder` (in @@ -81,18 +175,6 @@ pub(crate) enum PrimitiveVariantToArrowRowBuilder<'a> { BinaryView(VariantToBinaryArrowRowBuilder<'a, BinaryViewBuilder>), } -/// Builder for converting variant values into strongly typed Arrow arrays. -/// -/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly -/// with casting of leaf values to specific types. -pub(crate) enum VariantToArrowRowBuilder<'a> { - Primitive(PrimitiveVariantToArrowRowBuilder<'a>), - BinaryVariant(VariantToBinaryVariantArrowRowBuilder), - - // Path extraction wrapper - contains a boxed enum for any of the above - WithPath(VariantPathRowBuilder<'a>), -} - impl<'a> PrimitiveVariantToArrowRowBuilder<'a> { pub fn append_null(&mut self) -> Result<()> { use PrimitiveVariantToArrowRowBuilder::*; @@ -227,35 +309,6 @@ impl<'a> PrimitiveVariantToArrowRowBuilder<'a> { } } -impl<'a> VariantToArrowRowBuilder<'a> { - pub fn append_null(&mut self) -> Result<()> { - use VariantToArrowRowBuilder::*; - match self { - Primitive(b) => b.append_null(), - BinaryVariant(b) => b.append_null(), - WithPath(path_builder) => path_builder.append_null(), - } - } - - pub fn append_value(&mut self, value: Variant<'_, '_>) -> Result { - use VariantToArrowRowBuilder::*; - match self { - Primitive(b) => b.append_value(&value), - BinaryVariant(b) => b.append_value(value), - WithPath(path_builder) => path_builder.append_value(value), - } - } - - pub fn finish(self) -> Result { - use VariantToArrowRowBuilder::*; - match self { - Primitive(b) => b.finish(), - BinaryVariant(b) => b.finish(), - WithPath(path_builder) => path_builder.finish(), - } - } -} - /// Creates a row builder that converts primitive `Variant` values into the requested Arrow data type. pub(crate) fn make_primitive_variant_to_arrow_row_builder<'a>( data_type: &'a DataType, @@ -427,53 +480,78 @@ pub(crate) fn make_primitive_variant_to_arrow_row_builder<'a>( Ok(builder) } -pub(crate) fn make_variant_to_arrow_row_builder<'a>( - metadata: &BinaryViewArray, - path: VariantPath<'a>, - data_type: Option<&'a DataType>, - cast_options: &'a CastOptions, - capacity: usize, -) -> Result> { - use VariantToArrowRowBuilder::*; +pub(crate) enum ArrayVariantToArrowRowBuilder<'a> { + List(VariantToListArrowRowBuilder<'a, i32, false>), + LargeList(VariantToListArrowRowBuilder<'a, i64, false>), + ListView(VariantToListArrowRowBuilder<'a, i32, true>), + LargeListView(VariantToListArrowRowBuilder<'a, i64, true>), +} - let mut builder = match data_type { - // If no data type was requested, build an unshredded VariantArray. - None => BinaryVariant(VariantToBinaryVariantArrowRowBuilder::new( - metadata.clone(), - capacity, - )), - Some(DataType::Struct(_)) => { - return Err(ArrowError::NotYetImplemented( - "Converting unshredded variant objects to arrow structs".to_string(), - )); - } - Some( - DataType::List(_) - | DataType::LargeList(_) - | DataType::ListView(_) - | DataType::LargeListView(_) - | DataType::FixedSizeList(..), - ) => { - return Err(ArrowError::NotYetImplemented( - "Converting unshredded variant arrays to arrow lists".to_string(), - )); +impl<'a> ArrayVariantToArrowRowBuilder<'a> { + pub(crate) fn try_new( + data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + use ArrayVariantToArrowRowBuilder::*; + + // Make List/ListView builders without repeating the constructor boilerplate. + macro_rules! make_list_builder { + ($variant:ident, $offset:ty, $is_view:expr, $field:ident) => { + $variant(VariantToListArrowRowBuilder::<$offset, $is_view>::try_new( + $field.clone(), + $field.data_type(), + cast_options, + capacity, + )?) + }; } - Some(data_type) => { - let builder = - make_primitive_variant_to_arrow_row_builder(data_type, cast_options, capacity)?; - Primitive(builder) + + let builder = match data_type { + DataType::List(field) => make_list_builder!(List, i32, false, field), + DataType::LargeList(field) => make_list_builder!(LargeList, i64, false, field), + DataType::ListView(field) => make_list_builder!(ListView, i32, true, field), + DataType::LargeListView(field) => make_list_builder!(LargeListView, i64, true, field), + DataType::FixedSizeList(..) => { + return Err(ArrowError::NotYetImplemented( + "Converting unshredded variant arrays to arrow fixed-size lists".to_string(), + )); + } + other => { + return Err(ArrowError::InvalidArgumentError(format!( + "Casting to {other:?} is not applicable for array Variant types" + ))); + } + }; + Ok(builder) + } + + pub(crate) fn append_null(&mut self) { + match self { + Self::List(builder) => builder.append_null(), + Self::LargeList(builder) => builder.append_null(), + Self::ListView(builder) => builder.append_null(), + Self::LargeListView(builder) => builder.append_null(), } - }; + } - // Wrap with path extraction if needed - if !path.is_empty() { - builder = WithPath(VariantPathRowBuilder { - builder: Box::new(builder), - path, - }) - }; + pub(crate) fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { + match self { + Self::List(builder) => builder.append_value(list), + Self::LargeList(builder) => builder.append_value(list), + Self::ListView(builder) => builder.append_value(list), + Self::LargeListView(builder) => builder.append_value(list), + } + } - Ok(builder) + pub(crate) fn finish(self) -> Result { + match self { + Self::List(builder) => builder.finish(), + Self::LargeList(builder) => builder.finish(), + Self::ListView(builder) => builder.finish(), + Self::LargeListView(builder) => builder.finish(), + } + } } /// A thin wrapper whose only job is to extract a specific path from a variant value and pass the @@ -708,6 +786,102 @@ impl<'a> VariantToUuidArrowRowBuilder<'a> { } } +pub(crate) struct VariantToListArrowRowBuilder<'a, O, const IS_VIEW: bool> +where + O: OffsetSizeTrait + ArrowNativeTypeOp, +{ + field: FieldRef, + offsets: Vec, + element_builder: Box>, + nulls: NullBufferBuilder, + current_offset: O, +} + +impl<'a, O, const IS_VIEW: bool> VariantToListArrowRowBuilder<'a, O, IS_VIEW> +where + O: OffsetSizeTrait + ArrowNativeTypeOp, +{ + fn try_new( + field: FieldRef, + element_data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + if capacity >= isize::MAX as usize { + return Err(ArrowError::ComputeError( + "Capacity exceeds isize::MAX when reserving list offsets".to_string(), + )); + } + let mut offsets = Vec::with_capacity(capacity + 1); + offsets.push(O::ZERO); + let element_builder = make_variant_to_shredded_variant_arrow_row_builder( + element_data_type, + cast_options, + capacity, + false, + )?; + Ok(Self { + field, + offsets, + element_builder: Box::new(element_builder), + nulls: NullBufferBuilder::new(capacity), + current_offset: O::ZERO, + }) + } + + fn append_null(&mut self) { + self.offsets.push(self.current_offset); + self.nulls.append_null(); + } + + fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { + for element in list.iter() { + self.element_builder.append_value(element)?; + self.current_offset = self.current_offset.add_checked(O::ONE)?; + } + self.offsets.push(self.current_offset); + self.nulls.append_non_null(); + Ok(()) + } + + fn finish(mut self) -> Result { + let (value, typed_value, nulls) = self.element_builder.finish()?; + let element_array = + ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls); + let field = Arc::new( + self.field + .as_ref() + .clone() + .with_data_type(element_array.data_type().clone()), + ); + + if IS_VIEW { + // NOTE: `offsets` is never empty (constructor pushes an entry) + let mut sizes = Vec::with_capacity(self.offsets.len() - 1); + for i in 1..self.offsets.len() { + sizes.push(self.offsets[i] - self.offsets[i - 1]); + } + self.offsets.pop(); + let list_view_array = GenericListViewArray::::new( + field, + ScalarBuffer::from(self.offsets), + ScalarBuffer::from(sizes), + ArrayRef::from(element_array), + self.nulls.finish(), + ); + Ok(Arc::new(list_view_array)) + } else { + let list_array = GenericListArray::::new( + field, + OffsetBuffer::::new(ScalarBuffer::from(self.offsets)), + ArrayRef::from(element_array), + self.nulls.finish(), + ); + Ok(Arc::new(list_array)) + } + } +} + /// Builder for creating VariantArray output (for path extraction without type conversion) pub(crate) struct VariantToBinaryVariantArrowRowBuilder { metadata: BinaryViewArray,