fix: Support dictionary type in parquet metadata statistics. (apache#11169)

* fix: Support dictionary type in parquet metadata statistics.

* Simplify tests.

---------

Co-authored-by: Eric Fredine <[email protected]>
2 people authored and findepi committed Jul 16, 2024
1 parent 3a89a3d commit 7eaeae2
Showing 2 changed files with 64 additions and 3 deletions.
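Before the diff itself, the core of the change in isolation: min/max statistics for a Dictionary-encoded column should be computed over the underlying value type, not the Dictionary type. The following is a minimal standalone sketch of that idea (illustrative code, assuming only the arrow_schema crate; the helper mirrors the one added in statistics.rs below):

use arrow_schema::DataType;

// Unwrap a Dictionary data type to its value type; pass all other types through.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

fn main() {
    // A Dictionary(Int32, Utf8) column should yield Utf8 min/max statistics...
    let dic = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert_eq!(min_max_aggregate_data_type(&dic), &DataType::Utf8);
    // ...while non-dictionary types are passed through unchanged.
    assert_eq!(min_max_aggregate_data_type(&DataType::Int64), &DataType::Int64);
}

There is only one min/max value per column, so nothing is gained by keeping the result Dictionary-encoded; the test added in parquet.rs asserts exactly this, expecting plain Utf8 scalars for a dictionary column.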
46 changes: 45 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -1152,7 +1152,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
-use arrow_array::Int64Array;
+use arrow_array::types::Int32Type;
+use arrow_array::{DictionaryArray, Int32Array, Int64Array};
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
@@ -1161,6 +1162,7 @@ mod tests {
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -1442,6 +1444,48 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
// Data for column c_dic: ["a", "b", "c", "d"]
let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
let dic_array =
DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
let c_dic: ArrayRef = Arc::new(dic_array);

let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)]).unwrap();

// Use store_parquet to write each batch to its own file.
// batch1 is written to the first file and includes:
// - column c_dic with 4 rows and no nulls; min and max stats for the dictionary column are available.
let store = Arc::new(LocalFileSystem::new()) as _;
let (files, _file_names) = store_parquet(vec![batch1], false).await?;

let state = SessionContext::new().state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&state, &store, &files).await.unwrap();

// Fetch statistics for first file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
assert_eq!(stats.num_rows, Precision::Exact(4));

// column c_dic
let c_dic_stats = &stats.column_statistics[0];

assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
assert_eq!(
c_dic_stats.max_value,
Precision::Exact(Utf8(Some("d".into())))
);
assert_eq!(
c_dic_stats.min_value,
Precision::Exact(Utf8(Some("a".into())))
);

Ok(())
}

#[tokio::test]
async fn test_statistics_from_parquet_metadata() -> Result<()> {
// Data for column c1: ["Foo", null, "bar"]
21 changes: 19 additions & 2 deletions datafusion/core/src/datasource/statistics.rs
@@ -20,6 +20,7 @@ use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use arrow_schema::DataType;

use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
@@ -156,12 +157,16 @@ pub(crate) fn create_max_min_accs(
let max_values: Vec<Option<MaxAccumulator>> = schema
.fields()
.iter()
-.map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+.map(|field| {
+    MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
+})
.collect();
let min_values: Vec<Option<MinAccumulator>> = schema
.fields()
.iter()
-.map(|field| MinAccumulator::try_new(field.data_type()).ok())
+.map(|field| {
+    MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
+})
.collect();
(max_values, min_values)
}
@@ -218,6 +223,18 @@ pub(crate) fn get_col_stats(
.collect()
}

// Min/max aggregation can take Dictionary-encoded input but always produces unpacked
// (i.e. non-Dictionary) output. We need to adjust the output data type to reflect this.
// Min/max aggregates produce unpacked output because there is only one min/max value
// per group; there is no need to keep it Dictionary-encoded.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
}
}

/// If the given value is numerically greater than the original maximum value,
/// return the new maximum value with appropriate exactness information.
fn set_max_if_greater(
