diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 82c8e77f6393..c3d0e2daa3f7 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -26,6 +26,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
 use crate::arrow::array_reader::row_number::RowNumberReader;
 use crate::arrow::array_reader::{
     ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -169,6 +170,9 @@ impl<'a> ArrayReaderBuilder<'a> {
                 // They need to be built by specialized readers
                 match virtual_type {
                     VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
+                    VirtualColumnType::RowGroupIndex => {
+                        Ok(Some(self.build_row_group_index_reader()?))
+                    }
                 }
             }
             ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -194,6 +198,18 @@ impl<'a> ArrayReaderBuilder<'a> {
         )?))
     }
 
+    fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
+        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
+            ParquetError::General(
+                "ParquetMetaData is required to read virtual row group index columns.".to_string(),
+            )
+        })?;
+        Ok(Box::new(RowGroupIndexReader::try_new(
+            parquet_metadata,
+            self.row_groups.row_groups(),
+        )?))
+    }
+
     /// Build array reader for map type.
     fn build_map_reader(
         &self,
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 54be89f23084..019a871e194e 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -43,6 +43,7 @@ mod map_array;
 mod null_array;
 mod primitive_array;
 mod row_group_cache;
+mod row_group_index;
 mod row_number;
 mod struct_array;
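Virtual-column readers like the one added in the new file below bypass page decoding entirely and synthesize values from metadata; all they have to honor is the `ArrayReader` contract. A minimal sketch of the loop a record batch reader effectively drives (the `drain_reader` helper is hypothetical; the trait methods and types match the implementation below):

```rust
use arrow_array::ArrayRef;

use crate::arrow::array_reader::ArrayReader;
use crate::errors::Result;

// Hypothetical driver illustrating the contract: read_records buffers up to
// `batch_size` values and reports how many were buffered; consume_batch then
// drains the buffer into an ArrayRef. A return of 0 signals exhaustion.
fn drain_reader(reader: &mut dyn ArrayReader, batch_size: usize) -> Result<Vec<ArrayRef>> {
    let mut arrays = Vec::new();
    loop {
        let n = reader.read_records(batch_size)?;
        if n == 0 {
            break;
        }
        arrays.push(reader.consume_batch()?);
    }
    Ok(arrays)
}
```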
diff --git a/parquet/src/arrow/array_reader/row_group_index.rs b/parquet/src/arrow/array_reader/row_group_index.rs
new file mode 100644
index 000000000000..c0b976fc0095
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_group_index.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+pub(crate) struct RowGroupIndexReader {
+    buffered_indices: Vec<i64>,
+    remaining_indices: std::iter::Flatten<std::vec::IntoIter<std::iter::RepeatN<i64>>>,
+}
+
+impl RowGroupIndexReader {
+    pub(crate) fn try_new<'a>(
+        parquet_metadata: &'a ParquetMetaData,
+        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
+    ) -> Result<Self> {
+        // build mapping from ordinal to row group index
+        // this is O(n) where n is the total number of row groups in the file
+        let ordinal_to_index: HashMap<i16, i64> =
+            HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map(
+                |(row_group_index, rg)| {
+                    rg.ordinal()
+                        .map(|ordinal| (ordinal, row_group_index as i64))
+                },
+            ));
+
+        // build repeating iterators in the order specified by the row_groups iterator
+        // this is O(m) where m is the number of selected row groups
+        let repeated_indices: Vec<_> = row_groups
+            .map(|rg| {
+                let ordinal = rg.ordinal().ok_or_else(|| {
+                    ParquetError::General(
+                        "Row group missing ordinal field, required to compute row group indices"
+                            .to_string(),
+                    )
+                })?;
+
+                let row_group_index = ordinal_to_index.get(&ordinal).ok_or_else(|| {
+                    ParquetError::General(format!(
+                        "Row group with ordinal {} not found in metadata",
+                        ordinal
+                    ))
+                })?;
+
+                // repeat the row group index once for each row in this row group
+                Ok(std::iter::repeat_n(
+                    *row_group_index,
+                    rg.num_rows() as usize,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self {
+            buffered_indices: Vec::new(),
+            remaining_indices: repeated_indices.into_iter().flatten(),
+        })
+    }
+}
+
+impl ArrayReader for RowGroupIndexReader {
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let starting_len = self.buffered_indices.len();
+        self.buffered_indices
+            .extend((self.remaining_indices.by_ref()).take(batch_size));
+        Ok(self.buffered_indices.len() - starting_len)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        // TODO: use advance_by to improve performance once it stabilizes
+        Ok((self.remaining_indices.by_ref()).take(num_records).count())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(
+            self.buffered_indices.drain(..),
+        )))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::basic::Type as PhysicalType;
+    use crate::file::metadata::{
+        ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
+    };
+    use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
+    use std::sync::Arc;
+
+    fn create_test_schema() -> Arc<SchemaDescriptor> {
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![Arc::new(
+                SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
+                    .build()
+                    .unwrap(),
+            )])
+            .build()
+            .unwrap();
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+
+    fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
+        let schema_descr = create_test_schema();
+
+        let mut row_group_metas = vec![];
+        for (ordinal, num_rows) in row_groups {
+            let columns: Vec<_> = schema_descr
+                .columns()
+                .iter()
+                .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
+                .collect();
+
+            let row_group = RowGroupMetaData::builder(schema_descr.clone())
+                .set_num_rows(num_rows)
+                .set_ordinal(ordinal)
+                .set_total_byte_size(100)
+                .set_column_metadata(columns)
+                .build()
+                .unwrap();
+            row_group_metas.push(row_group);
+        }
+
+        let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
+        let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
+
+        ParquetMetaData::new(file_metadata, row_group_metas)
+    }
+
+    #[test]
+    fn test_row_group_index_reader_basic() {
+        // create metadata with 3 row groups, each with a different number of rows
+        let metadata = create_test_parquet_metadata(vec![
+            (0, 2), // rg: 0, ordinal: 0, 2 rows
+            (1, 3), // rg: 1, ordinal: 1, 3 rows
+            (2, 1), // rg: 2, ordinal: 2, 1 row
+        ]);
+
+        let selected_row_groups: Vec<_> = metadata.row_groups().iter().collect();
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        // 2 rows + 3 rows + 1 row
+        let num_read = reader.read_records(6).unwrap();
+        assert_eq!(num_read, 6);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+        assert_eq!(actual, [0, 0, 1, 1, 1, 2]);
+    }
+
+    #[test]
+    fn test_row_group_index_reader_reverse_order() {
+        // create metadata with 3 row groups, each with 2 rows
+        let metadata = create_test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
+
+        // select only the row groups with ordinals 2 and 0, in that order:
+        // row group 2 is read first, then row group 0, and row group 1 is skipped
+        let selected_row_groups: Vec<_> =
+            vec![&metadata.row_groups()[2], &metadata.row_groups()[0]];
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        let num_read = reader.read_records(6).unwrap();
+        // 2 rgs * 2 rows each
+        assert_eq!(num_read, 4);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+
+        assert_eq!(actual, [2, 2, 0, 0]);
+    }
+
+    #[test]
+    fn test_row_group_index_reader_skip_records() {
+        // rg 0: 3 rows, rg 1: 4 rows, rg 2: 2 rows
+        // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+        let metadata = create_test_parquet_metadata(vec![(0, 3), (1, 4), (2, 2)]);
+
+        let selected_row_groups = metadata.row_groups().iter().collect::<Vec<_>>();
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        // skip the first 5 rows
+        // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+        //  |--- skip ----|
+        let num_skipped = reader.skip_records(5).unwrap();
+        assert_eq!(num_skipped, 5);
+
+        let num_read = reader.read_records(10).unwrap();
+        assert_eq!(num_read, 4);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual = indices.iter().map(|v| v.unwrap()).collect::<Vec<_>>();
+        assert_eq!(actual, [1, 1, 2, 2]);
+    }
+}
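For intuition, `try_new` above reduces to a repeat-and-flatten over the selected row groups: each group contributes its index once per row, in selection order. A standalone sketch using only `std` (the `(index, rows)` pairs stand in for the selected `RowGroupMetaData`):

```rust
fn main() {
    // (row group index, row count) pairs in selection order,
    // e.g. row group 2 selected before row group 0, row group 1 skipped
    let selected: [(i64, usize); 2] = [(2, 2), (0, 2)];

    let indices: Vec<i64> = selected
        .iter()
        .flat_map(|&(index, rows)| std::iter::repeat_n(index, rows))
        .collect();

    // one index per row, in selection order, as asserted by the tests above
    assert_eq!(indices, [2, 2, 0, 0]);
}
```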
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index a626076ebdd7..b5eb1e3f21f1 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -597,7 +597,7 @@ impl ArrowReaderOptions {
     /// Include virtual columns in the output.
     ///
-    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers.
+    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader, such as row numbers and row group indices.
     ///
     /// # Example
     /// ```
@@ -1435,7 +1435,10 @@ pub(crate) mod tests {
         ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
         RowSelectionPolicy, RowSelector,
     };
-    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
+    use crate::arrow::schema::{
+        add_encoded_arrow_schema_to_metadata,
+        virtual_type::{RowGroupIndex, RowNumber},
+    };
     use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
     use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
@@ -5793,6 +5796,183 @@ pub(crate) mod tests {
         );
     }
 
+    #[test]
+    fn test_read_row_group_indices() {
+        // create a parquet file with 3 row groups, 2 rows each
+        let array1 = Int64Array::from(vec![1, 2]);
+        let array2 = Int64Array::from(vec![3, 4]);
+        let array3 = Int64Array::from(vec![5, 6]);
+
+        let batch1 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
+        let batch2 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
+        let batch3 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
+
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.write(&batch2).unwrap();
+        writer.write(&batch3).unwrap();
+        writer.close().unwrap();
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("row_group_index", ArrowDataType::Int64, false)
+                .with_extension_type(RowGroupIndex),
+        );
+
+        let options = ArrowReaderOptions::new()
+            .with_virtual_columns(vec![row_group_index_field.clone()])
+            .unwrap();
+        let mut arrow_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
+                .expect("reader builder with virtual columns")
+                .build()
+                .expect("reader with virtual columns");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 6);
+
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
+        );
+
+        assert_eq!(
+            batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
+        );
+    }
+
+    #[test]
+    fn test_read_only_row_group_indices() {
+        let array1 = Int64Array::from(vec![1, 2, 3]);
+        let array2 = Int64Array::from(vec![4, 5]);
+
+        let batch1 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
+        let batch2 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
+
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(3)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.write(&batch2).unwrap();
+        writer.close().unwrap();
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("row_group_index", ArrowDataType::Int64, false)
+                .with_extension_type(RowGroupIndex),
+        );
+
+        let options = ArrowReaderOptions::new()
+            .with_virtual_columns(vec![row_group_index_field.clone()])
+            .unwrap();
+        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
+        let num_columns = metadata
+            .metadata
+            .file_metadata()
+            .schema_descr()
+            .num_columns();
+
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
+            .with_projection(ProjectionMask::none(num_columns))
+            .build()
+            .expect("reader with virtual columns only");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 5);
+
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
+        );
+    }
+
+    #[test]
+    fn test_read_row_group_indices_with_selection() -> Result<()> {
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(10)
+            .build();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            ArrowDataType::Int64,
+            false,
+        )]));
+
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
+
+        // write out 3 batches of 10 rows each
+        for i in 0..3 {
+            let start = i * 10;
+            let array = Int64Array::from_iter_values(start..start + 10);
+            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
+            writer.write(&batch)?;
+        }
+        writer.close()?;
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
+        );
+
+        let options =
+            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
+
+        // test that row groups are read in the requested (reverse) order
+        let arrow_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
+                .with_row_groups(vec![2, 1, 0])
+                .build()?;
+
+        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
+        let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+        let values = combined.column(0).as_primitive::<Int64Type>();
+        let first_val = values.value(0);
+        let last_val = values.value(combined.num_rows() - 1);
+        // the first row comes from rg 2
+        assert_eq!(first_val, 20);
+        // the last row comes from rg 0
+        assert_eq!(last_val, 9);
+
+        let rg_indices = combined.column(1).as_primitive::<Int64Type>();
+        assert_eq!(rg_indices.value(0), 2);
+        assert_eq!(rg_indices.value(10), 1);
+        assert_eq!(rg_indices.value(20), 0);
+
+        Ok(())
+    }
+
     pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
         use_filter: bool,
         test_case: F,
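Not exercised by the tests above, but both virtual columns compose in a single read. A hedged sketch in the style of those tests (crate-internal paths; `read_with_virtual_columns` is a hypothetical helper):

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType as ArrowDataType, Field};
use bytes::Bytes;

use crate::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
use crate::errors::Result;

// Request the row number and row group index columns alongside the data columns.
fn read_with_virtual_columns(file: Bytes) -> Result<Vec<RecordBatch>> {
    let row_number = Arc::new(
        Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
    );
    let row_group_index = Arc::new(
        Field::new("row_group_index", ArrowDataType::Int64, false)
            .with_extension_type(RowGroupIndex),
    );

    let options =
        ArrowReaderOptions::new().with_virtual_columns(vec![row_number, row_group_index])?;
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;

    // each batch now carries two trailing Int64 columns: the absolute row
    // number within the file and the index of the originating row group
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}
```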
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index 8b85cac479c1..fdb3943e8511 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 
 use crate::arrow::schema::extension::try_add_extension_type;
 use crate::arrow::schema::primitive::convert_primitive;
-use crate::arrow::schema::virtual_type::RowNumber;
+use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
 use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
 use crate::basic::{ConvertedType, Repetition};
 use crate::errors::ParquetError;
@@ -88,6 +88,8 @@ impl ParquetField {
 pub enum VirtualColumnType {
     /// Row number within the file
     RowNumber,
+    /// Index of the row group within the file
+    RowGroupIndex,
 }
 
 #[derive(Debug, Clone)]
@@ -584,6 +586,7 @@ pub(super) fn convert_virtual_field(
     let virtual_type = match extension_name {
         RowNumber::NAME => VirtualColumnType::RowNumber,
+        RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
         _ => {
             return Err(ParquetError::ArrowError(format!(
                 "unsupported virtual column type '{}' for field '{}'",
diff --git a/parquet/src/arrow/schema/virtual_type.rs b/parquet/src/arrow/schema/virtual_type.rs
index d3092a3bd53f..b71753f61c93 100644
--- a/parquet/src/arrow/schema/virtual_type.rs
+++ b/parquet/src/arrow/schema/virtual_type.rs
@@ -27,6 +27,50 @@ macro_rules! VIRTUAL_PREFIX {
     };
 }
 
+/// The extension type for row group indices.
+///
+/// Extension name: `parquet.virtual.row_group_index`.
+///
+/// This virtual column has storage type `Int64` and uses an empty string as metadata.
+#[derive(Debug, Default, Clone, Copy, PartialEq)]
+pub struct RowGroupIndex;
+
+impl ExtensionType for RowGroupIndex {
+    const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_group_index");
+    type Metadata = &'static str;
+
+    fn metadata(&self) -> &Self::Metadata {
+        &""
+    }
+
+    fn serialize_metadata(&self) -> Option<String> {
+        Some(String::default())
+    }
+
+    fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> {
+        if metadata.is_some_and(str::is_empty) {
+            Ok("")
+        } else {
+            Err(ArrowError::InvalidArgumentError(
+                "Virtual column extension type expects an empty string as metadata".to_owned(),
+            ))
+        }
+    }
+
+    fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> {
+        match data_type {
+            DataType::Int64 => Ok(()),
+            data_type => Err(ArrowError::InvalidArgumentError(format!(
+                "Virtual column data type mismatch, expected Int64, found {data_type}"
+            ))),
+        }
+    }
+
+    fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self, ArrowError> {
+        Self.supports_data_type(data_type).map(|_| Self)
+    }
+}
+
 /// The extension type for row numbers.
 ///
 /// Extension name: `parquet.virtual.row_number`.
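The extension name is what `convert_virtual_field` in complex.rs dispatches on, and it travels through the standard Arrow field metadata key. A minimal check, assuming this module's `RowGroupIndex` is in scope along with the `arrow_schema::extension` constants already used by the tests below:

```rust
use arrow_schema::extension::{ExtensionType, EXTENSION_TYPE_NAME_KEY};
use arrow_schema::{DataType, Field};

fn main() {
    let field = Field::new("rg_idx", DataType::Int64, false).with_extension_type(RowGroupIndex);

    assert_eq!(RowGroupIndex::NAME, "parquet.virtual.row_group_index");
    // with_extension_type records the name under "ARROW:extension:name"
    assert_eq!(
        field.metadata().get(EXTENSION_TYPE_NAME_KEY).map(String::as_str),
        Some(RowGroupIndex::NAME),
    );
}
```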
@@ -90,7 +134,7 @@ mod tests {
     use super::*;
 
     #[test]
-    fn valid() -> Result<(), ArrowError> {
+    fn row_number_valid() -> Result<(), ArrowError> {
         let mut field = Field::new("", DataType::Int64, false);
         field.try_with_extension_type(RowNumber)?;
         field.try_extension_type::<RowNumber>()?;
@@ -100,7 +144,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "Field extension type name missing")]
-    fn missing_name() {
+    fn row_number_missing_name() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
                 .into_iter()
@@ -111,13 +155,13 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "expected Int64, found Int32")]
-    fn invalid_type() {
+    fn row_number_invalid_type() {
         Field::new("", DataType::Int32, false).with_extension_type(RowNumber);
     }
 
     #[test]
     #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
-    fn missing_metadata() {
+    fn row_number_missing_metadata() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [(
                 EXTENSION_TYPE_NAME_KEY.to_owned(),
@@ -131,7 +175,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
-    fn invalid_metadata() {
+    fn row_number_invalid_metadata() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [
                 (
@@ -148,4 +192,64 @@ mod tests {
         );
         field.extension_type::<RowNumber>();
     }
+
+    #[test]
+    fn row_group_index_valid() -> Result<(), ArrowError> {
+        let mut field = Field::new("", DataType::Int64, false);
+        field.try_with_extension_type(RowGroupIndex)?;
+        field.try_extension_type::<RowGroupIndex>()?;
+
+        Ok(())
+    }
+
+    #[test]
+    #[should_panic(expected = "Field extension type name missing")]
+    fn row_group_index_missing_name() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
+                .into_iter()
+                .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
+
+    #[test]
+    #[should_panic(expected = "expected Int64, found Int32")]
+    fn row_group_index_invalid_type() {
+        Field::new("", DataType::Int32, false).with_extension_type(RowGroupIndex);
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
+    fn row_group_index_missing_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(
+                EXTENSION_TYPE_NAME_KEY.to_owned(),
+                RowGroupIndex::NAME.to_owned(),
+            )]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
+    fn row_group_index_invalid_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [
+                (
+                    EXTENSION_TYPE_NAME_KEY.to_owned(),
+                    RowGroupIndex::NAME.to_owned(),
+                ),
+                (
+                    EXTENSION_TYPE_METADATA_KEY.to_owned(),
+                    "non-empty".to_owned(),
+                ),
+            ]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
 }
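Downstream, the virtual column works with ordinary Arrow kernels. A hedged sketch (hypothetical helper, using the arrow-ord and arrow-select crates) that keeps only the rows originating from one row group:

```rust
use arrow_array::{cast::AsArray, types::Int64Type, Int64Array, RecordBatch};
use arrow_ord::cmp::eq;
use arrow_schema::ArrowError;
use arrow_select::filter::filter_record_batch;

// Assumes the batch's last column is the virtual row group index column
// produced by the reader in this change.
fn rows_from_group(batch: &RecordBatch, group: i64) -> Result<RecordBatch, ArrowError> {
    let indices = batch
        .column(batch.num_columns() - 1)
        .as_primitive::<Int64Type>();
    let mask = eq(indices, &Int64Array::new_scalar(group))?;
    filter_record_batch(batch, &mask)
}
```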