Skip to content

Commit

Permalink
Minor: add parquet page stats for float{16, 32, 64}
Browse files Browse the repository at this point in the history
  • Loading branch information
tmi committed Jun 18, 2024
1 parent a2c9d1a commit c582054
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 8 deletions.
100 changes: 92 additions & 8 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use half::f16;
use parquet::data_type::FixedLenByteArray;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use parquet::file::page_index::index::Index;
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
Expand Down Expand Up @@ -495,7 +496,7 @@ macro_rules! get_statistics {
}

macro_rules! make_data_page_stats_iterator {
($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
($iterator_type: ident, $func: expr, $index_type: path, $stat_value_type: ty) => {
struct $iterator_type<'a, I>
where
I: Iterator<Item = (usize, &'a Index)>,
Expand Down Expand Up @@ -526,7 +527,7 @@ macro_rules! make_data_page_stats_iterator {
native_index
.indexes
.iter()
.map(|x| x.$func)
.map(|x| $func(x))
.collect::<Vec<_>>(),
),
// No matching `Index` found;
Expand All @@ -548,11 +549,66 @@ macro_rules! make_data_page_stats_iterator {
};
}

make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32, i32);
make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);

// Per-page min/max statistics iterators built from the parquet page index,
// one Min/Max pair per supported physical type. The second argument is a
// closure extracting the desired statistic from each `PageIndex<T>`.
//
// Integer statistics: `i32` / `i64` page indexes.
make_data_page_stats_iterator!(
    MinInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.min },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MaxInt32DataPageStatsIterator,
    |x: &PageIndex<i32>| { x.max },
    Index::INT32,
    i32
);
make_data_page_stats_iterator!(
    MinInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.min },
    Index::INT64,
    i64
);
make_data_page_stats_iterator!(
    MaxInt64DataPageStatsIterator,
    |x: &PageIndex<i64>| { x.max },
    Index::INT64,
    i64
);
// Float16 statistics: parquet stores Float16 as FIXED_LEN_BYTE_ARRAY, so the
// raw bytes are cloned here (`FixedLenByteArray` is not `Copy`) and decoded
// to `f16` by the consumer.
make_data_page_stats_iterator!(
    MinFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
make_data_page_stats_iterator!(
    MaxFloat16DataPageStatsIterator,
    |x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
    Index::FIXED_LEN_BYTE_ARRAY,
    FixedLenByteArray
);
// Float32/Float64 statistics: native `f32` / `f64` page indexes.
make_data_page_stats_iterator!(
    MinFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.min },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MaxFloat32DataPageStatsIterator,
    |x: &PageIndex<f32>| { x.max },
    Index::FLOAT,
    f32
);
make_data_page_stats_iterator!(
    MinFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.min },
    Index::DOUBLE,
    f64
);
make_data_page_stats_iterator!(
    MaxFloat64DataPageStatsIterator,
    |x: &PageIndex<f64>| { x.max },
    Index::DOUBLE,
    f64
);
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
Expand Down Expand Up @@ -581,6 +637,19 @@ macro_rules! get_data_page_statistics {
)),
Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float16) => Ok(Arc::new(
Float16Array::from_iter(
[<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| Some(from_bytes_to_f16(x.data())))
})
})
.flatten()
)
)),
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
}
}
Expand Down Expand Up @@ -677,6 +746,21 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::FLOAT(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::DOUBLE(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
_ => unimplemented!(),
});

Expand Down
88 changes: 88 additions & 0 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,94 @@ async fn test_int_8() {
.run();
}

#[tokio::test]
async fn test_float_16() {
    // Builds a parquet file with a single Float16 column named "f",
    // written with 5 rows per row group (Scenario::Float16).
    let reader = TestReader {
        scenario: Scenario::Float16,
        row_per_group: 5,
    }
    .build()
    .await;

    // Per-row-group minimums: [-5, -4, 0, 5]
    let expected_mins = Float16Array::from(vec![
        f16::from_f32(-5.),
        f16::from_f32(-4.),
        f16::from_f32(-0.),
        f16::from_f32(5.),
    ]);
    // Per-row-group maximums: [-1, 0, 4, 9]
    let expected_maxes = Float16Array::from(vec![
        f16::from_f32(-1.),
        f16::from_f32(0.),
        f16::from_f32(4.),
        f16::from_f32(9.),
    ]);

    Test {
        reader: &reader,
        expected_min: Arc::new(expected_mins),
        expected_max: Arc::new(expected_maxes),
        // No nulls in any row group.
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
        // Each of the 4 row groups holds 5 rows.
        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
        column_name: "f",
        check: Check::Both,
    }
    .run();
}

#[tokio::test]
async fn test_float_32() {
    // Builds a parquet file with a single Float32 column named "f",
    // written with 5 rows per row group (Scenario::Float32).
    let reader = TestReader {
        scenario: Scenario::Float32,
        row_per_group: 5,
    }
    .build()
    .await;

    // Per-row-group minimums: [-5, -4, 0, 5]
    let expected_mins = Float32Array::from(vec![-5., -4., -0., 5.0]);
    // Per-row-group maximums: [-1, 0, 4, 9]
    let expected_maxes = Float32Array::from(vec![-1., 0., 4., 9.]);

    Test {
        reader: &reader,
        expected_min: Arc::new(expected_mins),
        expected_max: Arc::new(expected_maxes),
        // No nulls in any row group.
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
        // Each of the 4 row groups holds 5 rows.
        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
        column_name: "f",
        check: Check::Both,
    }
    .run();
}

#[tokio::test]
async fn test_float_64() {
    // This creates a parquet file of 1 column named f
    let reader = TestReader {
        scenario: Scenario::Float64,
        row_per_group: 5,
    }
    .build()
    .await;

    Test {
        reader: &reader,
        // mins are [-5, -4, 0, 5]
        expected_min: Arc::new(Float64Array::from(vec![-5., -4., -0., 5.0])),
        // maxes are [-1, 0, 4, 9]
        expected_max: Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])),
        // nulls are [0, 0, 0, 0]
        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
        // row counts are [5, 5, 5, 5]
        // Wrapped in Some(...) to match the Option-typed field used by the
        // sibling float tests (test_float_16 / test_float_32 above).
        expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])),
        column_name: "f",
        check: Check::Both,
    }
    .run();
}

// timestamp
#[tokio::test]
async fn test_timestamp() {
Expand Down
15 changes: 15 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ enum Scenario {
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
Float16,
Float32,
Float64,
Decimal,
Decimal256,
Expand Down Expand Up @@ -586,6 +587,12 @@ fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Builds a single-column `RecordBatch` with one nullable `Float32` field
/// named "f" holding the values in `v`.
fn make_f32_batch(v: Vec<f32>) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32, true)]));
    let array = Arc::new(Float32Array::from(v)) as ArrayRef;
    // `array` is not used again, so move it into the batch instead of the
    // redundant `array.clone()` (clippy::redundant_clone).
    RecordBatch::try_new(schema, vec![array]).unwrap()
}

fn make_f16_batch(v: Vec<f16>) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, true)]));
let array = Arc::new(Float16Array::from(v)) as ArrayRef;
Expand Down Expand Up @@ -1003,6 +1010,14 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
),
]
}
Scenario::Float32 => {
vec![
make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down

0 comments on commit c582054

Please sign in to comment.