diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs
index e8116b2936b8..f9e60a2c0d34 100644
--- a/parquet/src/arrow/array_reader/row_number.rs
+++ b/parquet/src/arrow/array_reader/row_number.rs
@@ -21,7 +21,7 @@ use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use arrow_array::{ArrayRef, Int64Array};
 use arrow_schema::DataType;
 use std::any::Any;
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::sync::Arc;

 pub(crate) struct RowNumberReader {
@@ -34,35 +34,40 @@ impl RowNumberReader {
         parquet_metadata: &'a ParquetMetaData,
         row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
     ) -> Result<Self> {
-        // Collect ordinals from the selected row groups
-        let selected_ordinals: HashSet<i16> = row_groups
-            .map(|rg| {
-                rg.ordinal().ok_or_else(|| {
-                    ParquetError::General(
-                        "Row group missing ordinal field, required to compute row numbers"
-                            .to_string(),
-                    )
-                })
-            })
-            .collect::<Result<HashSet<_>>>()?;
-
-        // Iterate through all row groups once, computing first_row_index and creating ranges
-        // This is O(M) where M is total row groups, much better than O(N * O) where N is selected
+        // Pass 1: Build a map from ordinal to first_row_index
+        // This is O(M) where M is the total number of row groups in the file
+        let mut ordinal_to_offset: HashMap<i16, i64> = HashMap::new();
         let mut first_row_index: i64 = 0;
-        let mut ranges = Vec::new();
         for rg in parquet_metadata.row_groups() {
             if let Some(ordinal) = rg.ordinal() {
-                if selected_ordinals.contains(&ordinal) {
-                    ranges.push((ordinal, first_row_index..first_row_index + rg.num_rows()));
-                }
+                ordinal_to_offset.insert(ordinal, first_row_index);
             }
             first_row_index += rg.num_rows();
         }

-        // Sort ranges by ordinal to maintain original row group order
-        ranges.sort_by_key(|(ordinal, _)| *ordinal);
-        let ranges: Vec<_> = ranges.into_iter().map(|(_, range)| range).collect();
+        // Pass 2: Build ranges in the order specified by the row_groups iterator
+        // This is O(N) where N is the number of selected row groups
+        // This preserves the user's requested order instead of sorting by ordinal
+        let ranges: Vec<_> = row_groups
+            .map(|rg| {
+                let ordinal = rg.ordinal().ok_or_else(|| {
+                    ParquetError::General(
+                        "Row group missing ordinal field, required to compute row numbers"
+                            .to_string(),
+                    )
+                })?;
+
+                let offset = ordinal_to_offset.get(&ordinal).ok_or_else(|| {
+                    ParquetError::General(format!(
+                        "Row group with ordinal {} not found in metadata",
+                        ordinal
+                    ))
+                })?;
+
+                Ok(*offset..*offset + rg.num_rows())
+            })
+            .collect::<Result<Vec<_>>>()?;

         Ok(Self {
             buffered_row_numbers: Vec::new(),
@@ -106,3 +111,96 @@ impl ArrayReader for RowNumberReader {
         None
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::basic::Type as PhysicalType;
+    use crate::file::metadata::{
+        ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
+    };
+    use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
+    use std::sync::Arc;
+
+    fn create_test_schema() -> Arc<SchemaDescriptor> {
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![Arc::new(
+                SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
+                    .build()
+                    .unwrap(),
+            )])
+            .build()
+            .unwrap();
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+
+    fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
+        let schema_descr = create_test_schema();
+
+        let mut row_group_metas = vec![];
+        for (ordinal, num_rows) in row_groups {
+            let columns: Vec<_> = schema_descr
+                .columns()
+                .iter()
+                .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
+                .collect();
+
+            let row_group = RowGroupMetaData::builder(schema_descr.clone())
+                .set_num_rows(num_rows)
+                .set_ordinal(ordinal)
+                .set_total_byte_size(100)
+                .set_column_metadata(columns)
+                .build()
+                .unwrap();
+            row_group_metas.push(row_group);
+        }
+
+        let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
+        let file_metadata = FileMetaData::new(
+            1,            // version
+            total_rows,   // num_rows
+            None,         // created_by
+            None,         // key_value_metadata
+            schema_descr, // schema_descr
+            None,         // column_orders
+        );
+
+        ParquetMetaData::new(file_metadata, row_group_metas)
+    }
+
+    #[test]
+    fn test_row_number_reader_reverse_order() {
+        // Create metadata with 3 row groups, each with 2 rows
+        let metadata = create_test_parquet_metadata(vec![
+            (0, 2), // Row group 0: ordinal=0, rows 0-1
+            (1, 2), // Row group 1: ordinal=1, rows 2-3
+            (2, 2), // Row group 2: ordinal=2, rows 4-5
+        ]);
+
+        // Select only row groups with ordinals 2 and 0 (in that order)
+        // This means we want row group 2 first, then row group 0, skipping row group 1
+        let selected_row_groups: Vec<_> = vec![
+            &metadata.row_groups()[2], // ordinal 2
+            &metadata.row_groups()[0], // ordinal 0
+        ];
+
+        let mut reader =
+            RowNumberReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        // Read all row numbers
+        let num_read = reader.read_records(6).unwrap();
+        assert_eq!(num_read, 4); // Should read 4 rows total (2 from each selected group)
+
+        let array = reader.consume_batch().unwrap();
+        let row_numbers = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        // Expected: row group 2 first (rows 4-5), then row group 0 (rows 0-1)
+        let expected = vec![4, 5, 0, 1];
+        let actual: Vec<i64> = row_numbers.iter().map(|v| v.unwrap()).collect();
+
+        assert_eq!(
+            actual, expected,
+            "Row numbers should match the order of selected row groups, not file order"
+        );
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 16e01a6ebe27..f9b7d54ce373 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -625,12 +625,11 @@ impl ArrowReaderOptions {
     pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self {
         // Validate that all fields are virtual columns
         for field in &virtual_columns {
-            if !is_virtual_column(field) {
-                panic!(
-                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
-                    field.name()
-                );
-            }
+            assert!(
+                is_virtual_column(field),
+                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
+                field.name()
+            );
         }
         Self {
             virtual_columns,
@@ -5546,6 +5545,97 @@ pub(crate) mod tests {
         );
     }

+    #[test]
+    fn test_read_row_numbers_row_group_order() -> Result<()> {
+        // Make a parquet file with 100 rows split across 2 row groups
+        let array = Int64Array::from_iter_values(5000..5100);
+        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(50)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
+        // write in 10-row batches as the size limits are enforced after each batch
+        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
+            writer.write(&batch_chunk)?;
+        }
+        writer.close()?;
+
+        let row_number_field = Arc::new(
+            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
+        );
+
+        let buffer = Bytes::from(buffer);
+
+        let options =
+            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()]);
+
+        // read out with normal options
+        let arrow_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
+                .build()?;
+
+        assert_eq!(
+            ValuesAndRowNumbers {
+                values: (5000..5100).collect(),
+                row_numbers: (0..100).collect()
+            },
+            ValuesAndRowNumbers::new_from_reader(arrow_reader)
+        );
+
+        // Now read with the row groups out of order
+        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
+            .with_row_groups(vec![1, 0])
+            .build()?;
+
+        assert_eq!(
+            ValuesAndRowNumbers {
+                values: (5050..5100).chain(5000..5050).collect(),
+                row_numbers: (50..100).chain(0..50).collect(),
+            },
+            ValuesAndRowNumbers::new_from_reader(arrow_reader)
+        );
+
+        Ok(())
+    }
+
+    #[derive(Debug, PartialEq)]
+    struct ValuesAndRowNumbers {
+        values: Vec<i64>,
+        row_numbers: Vec<i64>,
+    }
+    impl ValuesAndRowNumbers {
+        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
+            let mut values = vec![];
+            let mut row_numbers = vec![];
+            for batch in reader {
+                let batch = batch.expect("Could not read batch");
+                values.extend(
+                    batch
+                        .column_by_name("col")
+                        .expect("Could not get col column")
+                        .as_primitive::<Int64Type>()
+                        .iter()
+                        .map(|v| v.expect("Could not get value")),
+                );
+
+                row_numbers.extend(
+                    batch
+                        .column_by_name("row_number")
+                        .expect("Could not get row_number column")
+                        .as_primitive::<Int64Type>()
+                        .iter()
+                        .map(|v| v.expect("Could not get row number"))
+                        .collect::<Vec<_>>(),
+                );
+            }
+            Self {
+                values,
+                row_numbers,
+            }
+        }
+    }
+
     #[test]
     #[should_panic(expected = "is not a virtual column")]
     fn test_with_virtual_columns_rejects_non_virtual_fields() {
diff --git a/parquet/src/arrow/schema/virtual_type.rs b/parquet/src/arrow/schema/virtual_type.rs
index eca2aef08dca..d3092a3bd53f 100644
--- a/parquet/src/arrow/schema/virtual_type.rs
+++ b/parquet/src/arrow/schema/virtual_type.rs
@@ -77,8 +77,7 @@ impl ExtensionType for RowNumber {
 pub fn is_virtual_column(field: &Field) -> bool {
     field
         .extension_type_name()
-        .map(|name| name.starts_with(VIRTUAL_PREFIX!()))
-        .unwrap_or(false)
+        .is_some_and(|name| name.starts_with(VIRTUAL_PREFIX!()))
 }

 #[cfg(test)]
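
The sketch below condenses the read path exercised by `test_read_row_numbers_row_group_order` into a standalone function, for trying the out-of-order row group behaviour against a build that includes these changes. It is an illustration only, not part of the patch: `read_reversed` and its `Bytes` argument are hypothetical, and the import path used for the `RowNumber` extension type is an assumption (this patch only shows its definition in `parquet/src/arrow/schema/virtual_type.rs`), so adjust it to the crate's actual public re-export.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
// Assumed re-export path for the RowNumber extension type; adjust as needed.
use parquet::arrow::schema::virtual_type::RowNumber;

/// Hypothetical helper: read `buffer` (a Parquet file with at least two row
/// groups) with row group 1 before row group 0, projecting the virtual
/// "row_number" column alongside the file's own columns.
fn read_reversed(buffer: Bytes) -> parquet::errors::Result<()> {
    let row_number_field = Arc::new(
        Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
    );
    let options = ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);

    // With this patch, the emitted row numbers follow the requested row group
    // order (1 then 0) instead of being re-sorted into file order.
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
        .with_row_groups(vec![1, 0])
        .build()?;

    for batch in reader {
        let batch = batch?;
        println!("{:?}", batch.column_by_name("row_number"));
    }
    Ok(())
}
```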