Skip to content

Commit

Permalink
Move dynamic array builder out of metadata_scan
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv committed Jan 2, 2025
1 parent b908544 commit cca84fc
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 200 deletions.
187 changes: 187 additions & 0 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);

get_parquet_stat_as_datum!(max);

/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
pub(crate) mod builder {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, Datum as ArrowDatum};
use arrow_schema::{DataType, TimeUnit};
use ordered_float::OrderedFloat;

use crate::spec::{Literal, PrimitiveLiteral};
use crate::{Error, ErrorKind};

/// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
/// compile-time when types are determined dynamically (e.g. based on some column type).
/// A [DataType] is given at construction time which is used to later downcast the inner array
/// and provided values.
pub(crate) struct AnyArrayBuilder {
data_type: DataType,
inner: Box<dyn ArrayBuilder>,
}

impl AnyArrayBuilder {
pub(crate) fn new(data_type: &DataType) -> Self {
Self {
data_type: data_type.clone(),
inner: make_builder(data_type, 0),
}
}

pub(crate) fn finish(&mut self) -> ArrayRef {
self.inner.finish()
}

/// Append an [[arrow_array::Datum]] value.
pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
let (array, is_scalar) = value.get();
assert!(is_scalar, "Can only append scalar datum");

match array.data_type() {
DataType::Boolean => self
.builder::<BooleanBuilder>()?
.append_value(array.as_boolean().value(0)),
DataType::Int32 => self
.builder::<Int32Builder>()?
.append_value(array.as_primitive::<Int32Type>().value(0)),
DataType::Int64 => self
.builder::<Int64Builder>()?
.append_value(array.as_primitive::<Int64Type>().value(0)),
DataType::Float32 => self
.builder::<Float32Builder>()?
.append_value(array.as_primitive::<Float32Type>().value(0)),
DataType::Float64 => self
.builder::<Float64Builder>()?
.append_value(array.as_primitive::<Float64Type>().value(0)),
DataType::Decimal128(_, _) => self
.builder::<Decimal128Builder>()?
.append_value(array.as_primitive::<Decimal128Type>().value(0)),
DataType::Date32 => self
.builder::<Date32Builder>()?
.append_value(array.as_primitive::<Date32Type>().value(0)),
DataType::Time64(TimeUnit::Microsecond) => self
.builder::<Time64MicrosecondBuilder>()?
.append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
DataType::Timestamp(TimeUnit::Microsecond, _) => self
.builder::<TimestampMicrosecondBuilder>()?
.append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
DataType::Timestamp(TimeUnit::Nanosecond, _) => self
.builder::<TimestampNanosecondBuilder>()?
.append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
DataType::Utf8 => self
.builder::<StringBuilder>()?
.append_value(array.as_string::<i32>().value(0)),
DataType::FixedSizeBinary(_) => self
.builder::<BinaryBuilder>()?
.append_value(array.as_fixed_size_binary().value(0)),
DataType::LargeBinary => self
.builder::<LargeBinaryBuilder>()?
.append_value(array.as_binary::<i64>().value(0)),
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Cannot append data type: {:?}", array.data_type(),),
));
}
}
Ok(())
}

/// Append a literal with the provided [DataType]. We're not solely relying on the literal to
/// infer the type because [Literal] values do not specify the expected type of builder. E.g.,
/// a [PrimitiveLiteral::Long] may go into an array builder for longs but also for timestamps.
pub(crate) fn append_literal(&mut self, value: &Literal) -> crate::Result<()> {
let Some(primitive) = value.as_primitive_literal() else {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Expected primitive type",
));
};

match (&self.data_type, primitive.clone()) {
(DataType::Boolean, PrimitiveLiteral::Boolean(value)) => {
self.builder::<BooleanBuilder>()?.append_value(value)
}
(DataType::Int32, PrimitiveLiteral::Int(value)) => {
self.builder::<Int32Builder>()?.append_value(value)
}
(DataType::Int64, PrimitiveLiteral::Long(value)) => {
self.builder::<Int64Builder>()?.append_value(value)
}
(DataType::Float32, PrimitiveLiteral::Float(OrderedFloat(value))) => {
self.builder::<Float32Builder>()?.append_value(value)
}
(DataType::Float64, PrimitiveLiteral::Double(OrderedFloat(value))) => {
self.builder::<Float64Builder>()?.append_value(value)
}
(DataType::Utf8, PrimitiveLiteral::String(value)) => {
self.builder::<StringBuilder>()?.append_value(value)
}
(DataType::FixedSizeBinary(_), PrimitiveLiteral::Binary(value)) => self
.builder::<FixedSizeBinaryBuilder>()?
.append_value(value)?,
(DataType::LargeBinary, PrimitiveLiteral::Binary(value)) => {
self.builder::<LargeBinaryBuilder>()?.append_value(value)
}
(_, _) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Builder of type {:?} does not accept literal {:?}",
self.data_type, primitive
),
));
}
}

Ok(())
}

/// Append a null value for the provided [DataType].
pub(crate) fn append_null(&mut self) -> crate::Result<()> {
match self.data_type {
DataType::Boolean => self.builder::<BooleanBuilder>()?.append_null(),
DataType::Int32 => self.builder::<Int32Builder>()?.append_null(),
DataType::Int64 => self.builder::<Int64Builder>()?.append_null(),
DataType::Float32 => self.builder::<Float32Builder>()?.append_null(),
DataType::Float64 => self.builder::<Float64Builder>()?.append_null(),
DataType::Decimal128(_, _) => self.builder::<Decimal128Builder>()?.append_null(),
DataType::Date32 => self.builder::<Date32Builder>()?.append_null(),
DataType::Time64(TimeUnit::Microsecond) => {
self.builder::<Time64MicrosecondBuilder>()?.append_null()
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
self.builder::<TimestampMicrosecondBuilder>()?.append_null()
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
self.builder::<TimestampNanosecondBuilder>()?.append_null()
}
DataType::Utf8 => self.builder::<StringBuilder>()?.append_null(),
DataType::FixedSizeBinary(_) => {
self.builder::<FixedSizeBinaryBuilder>()?.append_null()
}
DataType::LargeBinary => self.builder::<LargeBinaryBuilder>()?.append_null(),
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Cannot append null values for data type: {:?}",
self.data_type
),
))
}
}
Ok(())
}

/// Cast the `inner` builder to a specific type or return [Error].
fn builder<T: ArrayBuilder>(&mut self) -> crate::Result<&mut T> {
self.inner.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Failed to cast builder to expected type",
)
})
}
}
}

impl TryFrom<&ArrowSchema> for crate::spec::Schema {
type Error = Error;

Expand Down
Loading

0 comments on commit cca84fc

Please sign in to comment.