diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 50f69fea5441..454f8455ffcd 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -39,7 +39,6 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 [dependencies]
 arrow-array = { workspace = true, optional = true }
 arrow-buffer = { workspace = true, optional = true }
-arrow-cast = { workspace = true, optional = true }
 arrow-csv = { workspace = true, optional = true }
 arrow-data = { workspace = true, optional = true }
 arrow-schema = { workspace = true, optional = true }
@@ -91,6 +90,7 @@ lz4_flex = { version = "0.12", default-features = false, features = ["std", "frame"] }
 zstd = { version = "0.13", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
+arrow-cast = { workspace = true }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }
 rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
 object_store = { version = "0.12.0", default-features = false, features = ["azure", "fs"] }
@@ -104,7 +104,7 @@ default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64",
 # Enable lz4
 lz4 = ["lz4_flex"]
 # Enable arrow reader/writer APIs
-arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
+arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable support for arrow canonical extension types
 arrow_canonical_extension_types = ["arrow-schema?/canonical_extension_types"]
 # Enable CLI tools
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index 7c5d03e0203c..362f10366111 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -21,17 +21,16 @@ use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::Type as PhysicalType;
 use crate::column::page::PageIterator;
 use crate::data_type::{DataType, Int96};
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::{
-    ArrayRef, BooleanArray, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
-    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
+    Array, ArrayRef, Date64Array, Decimal64Array, Decimal128Array, Decimal256Array, Int8Array,
+    Int16Array, Int32Array, Int64Array, PrimitiveArray, UInt8Array, UInt16Array,
+    builder::PrimitiveDictionaryBuilder, cast::AsArray, downcast_integer, make_array, types::*,
+};
+use arrow_array::{
     TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
-    builder::{
-        TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
-        TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
-    },
+    TimestampSecondArray, UInt32Array, UInt64Array,
 };
 use arrow_buffer::{BooleanBuffer, Buffer, i256};
 use arrow_data::ArrayDataBuilder;
@@ -63,37 +62,23 @@ impl IntoBuffer for Vec<bool> {
 
 impl IntoBuffer for Vec<Int96> {
     fn into_buffer(self, target_type: &ArrowType) -> Buffer {
+        let mut builder = Vec::with_capacity(self.len());
         match target_type {
             ArrowType::Timestamp(TimeUnit::Second, _) => {
-                let mut builder = TimestampSecondBufferBuilder::new(self.len());
-                for v in self {
-                    builder.append(v.to_seconds())
-                }
-                builder.finish()
+                builder.extend(self.iter().map(|x| x.to_seconds()));
             }
             ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
-                let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
-                for v in self {
-                    builder.append(v.to_millis())
-                }
-                builder.finish()
+                builder.extend(self.iter().map(|x| x.to_millis()));
             }
             ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
-                let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
-                for v in self {
-                    builder.append(v.to_micros())
-                }
-                builder.finish()
+                builder.extend(self.iter().map(|x| x.to_micros()));
             }
             ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
-                let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
-                for v in self {
-                    builder.append(v.to_nanos())
-                }
-                builder.finish()
+                builder.extend(self.iter().map(|x| x.to_nanos()));
             }
             _ => unreachable!("Invalid target_type for Int96."),
         }
+        Buffer::from_vec(builder)
    }
 }
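Side note on the `IntoBuffer` rewrite above: the four per-unit `BufferBuilder`s are replaced by a single `Vec<i64>` handed to `Buffer::from_vec`, which takes over the `Vec`'s allocation without copying. A minimal standalone sketch of that pattern, with made-up nanosecond values standing in for `Int96::to_nanos()` output:

```rust
use arrow_array::TimestampNanosecondArray;
use arrow_buffer::{Buffer, ScalarBuffer};

fn main() {
    // Illustrative values only; the reader gets these from Int96 conversion.
    let nanos: Vec<i64> = vec![1_000_000_000, 2_000_000_000];
    let len = nanos.len();

    // Buffer::from_vec reuses the Vec's allocation; no per-element append.
    let buffer = Buffer::from_vec(nanos);
    let array = TimestampNanosecondArray::new(ScalarBuffer::new(buffer, 0, len), None);
    assert_eq!(array.value(1), 2_000_000_000);
}
```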
@@ -168,45 +153,17 @@ where
         let target_type = &self.data_type;
         let arrow_data_type = match T::get_physical_type() {
             PhysicalType::BOOLEAN => ArrowType::Boolean,
-            PhysicalType::INT32 => {
-                match target_type {
-                    ArrowType::UInt32 => {
-                        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
-                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
-                        ArrowType::UInt32
-                    }
-                    ArrowType::Decimal32(_, _) => target_type.clone(),
-                    _ => ArrowType::Int32,
-                }
-            }
-            PhysicalType::INT64 => {
-                match target_type {
-                    ArrowType::UInt64 => {
-                        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
-                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
-                        ArrowType::UInt64
-                    }
-                    ArrowType::Decimal64(_, _) => target_type.clone(),
-                    _ => ArrowType::Int64,
-                }
-            }
+            PhysicalType::INT32 => ArrowType::Int32,
+            PhysicalType::INT64 => ArrowType::Int64,
             PhysicalType::FLOAT => ArrowType::Float32,
             PhysicalType::DOUBLE => ArrowType::Float64,
-            PhysicalType::INT96 => match target_type {
-                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
-                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
-                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
-                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
-                _ => unreachable!("INT96 must be a timestamp."),
-            },
+            PhysicalType::INT96 => ArrowType::Int64,
             PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                 unreachable!("PrimitiveArrayReaders don't support complex physical types");
             }
         };
 
-        // Convert to arrays by using the Parquet physical type.
-        // The physical types are then cast to Arrow types if necessary
-
+        // Convert to the arrow type equivalent of the parquet physical type
         let record_data = self
             .record_reader
             .consume_record_data()
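The deleted `UInt32`/`UInt64` arms above carried the comment about the C++-style overflow/reinterpret cast; that behaviour now lives in `coerce_i32`/`coerce_i64` further down. A standalone sketch of what the mapping means in practice, with illustrative values:

```rust
use arrow_array::{Array, Int32Array, UInt32Array};

fn main() {
    let ints = Int32Array::from(vec![-1, i32::MIN, 7]);

    // Reuse the i32 buffer as u32: bit patterns are preserved, so negative
    // values wrap into the upper half of the u32 range.
    let uints = UInt32Array::new(ints.values().inner().clone().into(), ints.nulls().cloned());
    assert_eq!(uints.values().as_ref(), &[u32::MAX, 2_147_483_648, 7]);
}
```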
@@ -218,271 +175,10 @@ where
             .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let array_data = unsafe { array_data.build_unchecked() };
-        let array: ArrayRef = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
-            PhysicalType::INT32 => match array_data.data_type() {
-                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
-                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
-                ArrowType::Decimal32(_, _) => Arc::new(Decimal32Array::from(array_data)),
-                _ => unreachable!(),
-            },
-            PhysicalType::INT64 => match array_data.data_type() {
-                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
-                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
-                ArrowType::Decimal64(_, _) => Arc::new(Decimal64Array::from(array_data)),
-                _ => unreachable!(),
-            },
-            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
-            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
-            PhysicalType::INT96 => match target_type {
-                ArrowType::Timestamp(TimeUnit::Second, _) => {
-                    Arc::new(TimestampSecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
-                    Arc::new(TimestampMillisecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
-                    Arc::new(TimestampMicrosecondArray::from(array_data))
-                }
-                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
-                    Arc::new(TimestampNanosecondArray::from(array_data))
-                }
-                _ => unreachable!("INT96 must be a timestamp."),
-            },
+        let array: ArrayRef = make_array(array_data);
 
-            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!("PrimitiveArrayReaders don't support complex physical types");
-            }
-        };
-
-        // cast to Arrow type
-        // We make a strong assumption here that the casts should be infallible.
-        // If the cast fails because of incompatible datatypes, then there might
-        // be a bigger problem with how Arrow schemas are converted to Parquet.
-        //
-        // As there is not always a 1:1 mapping between Arrow and Parquet, there
-        // are datatypes which we must convert explicitly.
-        // These are:
-        // - date64: cast int32 to date32, then date32 to date64.
-        // - decimal: cast int32 to decimal, int64 to decimal
-        let array = match target_type {
-            // Using `arrow_cast::cast` has been found to be very slow for converting
-            // INT32 physical type to lower bitwidth logical types. Since rust casts
-            // are infallible, instead use `unary` which is much faster (by up to 40%).
-            // One consequence of this approach is that some malformed integer columns
-            // will return (an arguably correct) result rather than null.
-            // See https://github.com/apache/arrow-rs/issues/7040 for a discussion of this
-            // issue.
-            ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => {
-                let array: UInt8Array = array
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .unwrap()
-                    .unary(|i| i as u8);
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => {
-                let array: Int8Array = array
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .unwrap()
-                    .unary(|i| i as i8);
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => {
-                let array: UInt16Array = array
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .unwrap()
-                    .unary(|i| i as u16);
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => {
-                let array: Int16Array = array
-                    .as_any()
-                    .downcast_ref::<Int32Array>()
-                    .unwrap()
-                    .unary(|i| i as i16);
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
-                // this is cheap as it internally reinterprets the data
-                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
-                arrow_cast::cast(&a, target_type)?
-            }
-            ArrowType::Decimal64(p, s) if *(array.data_type()) == ArrowType::Int32 => {
-                // Apply conversion to all elements regardless of null slots as the conversion
-                // to `i64` is infallible. This improves performance by avoiding a branch in
-                // the inner loop (see docs for `PrimitiveArray::unary`).
-                let array = match array.data_type() {
-                    ArrowType::Int32 => array
-                        .as_any()
-                        .downcast_ref::<Int32Array>()
-                        .unwrap()
-                        .unary::<_, Decimal64Type>(|i| i as i64),
-                    _ => {
-                        return Err(arrow_err!(
-                            "Cannot convert {:?} to decimal",
-                            array.data_type()
-                        ));
-                    }
-                }
-                .with_precision_and_scale(*p, *s)?;
-
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Decimal128(p, s) => {
-                // See above comment. Conversion to `i128` is likewise infallible.
-                let array = match array.data_type() {
-                    ArrowType::Int32 => array
-                        .as_any()
-                        .downcast_ref::<Int32Array>()
-                        .unwrap()
-                        .unary::<_, Decimal128Type>(|i| i as i128),
-                    ArrowType::Int64 => array
-                        .as_any()
-                        .downcast_ref::<Int64Array>()
-                        .unwrap()
-                        .unary::<_, Decimal128Type>(|i| i as i128),
-                    _ => {
-                        return Err(arrow_err!(
-                            "Cannot convert {:?} to decimal",
-                            array.data_type()
-                        ));
-                    }
-                }
-                .with_precision_and_scale(*p, *s)?;
-
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Decimal256(p, s) => {
-                // See above comment. Conversion to `i256` is likewise infallible.
-                let array = match array.data_type() {
-                    ArrowType::Int32 => array
-                        .as_any()
-                        .downcast_ref::<Int32Array>()
-                        .unwrap()
-                        .unary::<_, Decimal256Type>(|i| i256::from_i128(i as i128)),
-                    ArrowType::Int64 => array
-                        .as_any()
-                        .downcast_ref::<Int64Array>()
-                        .unwrap()
-                        .unary::<_, Decimal256Type>(|i| i256::from_i128(i as i128)),
-                    _ => {
-                        return Err(arrow_err!(
-                            "Cannot convert {:?} to decimal",
-                            array.data_type()
-                        ));
-                    }
-                }
-                .with_precision_and_scale(*p, *s)?;
-
-                Arc::new(array) as ArrayRef
-            }
-            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
-                ArrowType::Decimal32(p, s) => {
-                    let array = match array.data_type() {
-                        ArrowType::Int32 => array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .unary::<_, Decimal32Type>(|i| i),
-                        _ => {
-                            return Err(arrow_err!(
-                                "Cannot convert {:?} to decimal dictionary",
-                                array.data_type()
-                            ));
-                        }
-                    }
-                    .with_precision_and_scale(*p, *s)?;
-
-                    arrow_cast::cast(&array, target_type)?
-                }
-                ArrowType::Decimal64(p, s) => {
-                    let array = match array.data_type() {
-                        ArrowType::Int32 => array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .unary::<_, Decimal64Type>(|i| i as i64),
-                        ArrowType::Int64 => array
-                            .as_any()
-                            .downcast_ref::<Int64Array>()
-                            .unwrap()
-                            .unary::<_, Decimal64Type>(|i| i),
-                        _ => {
-                            return Err(arrow_err!(
-                                "Cannot convert {:?} to decimal dictionary",
-                                array.data_type()
-                            ));
-                        }
-                    }
-                    .with_precision_and_scale(*p, *s)?;
-
-                    arrow_cast::cast(&array, target_type)?
-                }
-                ArrowType::Decimal128(p, s) => {
-                    let array = match array.data_type() {
-                        ArrowType::Int32 => array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .unary::<_, Decimal128Type>(|i| i as i128),
-                        ArrowType::Int64 => array
-                            .as_any()
-                            .downcast_ref::<Int64Array>()
-                            .unwrap()
-                            .unary::<_, Decimal128Type>(|i| i as i128),
-                        _ => {
-                            return Err(arrow_err!(
-                                "Cannot convert {:?} to decimal dictionary",
-                                array.data_type()
-                            ));
-                        }
-                    }
-                    .with_precision_and_scale(*p, *s)?;
-
-                    arrow_cast::cast(&array, target_type)?
-                }
-                ArrowType::Decimal256(p, s) => {
-                    let array = match array.data_type() {
-                        ArrowType::Int32 => array
-                            .as_any()
-                            .downcast_ref::<Int32Array>()
-                            .unwrap()
-                            .unary::<_, Decimal256Type>(i256::from),
-                        ArrowType::Int64 => array
-                            .as_any()
-                            .downcast_ref::<Int64Array>()
-                            .unwrap()
-                            .unary::<_, Decimal256Type>(i256::from),
-                        _ => {
-                            return Err(arrow_err!(
-                                "Cannot convert {:?} to decimal dictionary",
-                                array.data_type()
-                            ));
-                        }
-                    }
-                    .with_precision_and_scale(*p, *s)?;
-
-                    arrow_cast::cast(&array, target_type)?
-                }
-                _ => arrow_cast::cast(&array, target_type)?,
-            },
-            _ => arrow_cast::cast(&array, target_type)?,
-        };
+        // Coerce the arrow type to the desired array type
+        let array = coerce_array(array, target_type)?;
 
         // save definition and repetition buffers
         self.def_levels_buffer = self.record_reader.consume_def_levels();
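With the per-type dispatch above gone, `make_array` builds whichever arrow array matches the physical type, and `coerce_array` (added below) applies the logical type, mostly via zero-copy reinterpretation. A minimal sketch of that second step, using Date32 as an arbitrary example:

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::{Date32Type, Int32Type};
use arrow_array::{ArrayRef, Int32Array};

fn main() {
    // The reader first materializes the parquet physical type (INT32 here)...
    let physical: ArrayRef = Arc::new(Int32Array::from(vec![0, 19_000]));

    // ...then reinterprets it as the hinted logical type without a cast kernel.
    let dates = physical
        .as_primitive::<Int32Type>()
        .reinterpret_cast::<Date32Type>();
    assert_eq!(dates.value(1), 19_000); // days since the unix epoch
}
```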
@@ -504,6 +200,220 @@ where
     }
 }
 
+/// Coerce the parquet physical type array to the target type
+///
+/// This should match the logic in schema::primitive::apply_hint
+fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
+    if let ArrowType::Dictionary(key_type, value_type) = target_type {
+        let dictionary = pack_dictionary(key_type, array.as_ref())?;
+        let any_dictionary = dictionary.as_any_dictionary();
+
+        let coerced_values =
+            coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
+
+        return Ok(any_dictionary.with_values(coerced_values));
+    }
+
+    match array.data_type() {
+        ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
+        ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
+        ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
+        _ => unreachable!(),
+    }
+}
+
+fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::UInt8 => {
+            let array: UInt8Array = array.unary(|i| i as u8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int8 => {
+            let array: Int8Array = array.unary(|i| i as i8);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::UInt16 => {
+            let array: UInt16Array = array.unary(|i| i as u16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int16 => {
+            let array: Int16Array = array.unary(|i| i as i16);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Int32 => Arc::new(array.clone()),
+        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+        ArrowType::UInt32 => Arc::new(UInt32Array::new(
+            array.values().inner().clone().into(),
+            array.nulls().cloned(),
+        )) as ArrayRef,
+        ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
+        ArrowType::Date64 => {
+            let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Second) => {
+            Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
+        }
+        ArrowType::Time32(TimeUnit::Millisecond) => {
+            Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
+        }
+        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
+            TimeUnit::Second => {
+                let array: TimestampSecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Millisecond => {
+                let array: TimestampMillisecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Microsecond => {
+                let array: TimestampMicrosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Nanosecond => {
+                let array: TimestampNanosecondArray = array
+                    .unary(|x| x as i64)
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+        },
+        ArrowType::Decimal32(p, s) => {
+            let array = array
+                .reinterpret_cast::<Decimal32Type>()
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal64(p, s) => {
+            let array: Decimal64Array =
+                array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal128(p, s) => {
+            let array: Decimal128Array = array
+                .unary(|i| i as i128)
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        ArrowType::Decimal256(p, s) => {
+            let array: Decimal256Array = array
+                .unary(|i| i256::from_i128(i as i128))
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as ArrayRef
+        }
+        _ => unreachable!("Cannot coerce i32 to {target_type}"),
+    })
+}
+
+fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
+    Ok(match target_type {
+        ArrowType::Int64 => Arc::new(array.clone()) as _,
+        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+        ArrowType::UInt64 => Arc::new(UInt64Array::new(
+            array.values().inner().clone().into(),
+            array.nulls().cloned(),
+        )) as ArrayRef,
+        ArrowType::Date64 => Arc::new(array.reinterpret_cast::<Date64Type>()) as _,
+        ArrowType::Time64(TimeUnit::Microsecond) => {
+            Arc::new(array.reinterpret_cast::<Time64MicrosecondType>()) as _
+        }
+        ArrowType::Time64(TimeUnit::Nanosecond) => {
+            Arc::new(array.reinterpret_cast::<Time64NanosecondType>()) as _
+        }
+        ArrowType::Duration(unit) => match unit {
+            TimeUnit::Second => Arc::new(array.reinterpret_cast::<DurationSecondType>()) as _,
+            TimeUnit::Millisecond => {
+                Arc::new(array.reinterpret_cast::<DurationMillisecondType>()) as _
+            }
+            TimeUnit::Microsecond => {
+                Arc::new(array.reinterpret_cast::<DurationMicrosecondType>()) as _
+            }
+            TimeUnit::Nanosecond => {
+                Arc::new(array.reinterpret_cast::<DurationNanosecondType>()) as _
+            }
+        },
+        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
+            TimeUnit::Second => {
+                let array = array
+                    .reinterpret_cast::<TimestampSecondType>()
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Millisecond => {
+                let array = array
+                    .reinterpret_cast::<TimestampMillisecondType>()
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Microsecond => {
+                let array = array
+                    .reinterpret_cast::<TimestampMicrosecondType>()
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+            TimeUnit::Nanosecond => {
+                let array = array
+                    .reinterpret_cast::<TimestampNanosecondType>()
+                    .with_timezone_opt(timezone.clone());
+                Arc::new(array) as _
+            }
+        },
+        ArrowType::Decimal64(p, s) => {
+            let array = array
+                .reinterpret_cast::<Decimal64Type>()
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as _
+        }
+        ArrowType::Decimal128(p, s) => {
+            let array: Decimal128Array = array
+                .unary(|i| i as i128)
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as _
+        }
+        ArrowType::Decimal256(p, s) => {
+            let array: Decimal256Array = array
+                .unary(|i| i256::from_i128(i as i128))
+                .with_precision_and_scale(*p, *s)?;
+            Arc::new(array) as _
+        }
+        _ => unreachable!("Cannot coerce i64 to {target_type}"),
+    })
+}
+
+macro_rules! pack_dictionary_helper {
+    ($t:ty, $values:ident) => {
+        match $values.data_type() {
+            ArrowType::Int32 => pack_dictionary_impl::<$t, Int32Type>($values.as_primitive()),
+            ArrowType::Int64 => pack_dictionary_impl::<$t, Int64Type>($values.as_primitive()),
+            ArrowType::Float32 => pack_dictionary_impl::<$t, Float32Type>($values.as_primitive()),
+            ArrowType::Float64 => pack_dictionary_impl::<$t, Float64Type>($values.as_primitive()),
+            _ => unreachable!("Invalid physical type"),
+        }
+    };
+}
+
+fn pack_dictionary(key: &ArrowType, values: &dyn Array) -> Result<ArrayRef> {
+    downcast_integer! {
+        key => (pack_dictionary_helper, values),
+        _ => unreachable!("Invalid key type"),
+    }
+}
+
+fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
+    values: &PrimitiveArray<V>,
+) -> Result<ArrayRef> {
+    let mut builder = PrimitiveDictionaryBuilder::<K, V>::with_capacity(1024, values.len());
+    builder.extend(values);
+    Ok(Arc::new(builder.finish()))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
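`pack_dictionary_impl` above re-encodes the flat physical-type array through a `PrimitiveDictionaryBuilder` rather than going through `arrow_cast::cast`. A standalone sketch of the same shape of work (key type, values, and capacities are illustrative):

```rust
use arrow_array::builder::PrimitiveDictionaryBuilder;
use arrow_array::types::{Int8Type, Int32Type};
use arrow_array::{Array, Int32Array};

fn main() {
    let values = Int32Array::from(vec![Some(10), None, Some(10), Some(20)]);

    let mut builder =
        PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::with_capacity(values.len(), 1024);
    builder.extend(&values);
    let dict = builder.finish();

    assert_eq!(dict.len(), 4);
    // Repeated values share one dictionary entry; the null is a null key.
    assert_eq!(dict.values().len(), 2);
}
```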
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index a6cd2006782f..e5c5500d638d 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -26,8 +26,10 @@ use std::vec::IntoIter;
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::*;
-use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
-use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
+use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
+use arrow_schema::{
+    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
+};
 
 use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
 
@@ -819,7 +821,15 @@ impl ArrowColumnWriter {
     pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
         match &mut self.writer {
             ArrowColumnWriterImpl::Column(c) => {
-                write_leaf(c, &col.0)?;
+                let leaf = col.0.array();
+                match leaf.as_any_dictionary_opt() {
+                    Some(dictionary) => {
+                        let materialized =
+                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
+                        write_leaf(c, &materialized, &col.0)?
+                    }
+                    None => write_leaf(c, leaf, &col.0)?,
+                };
             }
             ArrowColumnWriterImpl::ByteArray(c) => {
                 write_primitive(c, col.0.array().as_ref(), &col.0)?;
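The rewritten `write` above expands dictionary columns with arrow-select's `take` kernel before they reach `write_leaf`, so the leaf writer only ever sees flat arrays. A minimal standalone illustration of that expansion, with illustrative data:

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int8Type;
use arrow_array::{DictionaryArray, Int8Array, StringArray};

fn main() {
    let keys = Int8Array::from(vec![0, 0, 1]);
    let values = Arc::new(StringArray::from(vec!["a", "b"]));
    let dict = DictionaryArray::<Int8Type>::try_new(keys, values).unwrap();

    // Expand keys into plain values, as the new write() path does.
    let materialized = arrow_select::take::take(dict.values(), dict.keys(), None).unwrap();
    assert_eq!(materialized.as_string::<i32>().value(2), "b");
}
```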
@@ -1132,26 +1142,65 @@ impl ArrowColumnWriterFactory {
     }
 }
 
-fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
-    let column = levels.array().as_ref();
+fn write_leaf(
+    writer: &mut ColumnWriter<'_>,
+    column: &dyn arrow_array::Array,
+    levels: &ArrayLevels,
+) -> Result<usize> {
     let indices = levels.non_null_indices();
+
     match writer {
+        // Note: this should match the contents of arrow_to_parquet_type
         ColumnWriter::Int32ColumnWriter(typed) => {
             match column.data_type() {
-                ArrowDataType::Date64 => {
-                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
-                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
-                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
-
-                    let array = array.as_primitive::<Int32Type>();
+                ArrowDataType::Null => {
+                    let array = Int32Array::new_null(column.len());
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Int8 => {
+                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Int16 => {
+                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Int32 => {
+                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
+                }
+                ArrowDataType::UInt8 => {
+                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::UInt16 => {
+                    let array: Int32Array =
+                        column.as_primitive::<UInt16Type>().unary(|x| x as i32);
                     write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::UInt32 => {
-                    let values = column.as_primitive::<UInt32Type>().values();
                     // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                     // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
-                    let array = values.inner().typed_data::<i32>();
-                    write_primitive(typed, array, levels)
+                    let array = column.as_primitive::<UInt32Type>();
+                    write_primitive(typed, array.values().inner().typed_data(), levels)
+                }
+                ArrowDataType::Date32 => {
+                    let array = column.as_primitive::<Date32Type>();
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Time32(TimeUnit::Second) => {
+                    let array = column.as_primitive::<Time32SecondType>();
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Time32(TimeUnit::Millisecond) => {
+                    let array = column.as_primitive::<Time32MillisecondType>();
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Date64 => {
+                    // If the column is a Date64, truncate the milliseconds down to whole days
+                    let array: Int32Array = column
+                        .as_primitive::<Date64Type>()
+                        .unary(|x| (x / 86_400_000) as _);
+
+                    write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::Decimal32(_, _) => {
                     let array = column
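On the Date64 arm above: the two-step cast is replaced by a single `unary` that divides milliseconds into days. One thing to note is that `i64` division truncates toward zero, so (illustratively) a pre-epoch value like -1 ms lands on day 0:

```rust
use arrow_array::{Date64Array, Int32Array};

fn main() {
    const MS_PER_DAY: i64 = 86_400_000;
    // Date64 stores milliseconds since the unix epoch; parquet DATE stores days.
    let dates = Date64Array::from(vec![0, MS_PER_DAY + 1, 2 * MS_PER_DAY, -1]);

    // Same conversion as the Date64 arm above.
    let days: Int32Array = dates.unary(|x| (x / MS_PER_DAY) as i32);
    assert_eq!(days.values().as_ref(), &[0, 1, 2, 0]);
}
```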
@@ -1180,46 +1229,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
                         .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                     write_primitive(typed, array.values(), levels)
                 }
-                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
-                    ArrowDataType::Decimal32(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal32Type>()
-                            .unary::<_, Int32Type>(|v| v);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    ArrowDataType::Decimal64(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal64Type>()
-                            .unary::<_, Int32Type>(|v| v as i32);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    ArrowDataType::Decimal128(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal128Type>()
-                            .unary::<_, Int32Type>(|v| v as i32);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    ArrowDataType::Decimal256(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal256Type>()
-                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    _ => {
-                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
-                        let array = array.as_primitive::<Int32Type>();
-                        write_primitive(typed, array.values(), levels)
-                    }
-                },
-                _ => {
-                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
-                    let array = array.as_primitive::<Int32Type>();
-                    write_primitive(typed, array.values(), levels)
-                }
+                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
             }
         }
         ColumnWriter::BoolColumnWriter(typed) => {
@@ -1233,9 +1243,10 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
         ColumnWriter::Int64ColumnWriter(typed) => {
             match column.data_type() {
                 ArrowDataType::Date64 => {
-                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
-
-                    let array = array.as_primitive::<Int64Type>();
+                    let array = column
+                        .as_primitive::<Date64Type>()
+                        .reinterpret_cast::<Int64Type>();
 
                     write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::Int64 => {
@@ -1249,10 +1260,54 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
                     let array = values.inner().typed_data::<i64>();
                     write_primitive(typed, array, levels)
                 }
+                ArrowDataType::Time64(TimeUnit::Microsecond) => {
+                    let array = column.as_primitive::<Time64MicrosecondType>();
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
+                    let array = column.as_primitive::<Time64NanosecondType>();
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Timestamp(unit, _) => match unit {
+                    TimeUnit::Second => {
+                        let array = column.as_primitive::<TimestampSecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Millisecond => {
+                        let array = column.as_primitive::<TimestampMillisecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Microsecond => {
+                        let array = column.as_primitive::<TimestampMicrosecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Nanosecond => {
+                        let array = column.as_primitive::<TimestampNanosecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                },
+                ArrowDataType::Duration(unit) => match unit {
+                    TimeUnit::Second => {
+                        let array = column.as_primitive::<DurationSecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Millisecond => {
+                        let array = column.as_primitive::<DurationMillisecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Microsecond => {
+                        let array = column.as_primitive::<DurationMicrosecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                    TimeUnit::Nanosecond => {
+                        let array = column.as_primitive::<DurationNanosecondType>();
+                        write_primitive(typed, array.values(), levels)
+                    }
+                },
                 ArrowDataType::Decimal64(_, _) => {
                     let array = column
                         .as_primitive::<Decimal64Type>()
-                        .unary::<_, Int64Type>(|v| v);
+                        .reinterpret_cast::<Int64Type>();
                     write_primitive(typed, array.values(), levels)
                 }
                 ArrowDataType::Decimal128(_, _) => {
@@ -1269,39 +1324,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
                         .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                     write_primitive(typed, array.values(), levels)
                 }
-                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
-                    ArrowDataType::Decimal64(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal64Type>()
-                            .unary::<_, Int64Type>(|v| v);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    ArrowDataType::Decimal128(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal128Type>()
-                            .unary::<_, Int64Type>(|v| v as i64);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    ArrowDataType::Decimal256(_, _) => {
-                        let array = arrow_cast::cast(column, value_type)?;
-                        let array = array
-                            .as_primitive::<Decimal256Type>()
-                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
-                        write_primitive(typed, array.values(), levels)
-                    }
-                    _ => {
-                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
-                        let array = array.as_primitive::<Int64Type>();
-                        write_primitive(typed, array.values(), levels)
-                    }
-                },
-                _ => {
-                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
-                    let array = array.as_primitive::<Int64Type>();
-                    write_primitive(typed, array.values(), levels)
-                }
+                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
             }
         }
         ColumnWriter::Int96ColumnWriter(_typed) => {
@@ -1322,17 +1345,11 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
             ArrowDataType::Interval(interval_unit) => match interval_unit {
                 IntervalUnit::YearMonth => {
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<IntervalYearMonthArray>()
-                        .unwrap();
+                    let array = column.as_primitive::<IntervalYearMonthType>();
                     get_interval_ym_array_slice(array, indices)
                 }
                 IntervalUnit::DayTime => {
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<IntervalDayTimeArray>()
-                        .unwrap();
+                    let array = column.as_primitive::<IntervalDayTimeType>();
                    get_interval_dt_array_slice(array, indices)
                 }
                 _ => {
@@ -1342,10 +1359,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
                 }
             },
             ArrowDataType::FixedSizeBinary(_) => {
-                let array = column
-                    .as_any()
-                    .downcast_ref::<FixedSizeBinaryArray>()
-                    .unwrap();
+                let array = column.as_fixed_size_binary();
                 get_fsb_array_slice(array, indices)
             }
             ArrowDataType::Decimal32(_, _) => {
@@ -1361,10 +1375,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
             ArrowDataType::Decimal256(_, _) => {
-                let array = column
-                    .as_any()
-                    .downcast_ref::<Decimal256Array>()
-                    .unwrap();
+                let array = column.as_primitive::<Decimal256Type>();
                 get_decimal_256_array_slice(array, indices)
             }
             ArrowDataType::Float16 => {
@@ -1533,7 +1544,7 @@ mod tests {
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
     use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
-    use crate::schema::types::{ColumnPath, Type};
+    use crate::schema::types::ColumnPath;
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Schema};
     use arrow::error::Result as ArrowResult;
@@ -4203,68 +4214,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_arrow_writer_explicit_schema() {
-        // Write an int32 array using explicit int64 storage
-        let batch_schema = Arc::new(Schema::new(vec![Field::new(
-            "integers",
-            DataType::Int32,
-            true,
-        )]));
-        let parquet_schema = Type::group_type_builder("root")
-            .with_fields(vec![
-                Type::primitive_type_builder("integers", crate::basic::Type::INT64)
-                    .build()
-                    .unwrap()
-                    .into(),
-            ])
-            .build()
-            .unwrap();
-        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());
-
-        let batch = RecordBatch::try_new(
-            batch_schema.clone(),
-            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
-        )
-        .unwrap();
-
-        let explicit_schema_options =
-            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
-        let mut buf = Vec::with_capacity(1024);
-        let mut writer = ArrowWriter::try_new_with_options(
-            &mut buf,
-            batch_schema.clone(),
-            explicit_schema_options,
-        )
-        .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        let bytes = Bytes::from(buf);
-        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
-
-        let expected_schema = Arc::new(Schema::new(vec![Field::new(
-            "integers",
-            DataType::Int64,
-            true,
-        )]));
-        assert_eq!(reader_builder.schema(), &expected_schema);
-
-        let batches = reader_builder
-            .build()
-            .unwrap()
-            .collect::<Result<Vec<_>, ArrowError>>()
-            .unwrap();
-        assert_eq!(batches.len(), 1);
-
-        let expected_batch = RecordBatch::try_new(
-            expected_schema.clone(),
-            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
-        )
-        .unwrap();
-        assert_eq!(batches[0], expected_batch);
-    }
-
     #[test]
     fn mismatched_schemas() {
         let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 49328d8c9610..71fb18917d9a 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -18,7 +18,14 @@
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::ValuesBuffer;
 use crate::errors::{ParquetError, Result};
-use arrow_array::{Array, ArrayRef, OffsetSizeTrait, make_array};
+use arrow_array::{Array, GenericByteArray, downcast_integer};
+use arrow_array::{
+    ArrayRef, FixedSizeBinaryArray, OffsetSizeTrait,
+    builder::{FixedSizeBinaryDictionaryBuilder, GenericByteDictionaryBuilder},
+    cast::AsArray,
+    make_array,
+    types::{ArrowDictionaryKeyType, ByteArrayType},
+};
 use arrow_buffer::{ArrowNativeType, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::DataType as ArrowType;
@@ -158,7 +165,12 @@ impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait> DictionaryBuffer<K, V> {
             unreachable!()
         };
         let values = if let ArrowType::FixedSizeBinary(size) = **value_type {
-            arrow_cast::cast(&values, &ArrowType::FixedSizeBinary(size)).unwrap()
+            let binary = values.as_binary::<i32>();
+            Arc::new(FixedSizeBinaryArray::new(
+                size,
+                binary.values().clone(),
+                binary.nulls().cloned(),
+            )) as _
         } else {
             values
         };
@@ -177,17 +189,13 @@ impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait> DictionaryBuffer<K, V> {
                 Ok(make_array(data))
             }
             Self::Values { values } => {
-                let value_type = match data_type {
-                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                let (key_type, value_type) = match data_type {
+                    ArrowType::Dictionary(k, v) => (k, v.as_ref().clone()),
                     _ => unreachable!(),
                 };
 
-                // This will compute a new dictionary
-                let array =
-                    arrow_cast::cast(&values.into_array(null_buffer, value_type), data_type)
-                        .expect("cast should be infallible");
-
-                Ok(array)
+                let array = values.into_array(null_buffer, value_type);
+                pack_values(key_type, &array)
             }
         }
     }
@@ -213,6 +221,60 @@ impl<K: ScalarValue + ArrowNativeType, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
     }
 }
 
+macro_rules! dict_helper {
+    ($k:ty, $array:ident) => {
+        match $array.data_type() {
+            ArrowType::Utf8 => pack_values_impl::<$k, _>($array.as_string::<i32>()),
+            ArrowType::LargeUtf8 => pack_values_impl::<$k, _>($array.as_string::<i64>()),
+            ArrowType::Binary => pack_values_impl::<$k, _>($array.as_binary::<i32>()),
+            ArrowType::LargeBinary => pack_values_impl::<$k, _>($array.as_binary::<i64>()),
+            ArrowType::FixedSizeBinary(_) => {
+                pack_fixed_values_impl::<$k>($array.as_fixed_size_binary())
+            }
+            _ => unreachable!(),
+        }
+    };
+}
+
+fn pack_values(key_type: &ArrowType, values: &ArrayRef) -> Result<ArrayRef> {
+    downcast_integer! {
+        key_type => (dict_helper, values),
+        _ => unreachable!(),
+    }
+}
+
+fn pack_values_impl<K: ArrowDictionaryKeyType, T: ByteArrayType>(
+    array: &GenericByteArray<T>,
+) -> Result<ArrayRef> {
+    let mut builder =
+        GenericByteDictionaryBuilder::<K, T>::with_capacity(array.len(), 1024, 1024);
+    for x in array {
+        match x {
+            Some(x) => {
+                builder.append_value(x);
+            }
+            None => builder.append_null(),
+        }
+    }
+    let raw = builder.finish();
+    Ok(Arc::new(raw))
+}
+
+fn pack_fixed_values_impl<K: ArrowDictionaryKeyType>(
+    array: &FixedSizeBinaryArray,
+) -> Result<ArrayRef> {
+    let mut builder = FixedSizeBinaryDictionaryBuilder::<K>::with_capacity(
+        array.len(),
+        1024,
+        array.value_length(),
+    );
+    for x in array {
+        match x {
+            Some(x) => {
+                builder.append_value(x);
+            }
+            None => builder.append_null(),
+        }
+    }
+    let raw = builder.finish();
+    Ok(Arc::new(raw))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
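`pack_values_impl` above folds the decoded byte values into a dictionary builder keyed by the schema's key type, replacing the `arrow_cast::cast` call that previously recomputed the dictionary. A standalone sketch using the `StringDictionaryBuilder` alias (key type and values are illustrative):

```rust
use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::types::Int16Type;
use arrow_array::{Array, StringArray};

fn main() {
    let values = StringArray::from(vec![Some("a"), None, Some("a")]);

    let mut builder = StringDictionaryBuilder::<Int16Type>::new();
    builder.extend(values.iter());
    let dict = builder.finish();

    assert_eq!(dict.len(), 3);
    assert_eq!(dict.values().len(), 1); // "a" is stored once
}
```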