From f287035850adb340360a9121518f233cc6b9e503 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 20 May 2024 14:54:20 -0400 Subject: [PATCH] feat: API for collecting statistics/index for metadata of a parquet file + tests (#10537) * test: some tests to write data to a parquet file and read its metadata * feat: API to convert parquet stats to arrow stats * Refine statistics extraction API and tests * Implement null counts * port test * test: add more tests for the arrow statistics * chore: fix format and test output * chore: rename test helpers * chore: Apply suggestions from code review Co-authored-by: Andrew Lamb * Apply suggestions from code review * Apply suggestions from code review --------- Co-authored-by: Andrew Lamb --- .../datasource/physical_plan/parquet/mod.rs | 2 +- .../physical_plan/parquet/statistics.rs | 156 ++++- .../core/tests/parquet/arrow_statistics.rs | 657 ++++++++++++++++++ datafusion/core/tests/parquet/mod.rs | 1 + 4 files changed, 812 insertions(+), 4 deletions(-) create mode 100644 datafusion/core/tests/parquet/arrow_statistics.rs diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 7509d08ad88a..dd953878df49 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -72,6 +72,7 @@ mod statistics; pub use metrics::ParquetFileMetrics; pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +pub use statistics::{RequestedStatistics, StatisticsConverter}; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] @@ -482,7 +483,6 @@ struct ParquetOpener { impl FileOpener for ParquetOpener { fn open(&self, file_meta: FileMeta) -> Result { let file_range = file_meta.range.clone(); - let file_metrics = ParquetFileMetrics::new( self.partition_index, file_meta.location().as_ref(), diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 8972c261b14a..0ebf7dfe2384 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -20,11 +20,15 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 use arrow::{array::ArrayRef, datatypes::DataType}; -use arrow_array::new_empty_array; -use arrow_schema::{FieldRef, Schema}; -use datafusion_common::{Result, ScalarValue}; +use arrow_array::{new_empty_array, new_null_array, UInt64Array}; +use arrow_schema::{Field, FieldRef, Schema}; +use datafusion_common::{ + internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, +}; +use parquet::file::metadata::ParquetMetaData; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::schema::types::SchemaDescriptor; +use std::sync::Arc; // Convert the bytes array to i128. // The endian of the input bytes array must be big-endian. @@ -210,6 +214,152 @@ fn collect_scalars>>( } } +/// What type of statistics should be extracted? +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum RequestedStatistics { + /// Minimum Value + Min, + /// Maximum Value + Max, + /// Null Count, returned as a [`UInt64Array`]) + NullCount, +} + +/// Extracts Parquet statistics as Arrow arrays +/// +/// This is used to convert Parquet statistics to Arrow arrays, with proper type +/// conversions. 
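+///
+/// # Example
+///
+/// A minimal sketch of intended usage; `metadata` (a [`ParquetMetaData`]
+/// already read from a file), `arrow_schema`, and the column name `"col_a"`
+/// are illustrative placeholders:
+///
+/// ```ignore
+/// // one min value per row group, returned as an ArrayRef
+/// let min_values = StatisticsConverter::try_new(
+///     "col_a",
+///     RequestedStatistics::Min,
+///     &arrow_schema,
+/// )?
+/// .extract(&metadata)?;
+/// ```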
+///
+/// This information can be used for pruning parquet files or row
+/// groups based on the statistics embedded in parquet files.
+///
+/// # Schemas
+///
+/// The schema of the parquet file and the arrow schema are used to convert the
+/// underlying statistics value (stored as a parquet value) into the
+/// corresponding Arrow value. For example, Decimals are stored as binary in
+/// parquet files.
+///
+/// The parquet_schema and arrow_schema do not have to be identical (for
+/// example, the columns may be in different orders and one or the other schemas
+/// may have additional columns). The function [`parquet_column`] is used to
+/// match the column in the parquet file to the column in the arrow schema.
+///
+/// # Multiple parquet files
+///
+/// This API is designed to support efficiently extracting statistics from
+/// multiple parquet files (hence why the parquet schema is passed in as an
+/// argument). This is useful when building an index for a directory of parquet
+/// files.
+///
+#[derive(Debug)]
+pub struct StatisticsConverter<'a> {
+    /// The name of the column to extract statistics for
+    column_name: &'a str,
+    /// The type of statistics to extract
+    statistics_type: RequestedStatistics,
+    /// The arrow schema of the query
+    arrow_schema: &'a Schema,
+    /// The field (with data type) of the column in the arrow schema
+    arrow_field: &'a Field,
+}
+
+impl<'a> StatisticsConverter<'a> {
+    /// Returns a [`UInt64Array`] with row counts for each row group
+    ///
+    /// The returned array has no nulls, and has one value for each row group.
+    /// Each value is the number of rows in the row group.
+    pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
+        let row_groups = metadata.row_groups();
+        let mut builder = UInt64Array::builder(row_groups.len());
+        for row_group in row_groups {
+            let row_count = row_group.num_rows();
+            let row_count: u64 = row_count.try_into().map_err(|e| {
+                internal_datafusion_err!(
+                    "Parquet row count {row_count} too large to convert to u64: {e}"
+                )
+            })?;
+            builder.append_value(row_count);
+        }
+        Ok(builder.finish())
+    }
+
+    /// Create a new statistics converter
+    pub fn try_new(
+        column_name: &'a str,
+        statistics_type: RequestedStatistics,
+        arrow_schema: &'a Schema,
+    ) -> Result<Self> {
+        // ensure the requested column is in the arrow schema
+        let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
+            return plan_err!(
+                "Column '{}' not found in schema for statistics conversion",
+                column_name
+            );
+        };
+
+        Ok(Self {
+            column_name,
+            statistics_type,
+            arrow_schema,
+            arrow_field,
+        })
+    }
+
+    /// Extract the statistics from a parquet file, given the parquet file's metadata
+    ///
+    /// The returned array contains one value for each row group in the parquet
+    /// file, in order.
+    ///
+    /// Each value is either:
+    /// * the requested statistics type for the column
+    /// * a null value, if the statistics cannot be extracted
+    ///
+    /// Note that a null value does NOT mean the min or max value was actually
+    /// `null`; it means the requested statistic is unknown
+    ///
+    /// Reasons for not being able to extract the statistics include:
+    /// * the column is not present in the parquet file
+    /// * statistics for the column are not present in the row group
+    /// * the stored statistic value cannot be converted to the requested type
+    pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
+        let data_type = self.arrow_field.data_type();
+        let num_row_groups = metadata.row_groups().len();
+
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        let row_groups = metadata.row_groups();
+
+        // find the column in the parquet schema; if it is not present, return a null array
+        let Some((parquet_idx, matched_field)) =
+            parquet_column(parquet_schema, self.arrow_schema, self.column_name)
+        else {
+            // column was in the arrow schema but not in the parquet schema, so return a null array
+            return Ok(new_null_array(data_type, num_row_groups));
+        };
+
+        // sanity check that matching field matches the arrow field
+        if matched_field.as_ref() != self.arrow_field {
+            return internal_err!(
+                "Matched column '{:?}' does not match original matched column '{:?}'",
+                matched_field,
+                self.arrow_field
+            );
+        }
+
+        // Get an iterator over the column statistics
+        let iter = row_groups
+            .iter()
+            .map(|x| x.column(parquet_idx).statistics());
+
+        match self.statistics_type {
+            RequestedStatistics::Min => min_statistics(data_type, iter),
+            RequestedStatistics::Max => max_statistics(data_type, iter),
+            RequestedStatistics::NullCount => {
+                let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
+                Ok(Arc::new(UInt64Array::from_iter(null_counts)))
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
new file mode 100644
index 000000000000..272afea7b28a
--- /dev/null
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -0,0 +1,657 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file contains an end-to-end test of extracting statistics from parquet files.
+//! It writes data into a parquet file, reads the statistics back, and verifies they are correct
+
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::{
+    make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array,
+    RecordBatch, UInt64Array,
+};
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::physical_plan::parquet::{
+    RequestedStatistics, StatisticsConverter,
+};
+use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
+
+use crate::parquet::Scenario;
+
+use super::make_test_file_rg;
+
+// TEST HELPERS
+
+/// Return a record batch with an i64 column that includes null values
+fn make_int64_batches_with_null(
+    null_values: usize,
+    no_null_values_start: i64,
+    no_null_values_end: i64,
+) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("i64", DataType::Int64, true)]));
+
+    let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();
+
+    RecordBatch::try_new(
+        schema,
+        vec![make_array(
+            Int64Array::from_iter(
+                v64.into_iter()
+                    .map(Some)
+                    .chain(std::iter::repeat(None).take(null_values)),
+            )
+            .to_data(),
+        )],
+    )
+    .unwrap()
+}
+
+// Create a parquet file with one column of data type i64
+// The data in the file consists of:
+//   . num_null rows with null values
+//   . non-null values in the range [no_null_values_start, no_null_values_end], one value per row
+//   . the file is divided into row groups of size row_per_group
+pub fn parquet_file_one_column(
+    num_null: usize,
+    no_null_values_start: i64,
+    no_null_values_end: i64,
+    row_per_group: usize,
+) -> ParquetRecordBatchReaderBuilder<File> {
+    let mut output_file = tempfile::Builder::new()
+        .prefix("parquet_statistics_test")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(row_per_group)
+        .set_statistics_enabled(EnabledStatistics::Chunk)
+        .build();
+
+    let batches = vec![make_int64_batches_with_null(
+        num_null,
+        no_null_values_start,
+        no_null_values_end,
+    )];
+
+    let schema = batches[0].schema();
+
+    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+
+    // close file
+    let _file_meta = writer.close().unwrap();
+
+    // open the file & get the reader
+    let file = output_file.reopen().unwrap();
+    ArrowReaderBuilder::try_new(file).unwrap()
+}
+
+// Create a parquet file with many columns, each of a different data type
+//  - Data types are specified by the given scenario
+//  - Row group sizes are the same or different depending on the provided row_per_group & the data created in the scenario
+pub async fn parquet_file_many_columns(
+    scenario: super::Scenario,
+    row_per_group: usize,
+) -> ParquetRecordBatchReaderBuilder<File> {
+    let file = make_test_file_rg(scenario, row_per_group).await;
+
+    // open the file & get the reader
+    let file = file.reopen().unwrap();
+    ArrowReaderBuilder::try_new(file).unwrap()
+}
+
+struct Test {
+    reader: ParquetRecordBatchReaderBuilder<File>,
+    expected_min: ArrayRef,
+    expected_max: ArrayRef,
+    expected_null_counts: UInt64Array,
+    expected_row_counts: UInt64Array,
+}
+
+impl Test {
+    fn run(self, col_name: &str) {
+        let Self {
+            reader,
+            expected_min,
+            expected_max,
+            expected_null_counts,
+            expected_row_counts,
+        } = self;
+
+        let min = StatisticsConverter::try_new(
+            col_name,
+            
RequestedStatistics::Min, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + + assert_eq!(&min, &expected_min, "Mismatch with expected minimums"); + + let max = StatisticsConverter::try_new( + col_name, + RequestedStatistics::Max, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + assert_eq!(&max, &expected_max, "Mismatch with expected maximum"); + + let null_counts = StatisticsConverter::try_new( + col_name, + RequestedStatistics::NullCount, + reader.schema(), + ) + .unwrap() + .extract(reader.metadata()) + .unwrap(); + let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; + assert_eq!( + &null_counts, &expected_null_counts, + "Mismatch with expected null counts" + ); + + let row_counts = StatisticsConverter::row_counts(reader.metadata()).unwrap(); + assert_eq!( + row_counts, expected_row_counts, + "Mismatch with expected row counts" + ); + } + + fn run_col_not_found(self, col_name: &str) { + let Self { + reader, + expected_min: _, + expected_max: _, + expected_null_counts: _, + expected_row_counts: _, + } = self; + + let min = StatisticsConverter::try_new( + col_name, + RequestedStatistics::Min, + reader.schema(), + ); + + assert!(min.is_err()); + } +} + +// TESTS +// +// Remaining cases +// - Create parquet files / metadata with missing statistic values +// - Create parquet files / metadata with different data types -- included but not all data types yet +// - Create parquet files / metadata with different row group sizes -- done +// - Using truncated statistics ("exact min value" and "exact max value" https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.max_is_exact) + +#[tokio::test] +async fn test_one_row_group_without_null() { + let row_per_group = 20; + let reader = parquet_file_one_column(0, 4, 7, row_per_group); + Test { + reader, + // min is 4 + expected_min: Arc::new(Int64Array::from(vec![4])), + // max is 6 + expected_max: Arc::new(Int64Array::from(vec![6])), + // no nulls + expected_null_counts: UInt64Array::from(vec![0]), + // 3 rows + expected_row_counts: UInt64Array::from(vec![3]), + } + .run("i64") +} + +#[tokio::test] +async fn test_one_row_group_with_null_and_negative() { + let row_per_group = 20; + let reader = parquet_file_one_column(2, -1, 5, row_per_group); + + Test { + reader, + // min is -1 + expected_min: Arc::new(Int64Array::from(vec![-1])), + // max is 4 + expected_max: Arc::new(Int64Array::from(vec![4])), + // 2 nulls + expected_null_counts: UInt64Array::from(vec![2]), + // 8 rows + expected_row_counts: UInt64Array::from(vec![8]), + } + .run("i64") +} + +#[tokio::test] +async fn test_two_row_group_with_null() { + let row_per_group = 10; + let reader = parquet_file_one_column(2, 4, 17, row_per_group); + + Test { + reader, + // mins are [4, 14] + expected_min: Arc::new(Int64Array::from(vec![4, 14])), + // maxes are [13, 16] + expected_max: Arc::new(Int64Array::from(vec![13, 16])), + // nulls are [0, 2] + expected_null_counts: UInt64Array::from(vec![0, 2]), + // row counts are [10, 5] + expected_row_counts: UInt64Array::from(vec![10, 5]), + } + .run("i64") +} + +#[tokio::test] +async fn test_two_row_groups_with_all_nulls_in_one() { + let row_per_group = 5; + let reader = parquet_file_one_column(4, -2, 2, row_per_group); + + Test { + reader, + // mins are [-2, null] + expected_min: Arc::new(Int64Array::from(vec![Some(-2), None])), + // maxes are [1, null] + expected_max: Arc::new(Int64Array::from(vec![Some(1), None])), + // nulls are [1, 3] + 
expected_null_counts: UInt64Array::from(vec![1, 3]),
+        // row counts are [5, 3]
+        expected_row_counts: UInt64Array::from(vec![5, 3]),
+    }
+    .run("i64")
+}
+
+/////////////// MORE GENERAL TESTS //////////////////////
+// . Many columns in a file
+// . Different data types
+// . Different row group sizes
+
+// Four different integer types
+#[tokio::test]
+async fn test_int_64() {
+    let row_per_group = 5;
+    // This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
+    let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Int64Array::from(vec![-5, -4, 0, 5])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Int64Array::from(vec![-1, 0, 4, 9])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("i64");
+}
+
+#[tokio::test]
+async fn test_int_32() {
+    let row_per_group = 5;
+    // This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
+    let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [-5, -4, 0, 5]
+        expected_min: Arc::new(Int32Array::from(vec![-5, -4, 0, 5])),
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Int32Array::from(vec![-1, 0, 4, 9])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("i32");
+}
+
+// BUG: ignore this test for now
+// https://github.com/apache/datafusion/issues/10585
+// Note that the file has 4 columns named "i8", "i16", "i32", "i64".
+//  - The tests on columns i32 and i64 passed.
+//  - The tests on columns i8 and i16 failed.
+#[ignore]
+#[tokio::test]
+async fn test_int_16() {
+    let row_per_group = 5;
+    // This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
+    let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [-5, -4, 0, 5]
+        // BUG: not sure why this returns the same data but as an Int32Array, even though I debugged
+        // and confirmed the column's name is "i16" and its data is Int16.
+        // My debugging tells me the bug is either at:
+        //   1. The new code to get "iter". See the code in this PR with
+        //      // Get an iterator over the column statistics
+        //      let iter = row_groups
+        //          .iter()
+        //          .map(|x| x.column(parquet_idx).statistics());
+        //   OR
+        //   2. in the function (and/or its macro) `pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>` here
+        //      https://github.com/apache/datafusion/blob/ea023e2d4878240eece870cf4b346c7a0667aeed/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L179
+        expected_min: Arc::new(Int16Array::from(vec![-5, -4, 0, 5])), // panics here because the actual data is Int32Array
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Int16Array::from(vec![-1, 0, 4, 9])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("i16");
+}
+
+// BUG (same as above): ignore this test for now
+// https://github.com/apache/datafusion/issues/10585
+#[ignore]
+#[tokio::test]
+async fn test_int_8() {
+    let row_per_group = 5;
+    // This creates a parquet file with 4 columns named "i8", "i16", "i32", "i64"
+    let reader = parquet_file_many_columns(Scenario::Int, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [-5, -4, 0, 5]
+        // BUG: not sure why this returns the same data but as an Int32Array, even though I debugged
+        // and confirmed the column's name is "i8" and its data is Int8.
+        expected_min: Arc::new(Int8Array::from(vec![-5, -4, 0, 5])), // panics here because the actual data is Int32Array
+        // maxes are [-1, 0, 4, 9]
+        expected_max: Arc::new(Int8Array::from(vec![-1, 0, 4, 9])),
+        // nulls are [0, 0, 0, 0]
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("i8");
+}
+
+// timestamp
+#[tokio::test]
+async fn test_timestamp() {
+    let row_per_group = 5;
+
+    // This creates a parquet file with 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // "nanos"   --> TimestampNanosecondArray
+    // "micros"  --> TimestampMicrosecondArray
+    // "millis"  --> TimestampMillisecondArray
+    // "seconds" --> TimestampSecondArray
+    // "names"   --> StringArray
+    //
+    // The file is created from 4 record batches, each with 5 rows.
+    // Since the row group size is set to 5, those 4 batches will go into 4 row groups
+    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
+        expected_min: Arc::new(Int64Array::from(vec![
+            1577840461000000000,
+            1577840471000000000,
+            1577841061000000000,
+            1578704461000000000,
+        ])),
+        // maxes are [1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,]
+        expected_max: Arc::new(Int64Array::from(vec![
+            1577926861000000000,
+            1577926871000000000,
+            1577927461000000000,
+            1578790861000000000,
+        ])),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("nanos");
+
+    // micros
+    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int64Array::from(vec![
+            1577840461000000,
+            1577840471000000,
+            1577841061000000,
+            1578704461000000,
+        ])),
+        expected_max: Arc::new(Int64Array::from(vec![
+            1577926861000000,
+            1577926871000000,
+            1577927461000000,
+            1578790861000000,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("micros");
+
+    // millis
+    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int64Array::from(vec![
+            1577840461000,
+            1577840471000,
+            1577841061000,
+            1578704461000,
+        ])),
+        expected_max: Arc::new(Int64Array::from(vec![
+            1577926861000,
+            1577926871000,
+            1577927461000,
+            1578790861000,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("millis");
+
+    // seconds
+    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int64Array::from(vec![
+            1577840461, 1577840471, 1577841061, 1578704461,
+        ])),
+        expected_max: Arc::new(Int64Array::from(vec![
+            1577926861, 1577926871, 1577927461, 1578790861,
+        ])),
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+    }
+    .run("seconds");
+}
+
+// timestamp with different row group sizes
+#[tokio::test]
+async fn test_timestamp_diff_rg_sizes() {
+    let row_per_group = 8;
+
+    // This creates a parquet file with 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // "nanos"   --> TimestampNanosecondArray
+    // "micros"  --> TimestampMicrosecondArray
+    // "millis"  --> TimestampMillisecondArray
+    // "seconds" --> TimestampSecondArray
+    // "names"   --> StringArray
+    //
+    // The file is created from 4 record batches (each with a null row), each with 5 rows,
+    // which are then split into 3 row groups of sizes 8, 8, and 4
+    let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await;
+
+    Test {
+        reader,
+        // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000]
+        expected_min: Arc::new(Int64Array::from(vec![
+            1577840461000000000,
+            1577841061000000000,
+            1578704521000000000,
+        ])),
+        // maxes are [1577926861000000000, 1578704461000000000, 1578790861000000000]
+        expected_max: Arc::new(Int64Array::from(vec![
+            1577926861000000000,
+            1578704461000000000,
+            1578790861000000000,
+        ])),
+        // nulls are [1, 2, 1]
+        expected_null_counts: 
UInt64Array::from(vec![1, 2, 1]), + // row counts are [8, 8, 4] + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("nanos"); + + // micros + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000000, + 1577841061000000, + 1578704521000000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000000, + 1578704461000000, + 1578790861000000, + ])), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("micros"); + + // millis + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461000, + 1577841061000, + 1578704521000, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861000, + 1578704461000, + 1578790861000, + ])), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("millis"); + + // seconds + let reader = parquet_file_many_columns(Scenario::Timestamps, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![ + 1577840461, 1577841061, 1578704521, + ])), + expected_max: Arc::new(Int64Array::from(vec![ + 1577926861, 1578704461, 1578790861, + ])), + expected_null_counts: UInt64Array::from(vec![1, 2, 1]), + expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + } + .run("seconds"); +} + +// date with different row group sizes +// Bug expect `Date32Array` but returns Int32Array +// https://github.com/apache/datafusion/issues/10587 +#[tokio::test] +async fn test_dates_32_diff_rg_sizes() { + let row_per_group = 13; + + // This creates a parquet files of 3 columns named "date32", "date64", "names" + // "date32" --> Date32Array + // "date64" --> Date64Array + // "names" --> StringArray + // + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; + + Test { + reader, + // mins are [18262, 18565,] + expected_min: Arc::new(Int32Array::from(vec![18262, 18565])), + // maxes are [18564, 21865,] + expected_max: Arc::new(Int32Array::from(vec![18564, 21865])), + // nulls are [2, 2] + expected_null_counts: UInt64Array::from(vec![2, 2]), + // row counts are [13, 7] + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run("date32"); +} + +// BUG: same as above. 
Expect to return Date64Array but returns Int32Array +// test date with different row group sizes +// https://github.com/apache/datafusion/issues/10587 +#[ignore] +#[tokio::test] +async fn test_dates_64_diff_rg_sizes() { + let row_per_group = 13; + // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 2 row groups with size 13, 7 + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), // panic here because the actual data is Int32Array + expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), + expected_null_counts: UInt64Array::from(vec![2, 2]), + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run("date64"); +} + +// TODO: +// Other data types to tests +// `u8`, `u16`, `u32`, and `u64`, +// UInt, +// UInt32Range, +// Float64, +// Decimal, +// DecimalBloomFilterInt32, +// DecimalBloomFilterInt64, +// DecimalLargePrecision, +// DecimalLargePrecisionBloomFilter, +// ByteArray, +// PeriodsInColumnNames, +// WithNullValuesPageLevel, +// WITHOUT Stats + +/////// NEGATIVE TESTS /////// +// column not found +#[tokio::test] +async fn test_column_not_found() { + let row_per_group = 5; + let reader = parquet_file_many_columns(Scenario::Dates, row_per_group).await; + Test { + reader, + expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), + expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), + expected_null_counts: UInt64Array::from(vec![2, 2]), + expected_row_counts: UInt64Array::from(vec![13, 7]), + } + .run_col_not_found("not_a_column"); +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index fe839bf1bcec..6e3f366b4373 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -41,6 +41,7 @@ use parquet::file::properties::WriterProperties; use std::sync::Arc; use tempfile::NamedTempFile; +mod arrow_statistics; mod custom_reader; mod file_statistics; #[cfg(not(target_family = "windows"))]