Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 107 additions & 29 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ use arrow::array::{
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type,
Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DateUnit,
DurationMicrosecondType as ArrowDurationMicrosecondType,
DurationMillisecondType as ArrowDurationMillisecondType,
DurationNanosecondType as ArrowDurationNanosecondType,
DurationSecondType as ArrowDurationSecondType, Field,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, Schema,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Expand All @@ -57,7 +57,9 @@ use arrow::util::bit_util;
use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
Expand Down Expand Up @@ -333,11 +335,23 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
};

// cast to Arrow type
// TODO: we need to check if it's fine for this to be fallible.
// My assumption is that we can't get to an illegal cast as we can only
// generate types that are supported, because we'd have gotten them from
// the metadata which was written to the Parquet sink
let array = arrow::compute::cast(&array, self.get_data_type())?;
// We make a strong assumption here that the casts should be infallible.
// If the cast fails because of incompatible datatypes, then there might
// be a bigger problem with how Arrow schemas are converted to Parquet.
//
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
// - date64: we should cast int32 to date32, then date32 to date64.
let target_type = self.get_data_type();
let array = match target_type {
ArrowType::Date64(_) => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32(DateUnit::Day))?;
arrow::compute::cast(&a, target_type)?
}
_ => arrow::compute::cast(&array, target_type)?,
};

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
Expand Down Expand Up @@ -1570,28 +1584,92 @@ impl<'a> ArrayReaderBuilder {
)?))
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
let byte_width = match *cur_type {
Type::PrimitiveType {
ref type_length, ..
} => *type_length,
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
if cur_type.get_basic_info().logical_type() == LogicalType::INTERVAL {
let byte_width = match *cur_type {
Type::PrimitiveType {
ref type_length, ..
} => *type_length,
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
if byte_width != 12 {
return Err(ArrowError(format!(
"Parquet interval type should have length of 12, found {}",
byte_width
)));
}
};
let converter = FixedLenBinaryConverter::new(
FixedSizeArrayConverter::new(byte_width),
);
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
FixedLenBinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
match arrow_type {
Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
let converter = IntervalDayTimeConverter::new(
IntervalDayTimeArrayConverter {},
);
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
IntervalDayTimeConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
let converter = IntervalYearMonthConverter::new(
IntervalYearMonthArrayConverter {},
);
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
IntervalYearMonthConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
Some(t) => Err(ArrowError(format!(
"Cannot write a Parquet interval to {:?}",
t
))),
None => {
// we do not support an interval not matched to an Arrow type,
// because we risk data loss as we won't know which of the 12 bytes
// are or should be populated
Err(ArrowError(
"Cannot write a Parquet interval with no Arrow type specified.
There is a risk of data loss as Arrow either supports YearMonth or
DayTime precision. Without the Arrow type, we cannot infer the type.
".to_string()
))
}
}
} else {
let byte_width = match *cur_type {
Type::PrimitiveType {
ref type_length, ..
} => *type_length,
_ => {
return Err(ArrowError(
"Expected a physical type, not a group type".to_string(),
))
}
};
let converter = FixedLenBinaryConverter::new(
FixedSizeArrayConverter::new(byte_width),
);
Ok(Box::new(ComplexObjectArrayReader::<
FixedLenByteArrayType,
FixedLenBinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ impl ParquetRecordBatchReader {
mod tests {
use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use crate::arrow::converter::{
Converter, FixedSizeArrayConverter, FromConverter, Utf8ArrayConverter,
Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter,
Utf8ArrayConverter,
};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
Expand All @@ -249,9 +250,7 @@ mod tests {
use crate::schema::parser::parse_message_type;
use crate::schema::types::TypePtr;
use crate::util::test_common::{get_temp_filename, RandGen};
use arrow::array::{
Array, BooleanArray, FixedSizeBinaryArray, StringArray, StructArray,
};
use arrow::array::*;
use arrow::record_batch::RecordBatchReader;
use rand::RngCore;
use serde_json::json;
Expand Down Expand Up @@ -362,6 +361,23 @@ mod tests {
>(20, message_type, &converter);
}

// Exercises the shared single-column reader harness on a schema holding one
// REQUIRED 12-byte FIXED_LEN_BYTE_ARRAY leaf annotated as INTERVAL, converting
// the values with `IntervalDayTimeArrayConverter` into an `IntervalDayTimeArray`.
#[test]
fn test_interval_day_time_column_reader() {
let message_type = "
message test_schema {
REQUIRED FIXED_LEN_BYTE_ARRAY (12) leaf (INTERVAL);
}
";

let converter = IntervalDayTimeArrayConverter {};
// NOTE(review): the leading `12` presumably is the byte length handed to
// `RandFixedLenGen` and must match the `(12)` in the schema above; confirm
// against the signature of `run_single_column_reader_tests`.
run_single_column_reader_tests::<
FixedLenByteArrayType,
IntervalDayTimeArray,
IntervalDayTimeArrayConverter,
RandFixedLenGen,
>(12, message_type, &converter);
}

struct RandUtf8Gen {}

impl RandGen<ByteArrayType> for RandUtf8Gen {
Expand Down
105 changes: 94 additions & 11 deletions rust/parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;

use arrow::array as arrow_array;
use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
use arrow::datatypes::{DataType as ArrowDataType, DateUnit, IntervalUnit, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;

Expand Down Expand Up @@ -217,15 +217,20 @@ fn write_leaf(
let indices = filter_array_indices(&levels);
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64(_) = column.data_type() {
let array =
arrow::compute::cast(column, &ArrowDataType::Date32(DateUnit::Day))?;
Arc::new(arrow_array::Int32Array::from(array.data()))
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
// assigning values to make it easier to debug
let slice = get_numeric_array_slice::<Int32Type, _>(&array, &indices);
typed.write_batch(
slice.as_slice(),
get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
Expand Down Expand Up @@ -300,8 +305,43 @@ fn write_leaf(
}
_ => unreachable!("Currently unreachable because data type not supported"),
},
ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
unreachable!("Currently unreachable because data type not supported")
ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
let bytes = match column.data_type() {
ArrowDataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = column
.as_any()
.downcast_ref::<arrow_array::IntervalYearMonthArray>()
.unwrap();
get_interval_ym_array_slice(&array, &indices)
}
IntervalUnit::DayTime => {
let array = column
.as_any()
.downcast_ref::<arrow_array::IntervalDayTimeArray>()
.unwrap();
get_interval_dt_array_slice(&array, &indices)
}
},
ArrowDataType::FixedSizeBinary(_) => {
let array = column
.as_any()
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap();
get_fsb_array_slice(&array, &indices)
}
_ => {
return Err(ParquetError::NYI(
"Attempting to write an Arrow type that is not yet implemented"
.to_string(),
));
}
};
typed.write_batch(
bytes.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
}
};
Ok(written as i64)
Expand Down Expand Up @@ -358,6 +398,51 @@ fn get_bool_array_slice(
values
}

/// Encodes the selected `IntervalYearMonth` values as 12-byte fixed-length
/// byte arrays (three 4-byte fields: months, days, milliseconds).
///
/// For every position in `indices`, the little-endian bytes of the array value
/// are written first and then followed by 8 zero bytes, because an Arrow
/// YearMonth interval only carries the months component.
///
/// The returned vector has one entry per index, in the order of `indices`.
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    indices
        .iter()
        .map(|&idx| {
            let months = array.value(idx).to_le_bytes();
            let mut encoded = Vec::with_capacity(12);
            encoded.extend_from_slice(&months);
            // days and milliseconds are left as zero
            encoded.extend_from_slice(&[0u8; 8]);
            FixedLenByteArray::from(ByteArray::from(encoded))
        })
        .collect()
}

/// Encodes the selected `IntervalDayTime` values as 12-byte fixed-length
/// byte arrays (three 4-byte fields: months, days, milliseconds).
///
/// For every position in `indices`, 4 zero bytes are written first (an Arrow
/// DayTime interval has no months component), followed by the little-endian
/// bytes of the array value.
///
/// The returned vector has one entry per index, in the order of `indices`.
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    indices
        .iter()
        .map(|&idx| {
            let day_time = array.value(idx).to_le_bytes();
            // the months field is left as zero
            let mut encoded = vec![0u8; 4];
            encoded.extend_from_slice(&day_time);
            debug_assert_eq!(encoded.len(), 12);
            FixedLenByteArray::from(ByteArray::from(encoded))
        })
        .collect()
}

/// Copies the selected entries of a `FixedSizeBinaryArray` into owned
/// fixed-length byte arrays.
///
/// The returned vector has one entry per index, in the order of `indices`;
/// each entry holds a copy of the bytes returned by `array.value` for that index.
fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    indices
        .iter()
        .map(|&idx| {
            let bytes = array.value(idx).to_vec();
            FixedLenByteArray::from(ByteArray::from(bytes))
        })
        .collect()
}

/// Given a level's information, calculate the offsets required to index an array
/// correctly.
fn filter_array_indices(level: &LevelInfo) -> Vec<usize> {
Expand Down Expand Up @@ -955,10 +1040,10 @@ mod tests {
}

#[test]
#[ignore] // Date support isn't correct yet
fn date64_single_column() {
// Date64 must be a multiple of 86400000, see ARROW-10925
required_and_optional::<Date64Array, _>(
0..SMALL_SIZE as i64,
(0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
"date64_single_column",
);
}
Expand Down Expand Up @@ -1032,7 +1117,6 @@ mod tests {
}

#[test]
#[should_panic(expected = "Currently unreachable because data type not supported")]
fn interval_year_month_single_column() {
required_and_optional::<IntervalYearMonthArray, _>(
0..SMALL_SIZE as i32,
Expand All @@ -1041,7 +1125,6 @@ mod tests {
}

#[test]
#[should_panic(expected = "Currently unreachable because data type not supported")]
fn interval_day_time_single_column() {
required_and_optional::<IntervalDayTimeArray, _>(
0..SMALL_SIZE as i64,
Expand Down
Loading