Skip to content

Commit

Permalink
Add parquet_column_index and arrow_field accessors + test
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 16, 2024
1 parent 3438746 commit 5c8c1ba
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,12 +1064,25 @@ where
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// the index of the matched column in the parquet schema
parquet_index: Option<usize>,
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the arrow schema
arrow_field: &'a Field,
}

impl<'a> StatisticsConverter<'a> {
/// Return the index of the column in the parquet schema, if any
///
/// Returns `None` if the column was present in the Arrow schema, but not
/// present in the parquet file
pub fn parquet_column_index(&self) -> Option<usize> {
self.parquet_column_index
}

/// Return the [`Field`] of the column in the arrow schema
pub fn arrow_field(&self) -> &'a Field {
self.arrow_field
}

/// Returns a [`UInt64Array`] with row counts for each row group
///
/// # Return Value
Expand Down Expand Up @@ -1102,7 +1115,7 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let Some(_) = self.parquet_index else {
let Some(_) = self.parquet_column_index else {
return Ok(None);
};

Expand Down Expand Up @@ -1159,7 +1172,7 @@ impl<'a> StatisticsConverter<'a> {
};

Ok(Self {
parquet_index,
parquet_column_index: parquet_index,
arrow_field,
})
}
Expand Down Expand Up @@ -1210,7 +1223,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

Expand All @@ -1229,7 +1242,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

Expand All @@ -1246,7 +1259,7 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
Expand Down Expand Up @@ -1322,7 +1335,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

Expand Down Expand Up @@ -1351,7 +1364,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
};

Expand All @@ -1378,7 +1391,7 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a usize>,
{
let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
let num_row_groups = row_group_indices.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
Expand Down Expand Up @@ -1421,7 +1434,7 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a usize>,
{
let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
// no matching column found in parquet_index;
// thus we cannot extract page_locations in order to determine
// the row count on a per DataPage basis.
Expand Down Expand Up @@ -2366,6 +2379,13 @@ mod test {

let converter =
StatisticsConverter::try_new(name, arrow_schema, parquet_schema).unwrap();

// test accessors on the converter
let parquet_column_index =
parquet_column(parquet_schema, &arrow_schema, name).map(|(idx, _field)| idx);
assert_eq!(converter.parquet_column_index(), parquet_column_index);
assert_eq!(converter.arrow_field().name(), name);

let actual_min = converter.row_group_mins(row_groups.iter()).unwrap();
assert_eq!(&expected_min, &actual_min, "column {name}");

Expand Down

0 comments on commit 5c8c1ba

Please sign in to comment.