80 changes: 65 additions & 15 deletions parquet/src/arrow/array_reader/primitive_array.rs
@@ -23,11 +23,14 @@ use crate::column::page::PageIterator;
use crate::data_type::{DataType, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::Decimal256Array;
use arrow_array::{
builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
UInt64Array,
builder::{
TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
},
ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
};
use arrow_buffer::{i256, BooleanBuffer, Buffer};
use arrow_data::ArrayDataBuilder;
@@ -37,13 +40,13 @@ use std::sync::Arc;

/// Provides conversion from `Vec<T>` to `Buffer`
pub trait IntoBuffer {
fn into_buffer(self) -> Buffer;
fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}

macro_rules! native_buffer {
($($t:ty),*) => {
$(impl IntoBuffer for Vec<$t> {
fn into_buffer(self) -> Buffer {
fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
Buffer::from_vec(self)
}
})*
@@ -52,18 +55,44 @@ macro_rules! native_buffer {
native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);

impl IntoBuffer for Vec<bool> {
fn into_buffer(self) -> Buffer {
fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
BooleanBuffer::from_iter(self).into_inner()
}
}

impl IntoBuffer for Vec<Int96> {
fn into_buffer(self) -> Buffer {
let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
for v in self {
builder.append(v.to_nanos())
fn into_buffer(self, target_type: &ArrowType) -> Buffer {
match target_type {
ArrowType::Timestamp(TimeUnit::Second, _) => {
let mut builder = TimestampSecondBufferBuilder::new(self.len());
for v in self {
builder.append(v.to_seconds())
}
builder.finish()
}
ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
for v in self {
builder.append(v.to_millis())
}
builder.finish()
}
ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
for v in self {
builder.append(v.to_micros())
}
builder.finish()
}
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
for v in self {
builder.append(v.to_nanos())
}
builder.finish()
}
_ => unreachable!("Invalid target_type for Int96."),
}
builder.finish()
}
}

@@ -161,8 +190,11 @@ PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::FLOAT => ArrowType::Float32,
PhysicalType::DOUBLE => ArrowType::Float64,
PhysicalType::INT96 => match target_type {
ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
_ => unreachable!("INT96 must be timestamp nanosecond"),
_ => unreachable!("INT96 must be a timestamp."),
},
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!("PrimitiveArrayReaders don't support complex physical types");
@@ -172,7 +204,10 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let record_data = self.record_reader.consume_record_data().into_buffer();
let record_data = self
.record_reader
.consume_record_data()
.into_buffer(target_type);

let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
@@ -194,7 +229,22 @@ where
},
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
PhysicalType::INT96 => match target_type {
ArrowType::Timestamp(TimeUnit::Second, _) => {
Arc::new(TimestampSecondArray::from(array_data))
}
ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
Arc::new(TimestampMillisecondArray::from(array_data))
}
ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
Arc::new(TimestampMicrosecondArray::from(array_data))
}
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
Arc::new(TimestampNanosecondArray::from(array_data))
}
_ => unreachable!("INT96 must be a timestamp."),
},

PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
unreachable!("PrimitiveArrayReaders don't support complex physical types");
}
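With these changes the reader builds the timestamp buffer in whichever resolution the Arrow target type asks for, instead of always producing nanoseconds. A minimal sketch of how a caller might exercise this through the public reader API follows; the file name `int96.parquet`, the column name `ts`, and the expectation that the builder's `with_schema` hint accepts the coarser resolution (which is what the schema changes later in this PR enable) are assumptions for illustration, not part of this diff.

```rust
use std::{fs::File, sync::Arc};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file containing a single INT96 column named "ts".
    let file = File::open("int96.parquet")?;

    // Hint the reader to materialize millisecond timestamps instead of the
    // nanosecond default inferred for INT96.
    let hint = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    )]));

    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_schema(hint)?
        .build()?;

    for batch in reader {
        // Column 0 should now report Timestamp(Millisecond, None).
        println!("{:?}", batch?.column(0).data_type());
    }
    Ok(())
}
```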
79 changes: 69 additions & 10 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -1015,7 +1015,7 @@ mod tests {
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
FloatType, Int32Type, Int64Type, Int96Type,
FloatType, Int32Type, Int64Type, Int96, Int96Type,
};
use crate::errors::Result;
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
@@ -1517,17 +1517,76 @@
#[test]
fn test_int96_single_column_reader_test() {
let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2,
ConvertedType::NONE,
None,
|vals| {

type TypeHintAndConversionFunction =
(Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

let resolutions: Vec<TypeHintAndConversionFunction> = vec![
// Test without a specified ArrowType hint.
(None, |vals: &[Option<Int96>]| {
Arc::new(TimestampNanosecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_nanos())),
)) as _
},
encodings,
);
)) as ArrayRef
}),
// Test other TimeUnits as ArrowType hints.
(
Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
|vals: &[Option<Int96>]| {
Arc::new(TimestampSecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_seconds())),
)) as ArrayRef
},
),
(
Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
|vals: &[Option<Int96>]| {
Arc::new(TimestampMillisecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_millis())),
)) as ArrayRef
},
),
(
Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
|vals: &[Option<Int96>]| {
Arc::new(TimestampMicrosecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_micros())),
)) as ArrayRef
},
),
(
Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
|vals: &[Option<Int96>]| {
Arc::new(TimestampNanosecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_nanos())),
)) as ArrayRef
},
),
// Test another timezone with TimeUnit as ArrowType hints.
(
Some(ArrowDataType::Timestamp(
TimeUnit::Second,
Some(Arc::from("-05:00")),
)),
|vals: &[Option<Int96>]| {
Arc::new(
TimestampSecondArray::from_iter(
vals.iter().map(|x| x.map(|x| x.to_seconds())),
)
.with_timezone("-05:00"),
) as ArrayRef
},
),
];

resolutions.iter().for_each(|(arrow_type, converter)| {
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
2,
ConvertedType::NONE,
arrow_type.clone(),
converter,
encodings,
);
})
}

struct RandUtf8Gen {}
17 changes: 16 additions & 1 deletion parquet/src/arrow/schema/primitive.rs
@@ -50,9 +50,24 @@ fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
// Coerce Date32 back to Date64 (#1666)
(DataType::Date32, DataType::Date64) => hint,

// Determine timezone
// Timestamps of the same resolution can be converted to a different timezone.
(DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,

// INT96 default to Timestamp(TimeUnit::Nanosecond, None) (see from_parquet below).
// Allow different resolutions to support larger date ranges.
(
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Second, _),
) => hint,
(
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Millisecond, _),
) => hint,
(
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Microsecond, _),
) => hint,

// Determine offset size
(DataType::Utf8, DataType::LargeUtf8) => hint,
(DataType::Binary, DataType::LargeBinary) => hint,
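The new arms above exist because an i64 count of nanoseconds since the epoch can only represent dates between roughly 1677 and 2262, while INT96 values in existing files can fall well outside that window; allowing a coarser hint lets such files be read without the wrap-around documented on the conversion methods. A rough, illustration-only calculation of the representable ranges:

```rust
fn main() {
    // Approximate half-range, in years, representable by an i64 count of
    // each unit since the Unix epoch. Nanoseconds cover roughly 1677-2262;
    // each coarser unit multiplies the range by 1_000.
    for (unit, per_second) in [
        ("seconds", 1f64),
        ("milliseconds", 1e3),
        ("microseconds", 1e6),
        ("nanoseconds", 1e9),
    ] {
        let years = i64::MAX as f64 / per_second / 86_400.0 / 365.25;
        println!("{unit:>12}: about +/- {years:.0} years");
    }
}
```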
78 changes: 63 additions & 15 deletions parquet/src/data_type.rs
@@ -38,6 +38,24 @@ pub struct Int96 {
value: [u32; 3],
}

const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Number of seconds in a day
const SECONDS_IN_DAY: i64 = 86_400;
/// Number of milliseconds in a second
const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in a second
const MICROSECONDS: i64 = 1_000_000;
/// Number of nanoseconds in a second
const NANOSECONDS: i64 = 1_000_000_000;

/// Number of milliseconds in a day
const MILLISECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MILLISECONDS;
/// Number of microseconds in a day
const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
/// Number of nanoseconds in a day
const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;

impl Int96 {
/// Creates new INT96 type struct with no data set.
pub fn new() -> Self {
@@ -57,30 +75,60 @@ impl Int96 {
}

/// Converts this INT96 into an i64 representing the number of MILLISECONDS since Epoch
#[deprecated(since = "54.0.0", note = "Use `to_millis` instead")]
pub fn to_i64(&self) -> i64 {
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
seconds * 1_000 + nanoseconds / 1_000_000
self.to_millis()
}

/// Converts this INT96 into an i64 representing the number of SECONDS since EPOCH
///
/// Will wrap around on overflow
#[inline]
pub fn to_seconds(&self) -> i64 {
let (day, nanos) = self.data_as_days_and_nanos();
(day as i64 - JULIAN_DAY_OF_EPOCH)
.wrapping_mul(SECONDS_IN_DAY)
.wrapping_add(nanos / 1_000_000_000)
}

/// Converts this INT96 into an i64 representing the number of MILLISECONDS since EPOCH
///
/// Will wrap around on overflow
#[inline]
pub fn to_millis(&self) -> i64 {
let (day, nanos) = self.data_as_days_and_nanos();
(day as i64 - JULIAN_DAY_OF_EPOCH)
.wrapping_mul(MILLISECONDS_IN_DAY)
.wrapping_add(nanos / 1_000_000)
}

/// Converts this INT96 into an i64 representing the number of MICROSECONDS since EPOCH
///
/// Will wrap around on overflow
#[inline]
pub fn to_micros(&self) -> i64 {
let (day, nanos) = self.data_as_days_and_nanos();
(day as i64 - JULIAN_DAY_OF_EPOCH)
.wrapping_mul(MICROSECONDS_IN_DAY)
.wrapping_add(nanos / 1_000)
}

/// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
///
/// Will wrap around on overflow
#[inline]
pub fn to_nanos(&self) -> i64 {
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
seconds
.wrapping_mul(1_000_000_000)
.wrapping_add(nanoseconds)
let (day, nanos) = self.data_as_days_and_nanos();
(day as i64 - JULIAN_DAY_OF_EPOCH)
.wrapping_mul(NANOSECONDS_IN_DAY)
.wrapping_add(nanos)
}

/// Converts this INT96 to a number of seconds and nanoseconds since EPOCH
pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const SECONDS_PER_DAY: i64 = 86_400;

let day = self.data()[2] as i64;
let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
(seconds, nanoseconds)
#[inline]
fn data_as_days_and_nanos(&self) -> (i32, i64) {
let day = self.data()[2] as i32;
let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
(day, nanos)
}
}

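All four conversions above follow the same shape: take the Julian day from `data()[2]`, subtract the epoch's Julian day, scale to the target unit with a wrapping multiply, then add the truncated intra-day nanoseconds. A small worked example, assuming the `set_data(low_nanos, high_nanos, julian_day)` layout implied by `data_as_days_and_nanos`:

```rust
use parquet::data_type::Int96;

fn main() {
    // Julian day 2_440_589 is one day after the Unix epoch (2_440_588), and
    // the low/high u32 pair encodes 1_000_000_000 ns, i.e. one second into
    // that day.
    let mut v = Int96::new();
    v.set_data(1_000_000_000, 0, 2_440_589);

    assert_eq!(v.to_seconds(), 86_401);
    assert_eq!(v.to_millis(), 86_401_000);
    assert_eq!(v.to_micros(), 86_401_000_000);
    assert_eq!(v.to_nanos(), 86_401_000_000_000);

    // Days far enough from the epoch overflow the i64 nanosecond
    // representation and wrap, while the coarser units remain exact; that is
    // the motivation for the schema hints above.
}
```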
2 changes: 1 addition & 1 deletion parquet/src/record/api.rs
@@ -701,7 +701,7 @@ impl Field {
/// `Timestamp` value.
#[inline]
pub fn convert_int96(_descr: &ColumnDescPtr, value: Int96) -> Self {
Field::TimestampMillis(value.to_i64())
Field::TimestampMillis(value.to_millis())
Review comment from a Contributor:

I think this should be to_nanos() to preserve the old behavior.

But then again it doesn't make sense to return a nanosecond timestamp for a value with millisecond precision 🤔

Reply from @mbutrovich (Contributor, Author), Mar 17, 2025:

> I think this should be to_nanos() to preserve the old behavior

The old behavior is actually to convert it to millis.

Current behavior for convert_int96 has it call to_i64, which converts to millis, so I tried to keep the behavior the same.

}

/// Converts Parquet FLOAT type with logical type into `f32` value.
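As the review exchange above notes, the record API keeps its historical behavior: `Field::convert_int96` still produces a millisecond timestamp, it just reaches it through `to_millis` instead of the now-deprecated `to_i64`. A quick, illustration-only check of that equivalence, using the same assumed `set_data` layout as the earlier sketch:

```rust
use parquet::data_type::Int96;

fn main() {
    let mut v = Int96::new();
    // Midnight, one day after the Unix epoch.
    v.set_data(0, 0, 2_440_589);

    #[allow(deprecated)]
    let old = v.to_i64();

    // The deprecated path and the new one agree, so Field::convert_int96
    // keeps producing TimestampMillis values as before.
    assert_eq!(old, v.to_millis());
    assert_eq!(old, 86_400_000);
}
```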