Skip to content
Merged
214 changes: 58 additions & 156 deletions parquet/src/arrow/arrow_reader/statistics.rs

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ mod tests {
use crate::arrow::ARROW_SCHEMA_META_KEY;
use crate::column::page::{Page, PageReader};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::reader::SerializedPageReader;
use crate::format::PageHeader;
use crate::schema::types::ColumnPath;
Expand All @@ -1507,7 +1508,6 @@ mod tests {
use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::index::Index;
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
Expand Down Expand Up @@ -4002,9 +4002,12 @@ mod tests {
assert_eq!(column_index[0].len(), 2); // 2 columns

let a_idx = &column_index[0][0];
assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
assert!(
matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
"{a_idx:?}"
);
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}

#[test]
Expand Down Expand Up @@ -4070,9 +4073,9 @@ mod tests {
assert_eq!(column_index[0].len(), 2); // 2 columns

let a_idx = &column_index[0][0];
assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}

#[test]
Expand Down
89 changes: 71 additions & 18 deletions parquet/src/bin/parquet-index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
//! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md

use clap::Parser;
use parquet::data_type::ByteArray;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::page_index::column_index::{
ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
};
use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
Expand Down Expand Up @@ -96,16 +99,20 @@ impl Args {
let row_counts =
compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
match &column_indices[column_idx] {
Index::NONE => println!("NO INDEX"),
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT32(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT64(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::INT96(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::FLOAT(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::DOUBLE(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::BYTE_ARRAY(v) => print_index(&v.indexes, offset_index, &row_counts)?,
Index::FIXED_LEN_BYTE_ARRAY(v) => {
print_index(&v.indexes, offset_index, &row_counts)?
ColumnIndexMetaData::NONE => println!("NO INDEX"),
ColumnIndexMetaData::BOOLEAN(v) => {
print_index::<bool>(v, offset_index, &row_counts)?
}
ColumnIndexMetaData::INT32(v) => print_index(v, offset_index, &row_counts)?,
ColumnIndexMetaData::INT64(v) => print_index(v, offset_index, &row_counts)?,
ColumnIndexMetaData::INT96(v) => print_index(v, offset_index, &row_counts)?,
ColumnIndexMetaData::FLOAT(v) => print_index(v, offset_index, &row_counts)?,
ColumnIndexMetaData::DOUBLE(v) => print_index(v, offset_index, &row_counts)?,
ColumnIndexMetaData::BYTE_ARRAY(v) => {
print_bytes_index(v, offset_index, &row_counts)?
}
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
print_bytes_index(v, offset_index, &row_counts)?
}
}
}
Expand All @@ -131,20 +138,21 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {

/// Prints index information for a single column chunk
fn print_index<T: std::fmt::Display>(
column_index: &[PageIndex<T>],
column_index: &PrimitiveColumnIndex<T>,
offset_index: &OffsetIndexMetaData,
row_counts: &[i64],
) -> Result<()> {
if column_index.len() != offset_index.page_locations.len() {
if column_index.num_pages() as usize != offset_index.page_locations.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.len(),
column_index.num_pages(),
offset_index.page_locations.len()
)));
}

for (idx, ((c, o), row_count)) in column_index
.iter()
for (idx, (((min, max), o), row_count)) in column_index
.min_values_iter()
.zip(column_index.max_values_iter())
.zip(offset_index.page_locations())
.zip(row_counts)
.enumerate()
Expand All @@ -153,12 +161,12 @@ fn print_index<T: std::fmt::Display>(
"Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
idx, o.offset, o.compressed_page_size, row_count
);
match &c.min {
match min {
Some(m) => print!(", min {m:>10}"),
None => print!(", min {:>10}", "NONE"),
}

match &c.max {
match max {
Some(m) => print!(", max {m:>10}"),
None => print!(", max {:>10}", "NONE"),
}
Expand All @@ -168,6 +176,51 @@ fn print_index<T: std::fmt::Display>(
Ok(())
}

fn print_bytes_index(
column_index: &ByteArrayColumnIndex,
offset_index: &OffsetIndexMetaData,
row_counts: &[i64],
) -> Result<()> {
if column_index.num_pages() as usize != offset_index.page_locations.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.num_pages(),
offset_index.page_locations.len()
)));
}

for (idx, (((min, max), o), row_count)) in column_index
.min_values_iter()
.zip(column_index.max_values_iter())
.zip(offset_index.page_locations())
.zip(row_counts)
.enumerate()
{
print!(
"Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
idx, o.offset, o.compressed_page_size, row_count
);
match min {
Some(m) => match String::from_utf8(m.to_vec()) {
Ok(s) => print!(", min {s:>10}"),
Err(_) => print!(", min {:>10}", ByteArray::from(m)),
},
None => print!(", min {:>10}", "NONE"),
}

match max {
Some(m) => match String::from_utf8(m.to_vec()) {
Ok(s) => print!(", max {s:>10}"),
Err(_) => print!(", min {:>10}", ByteArray::from(m)),
},
None => print!(", max {:>10}", "NONE"),
}
println!()
}

Ok(())
}

fn main() -> Result<()> {
Args::parse().run()
}
50 changes: 50 additions & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use crate::file::metadata::{
ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData, SortingColumn,
};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::column_index::{
ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
};
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use crate::file::statistics::{Statistics, ValueStatistics};
Expand Down Expand Up @@ -154,6 +157,48 @@ impl HeapSize for OffsetIndexMetaData {
}
}

impl HeapSize for ColumnIndexMetaData {
fn heap_size(&self) -> usize {
match self {
Self::NONE => 0,
Self::BOOLEAN(native_index) => native_index.heap_size(),
Self::INT32(native_index) => native_index.heap_size(),
Self::INT64(native_index) => native_index.heap_size(),
Self::INT96(native_index) => native_index.heap_size(),
Self::FLOAT(native_index) => native_index.heap_size(),
Self::DOUBLE(native_index) => native_index.heap_size(),
Self::BYTE_ARRAY(native_index) => native_index.heap_size(),
Self::FIXED_LEN_BYTE_ARRAY(native_index) => native_index.heap_size(),
}
}
}

impl HeapSize for ColumnIndex {
fn heap_size(&self) -> usize {
self.null_pages.heap_size()
+ self.boundary_order.heap_size()
+ self.null_counts.heap_size()
+ self.definition_level_histograms.heap_size()
+ self.repetition_level_histograms.heap_size()
}
}

impl<T: ParquetValueType> HeapSize for PrimitiveColumnIndex<T> {
fn heap_size(&self) -> usize {
self.column_index.heap_size() + self.min_values.heap_size() + self.max_values.heap_size()
}
}

impl HeapSize for ByteArrayColumnIndex {
fn heap_size(&self) -> usize {
self.column_index.heap_size()
+ self.min_bytes.heap_size()
+ self.min_offsets.heap_size()
+ self.max_bytes.heap_size()
+ self.max_offsets.heap_size()
}
}

impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
Expand Down Expand Up @@ -193,6 +238,11 @@ impl HeapSize for bool {
0 // no heap allocations
}
}
impl HeapSize for u8 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for i32 {
fn heap_size(&self) -> usize {
0 // no heap allocations
Expand Down
24 changes: 17 additions & 7 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::{
page_encoding_stats::{self, PageEncodingStats},
page_index::offset_index::PageLocation,
page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation},
};
use crate::file::{
page_index::index::PageIndex,
Expand Down Expand Up @@ -156,7 +156,7 @@ pub(crate) use writer::ThriftMetadataWriter;
///
/// [PageIndex documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
/// [`ColumnIndex`]: crate::format::ColumnIndex
pub type ParquetColumnIndex = Vec<Vec<Index>>;
pub type ParquetColumnIndex = Vec<Vec<ColumnIndexMetaData>>;

/// [`OffsetIndexMetaData`] for each data page of each row group of each column
///
Expand Down Expand Up @@ -1948,7 +1948,7 @@ impl OffsetIndexBuilder {
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
use crate::file::page_index::index::NativeIndex;
use crate::file::page_index::column_index::{ColumnIndex, PrimitiveColumnIndex};

#[test]
fn test_row_group_metadata_thrift_conversion() {
Expand Down Expand Up @@ -2223,7 +2223,17 @@ mod tests {
let mut column_index = ColumnIndexBuilder::new(Type::BOOLEAN);
column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
let column_index = column_index.build_to_thrift();
let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();
let native_index = PrimitiveColumnIndex::<bool> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is so much nicer I think

column_index: ColumnIndex {
null_pages: column_index.null_pages,
boundary_order: column_index.boundary_order.try_into().unwrap(),
null_counts: column_index.null_counts,
repetition_level_histograms: column_index.repetition_level_histograms,
definition_level_histograms: column_index.definition_level_histograms,
},
min_values: vec![],
max_values: vec![],
};

// Now, add in OffsetIndex
let mut offset_index = OffsetIndexBuilder::new();
Expand All @@ -2237,16 +2247,16 @@ mod tests {

let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_group_meta)
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
.set_column_index(Some(vec![vec![ColumnIndexMetaData::BOOLEAN(native_index)]]))
.set_offset_index(Some(vec![vec![
OffsetIndexMetaData::try_new(offset_index).unwrap()
]]))
.build();

#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2784;
let bigger_expected_size = 2704;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3120;
let bigger_expected_size = 3040;

// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::reader::ChunkReader;
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
Expand Down Expand Up @@ -566,7 +566,7 @@ impl ParquetMetaDataReader {
col_idx,
)
}
None => Ok(Index::NONE),
None => Ok(ColumnIndexMetaData::NONE),
})
.collect::<Result<Vec<_>>>()
})
Expand All @@ -584,7 +584,7 @@ impl ParquetMetaDataReader {
column: &ColumnChunkMetaData,
row_group_index: usize,
col_index: usize,
) -> Result<Index> {
) -> Result<ColumnIndexMetaData> {
match &column.column_crypto_metadata {
Some(crypto_metadata) => {
let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
Expand Down Expand Up @@ -612,7 +612,7 @@ impl ParquetMetaDataReader {
column: &ColumnChunkMetaData,
_row_group_index: usize,
_col_index: usize,
) -> Result<Index> {
) -> Result<ColumnIndexMetaData> {
decode_column_index(bytes, column.column_type())
}

Expand Down
Loading
Loading