diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 82c8e77f6393..71f25f4587d1 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -152,13 +152,17 @@ impl<'a> ArrayReaderBuilder<'a> {
             return Ok(Some(reader));
         };
 
-        if cache_options.projection_mask.leaf_included(col_idx) {
+        // Skip caching for nested fields (inside List/Map) where rep_level > 0
+        // because the cache tracks by row count, but nested fields have
+        // values that don't correspond 1:1 with rows due to repetition
+        if cache_options.projection_mask.leaf_included(col_idx) && field.rep_level == 0 {
             Ok(Some(Box::new(CachedArrayReader::new(
                 reader,
                 Arc::clone(cache_options.cache),
                 col_idx,
                 cache_options.role,
                 self.metrics.clone(), // cheap clone
+                field.def_level > 0, // needs_def_levels: true if it has nullable ancestors
             ))))
         } else {
             Ok(Some(reader))
diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs
index b55b1e1d1a65..dd37b0d8f01d 100644
--- a/parquet/src/arrow/array_reader/cached_array_reader.rs
+++ b/parquet/src/arrow/array_reader/cached_array_reader.rs
@@ -17,7 +17,7 @@
 
 //! [`CachedArrayReader`] wrapper around [`ArrayReader`]
 
-use crate::arrow::array_reader::row_group_cache::BatchID;
+use crate::arrow::array_reader::row_group_cache::{BatchID, CachedBatch};
 use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache};
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::errors::Result;
@@ -84,19 +84,38 @@ pub struct CachedArrayReader {
     role: CacheRole,
     /// Local cache to store batches between read_records and consume_batch calls
     /// This ensures data is available even if the shared cache evicts items
-    local_cache: HashMap<BatchID, ArrayRef>,
+    local_cache: HashMap<BatchID, CachedBatch>,
     /// Statistics to report on the Cache behavior
     metrics: ArrowReaderMetrics,
+    /// Definition levels for the last consume_batch() output
+    def_levels_buffer: Option<Vec<i16>>,
+    /// Repetition levels for the last consume_batch() output
+    rep_levels_buffer: Option<Vec<i16>>,
+    /// Whether this reader needs definition levels (def_level > 0, i.e., has nullable ancestors).
+    /// When false, we skip copying definition levels to avoid unnecessary allocations.
+    needs_def_levels: bool,
 }
 
 impl CachedArrayReader {
     /// Creates a new cached array reader with the specified role
+    ///
+    /// # Arguments
+    /// * `inner` - The underlying array reader
+    /// * `cache` - Shared cache for this row group
+    /// * `column_idx` - Column index for cache key generation
+    /// * `role` - Producer or Consumer role
+    /// * `metrics` - Statistics to report on cache behavior
+    /// * `needs_def_levels` - Whether this column needs definition levels (def_level > 0).
+    ///   When false, definition levels are not copied to avoid unnecessary allocations.
+    ///   Note: repetition levels are never copied for cached columns since caching is
+    ///   only enabled for columns with rep_level == 0.
     pub fn new(
         inner: Box<dyn ArrayReader>,
         cache: Arc<Mutex<RowGroupCache>>,
         column_idx: usize,
         role: CacheRole,
         metrics: ArrowReaderMetrics,
+        needs_def_levels: bool,
     ) -> Self {
         let batch_size = cache.lock().unwrap().batch_size();
@@ -111,6 +130,9 @@ impl CachedArrayReader {
             role,
             local_cache: HashMap::new(),
             metrics,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            needs_def_levels,
         }
     }
@@ -146,18 +168,31 @@ impl CachedArrayReader {
 
         let array = self.inner.consume_batch()?;
 
+        // Capture definition levels from the inner reader only when needed (def_level > 0).
+        // This avoids unnecessary allocations for columns without nullable ancestors.
+        // Repetition levels are never copied because caching is only enabled for
+        // columns with rep_level == 0 (non-nested, non-repeated columns).
+        let def_levels = if self.needs_def_levels {
+            self.inner.get_def_levels().map(|l| l.to_vec())
+        } else {
+            None
+        };
+        // rep_levels always None for cached columns (rep_level == 0 per builder.rs)
+        let rep_levels = None;
+        let cached_batch = CachedBatch::with_levels(array, def_levels, rep_levels);
+
         // Store in both shared cache and local cache
         // The shared cache is used to reuse results between readers
         // The local cache ensures data is available for our consume_batch call
-        let _cached =
-            self.shared_cache
-                .lock()
-                .unwrap()
-                .insert(self.column_idx, batch_id, array.clone());
+        let _cached = self.shared_cache.lock().unwrap().insert(
+            self.column_idx,
+            batch_id,
+            cached_batch.clone(),
+        );
         // Note: if the shared cache is full (_cached == false), we continue without caching
         // The local cache will still store the data for this reader's use
-        self.local_cache.insert(batch_id, array);
+        self.local_cache.insert(batch_id, cached_batch);
 
         self.inner_position += read;
         Ok(read)
@@ -200,8 +235,8 @@ impl ArrayReader for CachedArrayReader {
         let batch_id = self.get_batch_id_from_position(self.outer_position);
 
         // Check local cache first
-        let cached = if let Some(array) = self.local_cache.get(&batch_id) {
-            Some(Arc::clone(array))
+        let cached = if let Some(batch) = self.local_cache.get(&batch_id) {
+            Some(batch.clone())
         } else {
             // If not in local cache, i.e., we are consumer, check shared cache
             let cache_content = self
                 .shared_cache
                 .lock()
                 .unwrap()
                 .get(self.column_idx, batch_id);
-            if let Some(array) = cache_content.as_ref() {
+            if let Some(batch) = cache_content.as_ref() {
                 // Store in local cache for later use in consume_batch
-                self.local_cache.insert(batch_id, Arc::clone(array));
+                self.local_cache.insert(batch_id, batch.clone());
             }
             cache_content
         };
 
         match cached {
-            Some(array) => {
-                let array_len = array.len();
+            Some(batch) => {
+                let array_len = batch.array.len();
                 if array_len + batch_id.val * self.batch_size > self.outer_position {
                     // the cache batch has some records that we can select
                     let v = array_len + batch_id.val * self.batch_size - self.outer_position;
@@ -270,6 +305,15 @@ impl ArrayReader for CachedArrayReader {
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         let row_count = self.selections.len();
         if row_count == 0 {
+            // When there's no data to consume, set empty level buffers if we
+            // previously had levels. This ensures the levels match the empty array.
+            // We keep Some([]) rather than None to indicate this reader provides levels.
+            if self.def_levels_buffer.is_some() {
+                self.def_levels_buffer = Some(Vec::new());
+            }
+            if self.rep_levels_buffer.is_some() {
+                self.rep_levels_buffer = Some(Vec::new());
+            }
             return Ok(new_empty_array(self.inner.get_data_type()));
         }
 
@@ -281,6 +325,11 @@ impl ArrayReader for CachedArrayReader {
         let end_batch = (start_position + row_count - 1) / self.batch_size;
 
         let mut selected_arrays = Vec::new();
+        let mut selected_def_levels: Vec<i16> = Vec::new();
+        let mut selected_rep_levels: Vec<i16> = Vec::new();
+        let mut has_def_levels = false;
+        let mut has_rep_levels = false;
+
         for batch_id in start_batch..=end_batch {
             let batch_start = batch_id * self.batch_size;
             let batch_end = batch_start + self.batch_size - 1;
@@ -299,22 +348,76 @@ impl ArrayReader for CachedArrayReader {
             let mask = selection_buffer.slice(selection_start, selection_length);
 
             if mask.count_set_bits() == 0 {
+                // Even when all records are filtered out, check if the batch has levels
+                // so we can return Some([]) instead of None to indicate this reader provides levels.
+                // Check local cache first, then shared cache (since skip_records doesn't populate local cache)
+                let cached_batch = self.local_cache.get(&batch_id).cloned().or_else(|| {
+                    self.shared_cache
+                        .lock()
+                        .unwrap()
+                        .get(self.column_idx, batch_id)
+                });
+                if let Some(batch) = cached_batch {
+                    if batch.def_levels.is_some() {
+                        has_def_levels = true;
+                    }
+                    if batch.rep_levels.is_some() {
+                        has_rep_levels = true;
+                    }
+                }
                 continue;
             }
 
-            let mask_array = BooleanArray::from(mask);
+            let mask_array = BooleanArray::from(mask.clone());
 
             // Read from local cache instead of shared cache to avoid cache eviction issues
-            let cached = self
+            let cached_batch = self
                 .local_cache
                 .get(&batch_id)
                 .expect("data must be already cached in the read_records call, this is a bug");
-            let cached = cached.slice(overlap_start - batch_start, selection_length);
-            let filtered = arrow_select::filter::filter(&cached, &mask_array)?;
+
+            // Slice and filter the array
+            let slice_start = overlap_start - batch_start;
+            let cached_array = cached_batch.array.slice(slice_start, selection_length);
+            let filtered = arrow_select::filter::filter(&cached_array, &mask_array)?;
             selected_arrays.push(filtered);
+
+            // Slice and filter definition levels if present
+            if let Some(ref def_levels) = cached_batch.def_levels {
+                has_def_levels = true;
+                let sliced = &def_levels[slice_start..slice_start + selection_length];
+                for (level, selected) in sliced.iter().zip(mask.iter()) {
+                    if selected {
+                        selected_def_levels.push(*level);
+                    }
+                }
+            }
+
+            // Slice and filter repetition levels if present
+            if let Some(ref rep_levels) = cached_batch.rep_levels {
+                has_rep_levels = true;
+                let sliced = &rep_levels[slice_start..slice_start + selection_length];
+                for (level, selected) in sliced.iter().zip(mask.iter()) {
+                    if selected {
+                        selected_rep_levels.push(*level);
+                    }
+                }
+            }
         }
 
         self.selections = BooleanBufferBuilder::new(0);
 
+        // Store the filtered levels
+        self.def_levels_buffer = if has_def_levels {
+            Some(selected_def_levels)
+        } else {
+            None
+        };
+        self.rep_levels_buffer = if has_rep_levels {
+            Some(selected_rep_levels)
+        } else {
+            None
+        };
+
         // Only remove batches from local buffer that are completely behind current position
         // Keep the current batch and any future batches as they might still be needed
         let current_batch_id = self.get_batch_id_from_position(self.outer_position);
@@ -340,11 +443,11 @@ impl ArrayReader for CachedArrayReader {
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        None // we don't allow nullable parent for now.
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        None
+        self.rep_levels_buffer.as_deref()
     }
 }
 
@@ -427,6 +530,7 @@ mod tests {
             0,
             CacheRole::Producer,
             metrics,
+            false, // needs_def_levels: basic test doesn't use levels
         );
 
         // Read 3 records
@@ -455,6 +559,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels: basic test doesn't use levels
        );
 
         let read1 = cached_reader.read_records(2).unwrap();
@@ -489,6 +594,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Multiple reads should accumulate
@@ -516,6 +622,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Try to read more than available
@@ -546,6 +653,7 @@ mod tests {
             0,
             CacheRole::Producer,
             metrics.clone(),
+            false, // needs_def_levels
         );
 
         cached_reader1.read_records(3).unwrap();
@@ -560,6 +668,7 @@ mod tests {
             1,
             CacheRole::Consumer,
             metrics.clone(),
+            false, // needs_def_levels
         );
 
         cached_reader2.read_records(2).unwrap();
@@ -582,6 +691,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Read first batch (positions 0-2, batch 0)
@@ -635,6 +745,7 @@ mod tests {
             0,
             CacheRole::Producer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Read first batch (positions 0-2)
@@ -668,6 +779,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Read records which should populate both shared and local cache
@@ -705,6 +817,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // Read records which should populate both shared and local cache
@@ -732,6 +845,7 @@ mod tests {
             0,
             CacheRole::Producer,
             metrics.clone(),
+            false, // needs_def_levels
         );
 
         // Populate cache with first batch (1, 2, 3)
@@ -745,6 +859,7 @@ mod tests {
             0,
             CacheRole::Consumer,
             metrics,
+            false, // needs_def_levels
         );
 
         // - We want to read 4 records starting from position 0
@@ -759,4 +874,329 @@ mod tests {
         let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
         assert_eq!(int32_array.values(), &[1, 2, 3, 4]);
     }
+
+    // Mock ArrayReader that returns definition and repetition levels
+    struct MockArrayReaderWithLevels {
+        data: Vec<i32>,
+        def_levels: Vec<i16>,
+        rep_levels: Vec<i16>,
+        position: usize,
+        records_to_consume: usize,
+        data_type: ArrowType,
+        // Buffers to hold levels after consume_batch
+        def_levels_buffer: Option<Vec<i16>>,
+        rep_levels_buffer: Option<Vec<i16>>,
+    }
+
+    impl MockArrayReaderWithLevels {
+        fn new(data: Vec<i32>, def_levels: Vec<i16>, rep_levels: Vec<i16>) -> Self {
+            assert_eq!(data.len(), def_levels.len());
+            assert_eq!(data.len(), rep_levels.len());
+            Self {
+                data,
+                def_levels,
+                rep_levels,
+                position: 0,
+                records_to_consume: 0,
+                data_type: ArrowType::Int32,
+                def_levels_buffer: None,
+                rep_levels_buffer: None,
+            }
+        }
+    }
+
+    impl ArrayReader for MockArrayReaderWithLevels {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn get_data_type(&self) -> &ArrowType {
+            &self.data_type
+        }
+
+        fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+            let remaining = self.data.len() - self.position;
+            let to_read = std::cmp::min(batch_size, remaining);
+            self.records_to_consume += to_read;
+            Ok(to_read)
+        }
+
+        fn consume_batch(&mut self) -> Result<ArrayRef> {
+            let start = self.position;
+            let end = start + self.records_to_consume;
+            let slice = &self.data[start..end];
+            self.def_levels_buffer = Some(self.def_levels[start..end].to_vec());
+            self.rep_levels_buffer = Some(self.rep_levels[start..end].to_vec());
+            self.position = end;
+            self.records_to_consume = 0;
+            Ok(Arc::new(Int32Array::from(slice.to_vec())))
+        }
+
+        fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+            let remaining = self.data.len() - self.position;
+            let to_skip = std::cmp::min(num_records, remaining);
+            self.position += to_skip;
+            Ok(to_skip)
+        }
+
+        fn get_def_levels(&self) -> Option<&[i16]> {
+            self.def_levels_buffer.as_deref()
+        }
+
+        fn get_rep_levels(&self) -> Option<&[i16]> {
+            self.rep_levels_buffer.as_deref()
+        }
+    }
+
+    #[test]
+    fn test_level_propagation_basic() {
+        let metrics = ArrowReaderMetrics::disabled();
+        // Data with corresponding def and rep levels
+        let data = vec![1, 2, 3, 4, 5];
+        let def_levels = vec![1, 1, 0, 1, 1]; // Third value is null
+        let rep_levels = vec![0, 1, 1, 0, 1]; // New list at positions 0 and 3
+
+        let mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels);
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX)));
+        let mut cached_reader = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache,
+            0,
+            CacheRole::Producer,
+            metrics,
+            true, // needs_def_levels: test level propagation
+        );
+
+        // Read all 5 records
+        let records_read = cached_reader.read_records(5).unwrap();
+        assert_eq!(records_read, 5);
+
+        let array = cached_reader.consume_batch().unwrap();
+        assert_eq!(array.len(), 5);
+
+        // Verify definition levels are captured
+        let def_levels = cached_reader.get_def_levels();
+        assert!(def_levels.is_some());
+        assert_eq!(def_levels.unwrap(), &[1, 1, 0, 1, 1]);
+
+        // Repetition levels are not copied because caching is only enabled
+        // for columns with rep_level == 0 (non-nested columns)
+        let rep_levels = cached_reader.get_rep_levels();
+        assert!(rep_levels.is_none());
+    }
+
+    #[test]
+    fn test_level_propagation_with_skip() {
+        let metrics = ArrowReaderMetrics::disabled();
+        // Data with corresponding def and rep levels
+        let data = vec![1, 2, 3, 4, 5, 6];
+        let def_levels = vec![1, 1, 0, 1, 1, 0];
+        let rep_levels = vec![0, 1, 1, 0, 1, 1];
+
+        let mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels);
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(6, usize::MAX)));
+        let mut cached_reader = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache,
+            0,
+            CacheRole::Producer,
+            metrics,
+            true, // needs_def_levels: test level propagation
+        );
+
+        // Read 2 records
+        let records_read = cached_reader.read_records(2).unwrap();
+        assert_eq!(records_read, 2);
+
+        // Skip 2 records
+        let skipped = cached_reader.skip_records(2).unwrap();
+        assert_eq!(skipped, 2);
+
+        // Read 2 more records
+        let records_read = cached_reader.read_records(2).unwrap();
+        assert_eq!(records_read, 2);
+
+        let array = cached_reader.consume_batch().unwrap();
+        // Should have 4 values: positions 0, 1 (read), positions 4, 5 (read)
+        // Positions 2, 3 were skipped
+        assert_eq!(array.len(), 4);
+
+        let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(int32_array.values(), &[1, 2, 5, 6]);
+
+        // Verify definition levels match the selected values
+        let def_levels = cached_reader.get_def_levels();
+        assert!(def_levels.is_some());
+        assert_eq!(def_levels.unwrap(), &[1, 1, 1, 0]); // def_levels for positions 0, 1, 4, 5
+
+        // Repetition levels are not copied (rep_level == 0 for cached columns)
+        let rep_levels = cached_reader.get_rep_levels();
+        assert!(rep_levels.is_none());
+    }
+
+    #[test]
+    fn test_level_propagation_multi_batch() {
+        let metrics = ArrowReaderMetrics::disabled();
+        // Data spanning multiple batches
+        let data = vec![1, 2, 3, 4, 5, 6];
+        let def_levels = vec![1, 0, 1, 1, 0, 1];
+        let rep_levels = vec![0, 0, 1, 0, 0, 1];
+
+        let mock_reader = MockArrayReaderWithLevels::new(data, def_levels, rep_levels);
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+        let mut cached_reader = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache,
+            0,
+            CacheRole::Producer,
+            metrics,
+            true, // needs_def_levels: test level propagation
+        );
+
+        // Read all 6 records (spanning 2 batches)
+        let records_read = cached_reader.read_records(6).unwrap();
+        assert_eq!(records_read, 6);
+
+        let array = cached_reader.consume_batch().unwrap();
+        assert_eq!(array.len(), 6);
+
+        // Verify definition levels are correctly concatenated from both batches
+        let def_levels = cached_reader.get_def_levels();
+        assert!(def_levels.is_some());
+        assert_eq!(def_levels.unwrap(), &[1, 0, 1, 1, 0, 1]);
+
+        // Repetition levels are not copied (rep_level == 0 for cached columns)
+        let rep_levels = cached_reader.get_rep_levels();
+        assert!(rep_levels.is_none());
+    }
+
+    #[test]
+    fn test_no_levels_returns_none() {
+        let metrics = ArrowReaderMetrics::disabled();
+        // Use the original MockArrayReader which returns no levels
+        let mock_reader = MockArrayReader::new(vec![1, 2, 3]);
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX)));
+        let mut cached_reader = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache,
+            0,
+            CacheRole::Producer,
+            metrics,
+            false, // needs_def_levels: false since inner reader has no levels
+        );
+
+        let records_read = cached_reader.read_records(3).unwrap();
+        assert_eq!(records_read, 3);
+
+        let _array = cached_reader.consume_batch().unwrap();
+
+        // Should return None since inner reader has no levels and needs_def_levels is false
+        assert!(cached_reader.get_def_levels().is_none());
+        assert!(cached_reader.get_rep_levels().is_none());
+    }
+
+    #[test]
+    fn test_level_propagation_cache_sharing() {
+        let metrics = ArrowReaderMetrics::disabled();
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX)));
+
+        // Producer populates cache with levels
+        let data = vec![1, 2, 3, 4, 5];
+        let def_levels = vec![1, 0, 1, 1, 0];
+        let rep_levels = vec![0, 1, 0, 1, 1];
+
+        let mock_reader =
+            MockArrayReaderWithLevels::new(data.clone(), def_levels.clone(), rep_levels.clone());
+        let mut producer = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache.clone(),
+            0,
+            CacheRole::Producer,
+            metrics.clone(),
+            true, // needs_def_levels: test level propagation
+        );
+
+        // Producer reads and populates cache
+        producer.read_records(5).unwrap();
+        producer.consume_batch().unwrap();
+
+        // Consumer reads from cache - should get the same levels
+        let mock_reader2 = MockArrayReaderWithLevels::new(
+            vec![10, 20, 30, 40, 50], // Different data (shouldn't be used)
+            vec![0, 0, 0, 0, 0],
+            vec![0, 0, 0, 0, 0],
+        );
+        let mut consumer = CachedArrayReader::new(
+            Box::new(mock_reader2),
+            cache.clone(),
+            0, // Same column index
+            CacheRole::Consumer,
+            metrics,
+            true, // needs_def_levels: test level propagation
+        );
+
+        consumer.read_records(5).unwrap();
+        let array = consumer.consume_batch().unwrap();
+
+        // Should get original data from cache
+        let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(int32_array.values(), &[1, 2, 3, 4, 5]);
+
+        // Should get original definition levels from cache
+        assert_eq!(consumer.get_def_levels().unwrap(), &[1, 0, 1, 1, 0]);
+        // Repetition levels are not cached (rep_level == 0 for cached columns)
+        assert!(consumer.get_rep_levels().is_none());
+    }
+
+    #[test]
+    fn test_level_propagation_empty_after_skip() {
+        let metrics = ArrowReaderMetrics::disabled();
+        let cache = Arc::new(Mutex::new(RowGroupCache::new(4, usize::MAX)));
+
+        // Producer populates cache with levels
+        let data = vec![1, 2, 3, 4];
+        let def_levels = vec![1, 0, 1, 1];
+        let rep_levels = vec![0, 1, 1, 0];
+        let mock_reader =
+            MockArrayReaderWithLevels::new(data, def_levels.clone(), rep_levels.clone());
+        let mut producer = CachedArrayReader::new(
+            Box::new(mock_reader),
+            cache.clone(),
+            0,
+            CacheRole::Producer,
+            metrics.clone(),
+            true, // needs_def_levels: test level propagation
+        );
+
+        producer.read_records(4).unwrap();
+        producer.consume_batch().unwrap();
+
+        // Consumer skips all rows, resulting in an empty output batch
+        let mock_reader2 = MockArrayReaderWithLevels::new(
+            vec![10, 20, 30, 40],
+            vec![0, 0, 0, 0],
+            vec![0, 0, 0, 0],
+        );
+        let mut consumer = CachedArrayReader::new(
+            Box::new(mock_reader2),
+            cache,
+            0,
+            CacheRole::Consumer,
+            metrics,
+            true, // needs_def_levels: test level propagation
+        );
+
+        let skipped = consumer.skip_records(4).unwrap();
+        assert_eq!(skipped, 4);
+
+        let array = consumer.consume_batch().unwrap();
+        assert_eq!(array.len(), 0);
+
+        // Definition levels should be empty (not None) after skip
+        let def_levels = consumer.get_def_levels().map(|l| l.to_vec());
+        assert_eq!(def_levels, Some(vec![]));
+        // Repetition levels are not cached (rep_level == 0 for cached columns)
+        let rep_levels = consumer.get_rep_levels();
+        assert!(rep_levels.is_none());
+    }
 }
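// A minimal standalone sketch (not part of the patch) of the mask-based level
// filtering that consume_batch performs above: definition levels survive only
// where the selection mask is set, so they stay aligned with the filtered
// values. `filter_levels` and the sample data are illustrative only.
fn filter_levels(levels: &[i16], mask: &[bool]) -> Vec<i16> {
    levels
        .iter()
        .zip(mask.iter())
        .filter_map(|(level, keep)| keep.then_some(*level))
        .collect()
}

fn main() {
    // def_levels for 5 values where the 3rd is null (def_level 0)
    let def_levels = vec![1, 1, 0, 1, 1];
    // keep rows 0, 1 and 4, as a row filter would
    let mask = [true, true, false, false, true];
    assert_eq!(filter_levels(&def_levels, &mask), vec![1, 1, 1]);
}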
diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs
index ef726e16495f..5d1254f3fc67 100644
--- a/parquet/src/arrow/array_reader/row_group_cache.rs
+++ b/parquet/src/arrow/array_reader/row_group_cache.rs
@@ -19,6 +19,58 @@ use arrow_array::{Array, ArrayRef};
 use arrow_schema::DataType;
 use std::collections::HashMap;
 
+/// A cached batch containing array data and optional definition/repetition levels
+#[derive(Debug, Clone)]
+pub struct CachedBatch {
+    /// The decoded array data
+    pub array: ArrayRef,
+    /// Definition levels for nullability tracking in nested structures
+    pub def_levels: Option<Vec<i16>>,
+    /// Repetition levels for list boundary tracking
+    pub rep_levels: Option<Vec<i16>>,
+}
+
+impl CachedBatch {
+    /// Creates a new cached batch with just array data (no levels)
+    #[cfg(test)]
+    pub fn new(array: ArrayRef) -> Self {
+        Self {
+            array,
+            def_levels: None,
+            rep_levels: None,
+        }
+    }
+
+    /// Creates a new cached batch with array data and levels
+    pub fn with_levels(
+        array: ArrayRef,
+        def_levels: Option<Vec<i16>>,
+        rep_levels: Option<Vec<i16>>,
+    ) -> Self {
+        Self {
+            array,
+            def_levels,
+            rep_levels,
+        }
+    }
+
+    /// Returns the memory size of this cached batch
+    fn memory_size(&self) -> usize {
+        let array_size = get_array_memory_size_for_cache(&self.array);
+        let def_size = self
+            .def_levels
+            .as_ref()
+            .map(|l| l.capacity() * std::mem::size_of::<i16>())
+            .unwrap_or(0);
+        let rep_size = self
+            .rep_levels
+            .as_ref()
+            .map(|l| l.capacity() * std::mem::size_of::<i16>())
+            .unwrap_or(0);
+        array_size + def_size + rep_size
+    }
+}
+
 /// Starting row ID for this batch
 ///
 /// The `BatchID` is used to identify batches of rows within a row group.
@@ -62,8 +114,8 @@ fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize {
 /// appears in both filter predicates and output projection.
 #[derive(Debug)]
 pub struct RowGroupCache {
-    /// Cache storage mapping (column_idx, row_id) -> ArrayRef
-    cache: HashMap<CacheKey, ArrayRef>,
+    /// Cache storage mapping (column_idx, row_id) -> CachedBatch
+    cache: HashMap<CacheKey, CachedBatch>,
     /// Cache granularity
     batch_size: usize,
     /// Maximum cache size in bytes
@@ -83,13 +135,13 @@ impl RowGroupCache {
         }
     }
 
-    /// Inserts an array into the cache for the given column and starting row ID
-    /// Returns true if the array was inserted, false if it would exceed the cache size limit
-    pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, array: ArrayRef) -> bool {
-        let array_size = get_array_memory_size_for_cache(&array);
+    /// Inserts a batch into the cache for the given column and starting row ID
+    /// Returns true if the batch was inserted, false if it would exceed the cache size limit
+    pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, batch: CachedBatch) -> bool {
+        let batch_size = batch.memory_size();
 
-        // Check if adding this array would exceed the cache size limit
-        if self.current_cache_size + array_size > self.max_cache_bytes {
+        // Check if adding this batch would exceed the cache size limit
+        if self.current_cache_size + batch_size > self.max_cache_bytes {
             return false; // Cache is full, don't insert
         }
 
@@ -98,15 +150,15 @@ impl RowGroupCache {
             batch_id,
         };
 
-        let existing = self.cache.insert(key, array);
+        let existing = self.cache.insert(key, batch);
         assert!(existing.is_none());
-        self.current_cache_size += array_size;
+        self.current_cache_size += batch_size;
         true
     }
 
-    /// Retrieves a cached array for the given column and row ID
+    /// Retrieves a cached batch for the given column and row ID
     /// Returns None if not found in cache
-    pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option<ArrayRef> {
+    pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option<CachedBatch> {
         let key = CacheKey {
             column_idx,
             batch_id,
@@ -119,15 +171,15 @@ impl RowGroupCache {
         self.batch_size
     }
 
-    /// Removes a cached array for the given column and row ID
+    /// Removes a cached batch for the given column and row ID
     /// Returns true if the entry was found and removed, false otherwise
     pub fn remove(&mut self, column_idx: usize, batch_id: BatchID) -> bool {
         let key = CacheKey {
             column_idx,
             batch_id,
         };
-        if let Some(array) = self.cache.remove(&key) {
-            self.current_cache_size -= get_array_memory_size_for_cache(&array);
+        if let Some(batch) = self.cache.remove(&key) {
+            self.current_cache_size -= batch.memory_size();
             true
         } else {
             false
@@ -147,13 +199,14 @@ mod tests {
 
         // Create test array
         let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let batch = CachedBatch::new(array);
 
         // Test insert and get
         let batch_id = BatchID { val: 0 };
-        assert!(cache.insert(0, batch_id, array.clone()));
+        assert!(cache.insert(0, batch_id, batch));
         let retrieved = cache.get(0, batch_id);
         assert!(retrieved.is_some());
-        assert_eq!(retrieved.unwrap().len(), 5);
+        assert_eq!(retrieved.unwrap().array.len(), 5);
 
         // Test miss
         let miss = cache.get(1, batch_id);
@@ -172,10 +225,10 @@ mod tests {
         let array1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
         let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
 
-        // Insert arrays
-        assert!(cache.insert(0, BatchID { val: 0 }, array1.clone()));
-        assert!(cache.insert(0, BatchID { val: 1000 }, array2.clone()));
-        assert!(cache.insert(1, BatchID { val: 0 }, array1.clone()));
+        // Insert batches
+        assert!(cache.insert(0, BatchID { val: 0 }, CachedBatch::new(array1.clone())));
+        assert!(cache.insert(0, BatchID { val: 1000 }, CachedBatch::new(array2.clone())));
+        assert!(cache.insert(1, BatchID { val: 0 }, CachedBatch::new(array1.clone())));
 
         // Verify they're there
         assert!(cache.get(0, BatchID { val: 0 }).is_some());
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 52152988166f..d03b16754222 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -419,51 +419,6 @@ impl ProjectionMask {
             }
         }
     }
-
-    /// Return a new [`ProjectionMask`] that excludes any leaf columns that are
-    /// part of a nested type, such as struct, list, or map
-    ///
-    /// If there are no non-nested columns in the mask, returns `None`
-    pub(crate) fn without_nested_types(&self, schema: &SchemaDescriptor) -> Option<ProjectionMask> {
-        let num_leaves = schema.num_columns();
-
-        // Count how many leaves each root column has
-        let num_roots = schema.root_schema().get_fields().len();
-        let mut root_leaf_counts = vec![0usize; num_roots];
-        for leaf_idx in 0..num_leaves {
-            let root_idx = schema.get_column_root_idx(leaf_idx);
-            root_leaf_counts[root_idx] += 1;
-        }
-
-        // Keep only leaves whose root has exactly one leaf (non-nested) and is not a
-        // LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g.
-        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-        //
-        // ```text
-        // // List<String> (list non-null, elements nullable)
-        // required group my_list (LIST) {
-        //   repeated group list {
-        //     optional binary element (STRING);
-        //   }
-        // }
-        // ```
-        let mut included_leaves = Vec::new();
-        for leaf_idx in 0..num_leaves {
-            if self.leaf_included(leaf_idx) {
-                let root = schema.get_column_root(leaf_idx);
-                let root_idx = schema.get_column_root_idx(leaf_idx);
-                if root_leaf_counts[root_idx] == 1 && !root.is_list() {
-                    included_leaves.push(leaf_idx);
-                }
-            }
-        }
-
-        if included_leaves.is_empty() {
-            None
-        } else {
-            Some(ProjectionMask::leaves(schema, included_leaves))
-        }
-    }
 }
 
 /// Lookups up the parquet column by name
@@ -826,222 +781,6 @@ mod test {
         assert_eq!(mask1.mask, None);
     }
 
-    #[test]
-    fn test_projection_mask_without_nested_no_nested() {
-        // Schema with no nested types
-        let schema = parse_schema(
-            "
-            message test_schema {
-              OPTIONAL INT32 a;
-              OPTIONAL INT32 b;
-              REQUIRED DOUBLE d;
-            }
-            ",
-        );
-
-        let mask = ProjectionMask::all();
-        // All columns are non-nested, but without_nested_types returns a new mask
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [0, 1, 2])),
-            mask.without_nested_types(&schema)
-        );
-
-        // select b, c
-        let mask = ProjectionMask::leaves(&schema, [1, 2]);
-        assert_eq!(Some(mask.clone()), mask.without_nested_types(&schema));
-    }
-
-    #[test]
-    fn test_projection_mask_without_nested_nested() {
-        // Schema with nested types (structs)
-        let schema = parse_schema(
-            "
-            message test_schema {
-              OPTIONAL INT32 a;
-              OPTIONAL group b {
-                REQUIRED INT32 b1;
-                OPTIONAL INT64 b2;
-              }
-              OPTIONAL group c (LIST) {
-                REPEATED group list {
-                  OPTIONAL INT32 element;
-                }
-              }
-              REQUIRED DOUBLE d;
-            }
-            ",
-        );
-
-        // all leaves --> a, d
-        let mask = ProjectionMask::all();
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [0, 4])),
-            mask.without_nested_types(&schema)
-        );
-
-        // b1 --> empty (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [1]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-
-        // b2, d --> d
-        let mask = ProjectionMask::leaves(&schema, [1, 4]);
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [4])),
-            mask.without_nested_types(&schema)
-        );
-
-        // element --> empty (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [3]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-    }
-
-    #[test]
-    fn test_projection_mask_without_nested_map_only() {
-        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
-        let schema = parse_schema(
-            "
-            message test_schema {
-              required group my_map (MAP) {
-                repeated group key_value {
-                  required binary key (STRING);
-                  optional int32 value;
-                }
-              }
-            }
-            ",
-        );
-
-        let mask = ProjectionMask::all();
-        assert_eq!(None, mask.without_nested_types(&schema));
-
-        // key --> empty (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [0]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-
-        // value --> empty (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [1]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-    }
-
-    #[test]
-    fn test_projection_mask_without_nested_map_with_non_nested() {
-        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
-        // with an additional non-nested field
-        let schema = parse_schema(
-            "
-            message test_schema {
-              REQUIRED INT32 a;
-              required group my_map (MAP) {
-                repeated group key_value {
-                  required binary key (STRING);
-                  optional int32 value;
-                }
-              }
-              REQUIRED INT32 b;
-            }
-            ",
-        );
-
-        // all leaves --> a, b which are the only non nested ones
-        let mask = ProjectionMask::all();
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [0, 3])),
-            mask.without_nested_types(&schema)
-        );
-
-        // key, value, b --> b (the only non-nested one)
-        let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [3])),
-            mask.without_nested_types(&schema)
-        );
-
-        // key, value --> NONE
-        let mask = ProjectionMask::leaves(&schema, [1, 2]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-    }
-
-    #[test]
-    fn test_projection_mask_without_nested_deeply_nested() {
-        // Map of Maps
-        let schema = parse_schema(
-            "
-            message test_schema {
-              OPTIONAL group a (MAP) {
-                REPEATED group key_value {
-                  REQUIRED BYTE_ARRAY key (UTF8);
-                  OPTIONAL group value (MAP) {
-                    REPEATED group key_value {
-                      REQUIRED INT32 key;
-                      REQUIRED BOOLEAN value;
-                    }
-                  }
-                }
-              }
-              REQUIRED INT32 b;
-              REQUIRED DOUBLE c;
-            }
-            ",
-        );
-
-        let mask = ProjectionMask::all();
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [3, 4])),
-            mask.without_nested_types(&schema)
-        );
-
-        // (first) key, c --> c (the only non-nested one)
-        let mask = ProjectionMask::leaves(&schema, [0, 4]);
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [4])),
-            mask.without_nested_types(&schema)
-        );
-
-        // (second) key, value, b --> b (the only non-nested one)
-        let mask = ProjectionMask::leaves(&schema, [1, 2, 3]);
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [3])),
-            mask.without_nested_types(&schema)
-        );
-
-        // key --> NONE (the only non-nested one)
-        let mask = ProjectionMask::leaves(&schema, [0]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-    }
-
-    #[test]
-    fn test_projection_mask_without_nested_list() {
-        // Example from https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-        let schema = parse_schema(
-            "
-            message test_schema {
-              required group my_list (LIST) {
-                repeated group list {
-                  optional binary element (STRING);
-                }
-              }
-              REQUIRED INT32 b;
-            }
-            ",
-        );
-
-        let mask = ProjectionMask::all();
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [1])),
-            mask.without_nested_types(&schema),
-        );
-
-        // element --> empty (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [0]);
-        assert_eq!(None, mask.without_nested_types(&schema));
-
-        // element, b --> b (it is nested)
-        let mask = ProjectionMask::leaves(&schema, [0, 1]);
-        assert_eq!(
-            Some(ProjectionMask::leaves(&schema, [1])),
-            mask.without_nested_types(&schema),
-        );
-    }
-
     /// Converts a schema string into a `SchemaDescriptor`
     fn parse_schema(schema: &str) -> SchemaDescriptor {
         let parquet_group_type = parse_message_type(schema).unwrap();
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 61a244589c6d..22978c8b4b8f 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -636,12 +636,7 @@ impl RowGroupReaderBuilder {
             cache_projection.union(predicate.projection());
         }
         cache_projection.intersect(&self.projection);
-        self.exclude_nested_columns_from_cache(&cache_projection)
-    }
-
-    /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
-    fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
-        mask.without_nested_types(self.metadata.file_metadata().schema_descr())
+        Some(cache_projection)
     }
 
     /// Get the offset index for the specified row group, if any
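// A standalone sketch (not from the patch) of the cache-projection rule the
// change above implements: a column is cached only if it appears in both the
// filter predicates' projection and the output projection, shown here with
// plain boolean masks over leaf indices. `cache_projection` is hypothetical.
fn cache_projection(filter_leaves: &[bool], output_leaves: &[bool]) -> Vec<bool> {
    filter_leaves
        .iter()
        .zip(output_leaves.iter())
        .map(|(f, o)| *f && *o)
        .collect()
}

fn main() {
    // leaves: 0=id, 1=b.aa, 2=b.bb (matching the tests below)
    let filter = [true, false, false]; // predicate reads only `id`
    let output = [true, false, true]; // projection selects `id` and `b.bb`
    // only `id` is both filtered on and projected, so only it is cached
    assert_eq!(cache_projection(&filter, &output), vec![true, false, false]);
}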
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index b419c37158dc..cae526f6b43a 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -20,10 +20,10 @@
 use arrow::array::ArrayRef;
 use arrow::array::Int64Array;
 use arrow::compute::and;
-use arrow::compute::kernels::cmp::{gt, lt};
+use arrow::compute::kernels::cmp::{eq, gt, lt};
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
-use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
+use arrow_array::{Array, RecordBatch, StringArray, StringViewArray, StructArray};
 use arrow_schema::{DataType, Field};
 use bytes::Bytes;
 use futures::future::BoxFuture;
@@ -85,14 +85,470 @@ async fn test_cache_disabled_with_filters() {
 }
 
 #[tokio::test]
-async fn test_cache_projection_excludes_nested_columns() {
-    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
+async fn test_async_cache_with_nested_columns() {
+    // Nested columns now work with cache - expect records from cache
+    // 100 rows × 2 leaf columns (b.aa, b.bb) = 200 records
+    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(200);
+    let async_builder = test.async_builder().await.add_nested_root_filter();
+    test.run_async(async_builder).await;
+}
 
-    let sync_builder = test.sync_builder().add_nested_filter();
-    test.run_sync(sync_builder);
+/// Test RowSelectionPolicy impact on cache reads with a struct filter that selects sparse rows.
+///
+/// Filter: `bb % 2 == 0 AND (id < 25 OR id > 75)` selects rows at both ends (sparse, non-contiguous)
+/// Filter mask: `[id, b]` (3 leaf columns: id, b.aa, b.bb)
+/// Projection: `[id, b]`
+///
+/// Both policies must return identical row counts and data, but differ in cache reads:
+/// - Auto (Mask strategy): reads more rows due to covering the range
+/// - Selectors: reads only the selected rows
+#[tokio::test]
+async fn test_struct_filter_sparse_selection_policy() {
+    use arrow::compute::kernels::numeric::rem;
+    use parquet::arrow::arrow_reader::RowSelectionPolicy;
+
+    // Helper to create the filter: bb % 2 == 0 AND (id < 25 OR id > 75)
+    fn make_filter(
+        schema_descr: &std::sync::Arc<parquet::schema::types::SchemaDescriptor>,
+    ) -> RowFilter {
+        let filter_mask = ProjectionMask::roots(schema_descr, [0, 1]); // id and b
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            let id = batch.column(0).as_primitive::<Int64Type>();
+            // id < 25 OR id > 75 (sparse: both ends)
+            let id_sparse = arrow::compute::or(
+                &lt(id, &Int64Array::new_scalar(25))?,
+                &gt(id, &Int64Array::new_scalar(75))?,
+            )?;
+
+            let struct_col = batch.column(1).as_struct();
+            let bb = struct_col.column_by_name("bb").unwrap();
+            let bb = bb.as_primitive::<arrow_array::types::Int32Type>();
+            let remainder = rem(bb, &arrow_array::Int32Array::new_scalar(2))?;
+            let bb_even = eq(&remainder, &arrow_array::Int32Array::new_scalar(0))?;
+
+            and(&id_sparse, &bb_even)
+        });
+        RowFilter::new(vec![Box::new(row_filter)])
+    }
 
-    let async_builder = test.async_builder().await.add_nested_filter();
-    test.run_async(async_builder).await;
+    // Test with Auto policy
+    let (auto_rows, auto_cache) = {
+        let test = ParquetPredicateCacheTest::new_nested_nullable();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+        let projection = ProjectionMask::roots(&schema_descr, [0, 1]);
+
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(make_filter(&schema_descr))
+            .with_row_selection_policy(RowSelectionPolicy::default()); // Auto
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+        (total_rows, metrics.records_read_from_cache().unwrap())
+    };
+
+    // Test with Selectors policy
+    let (selectors_rows, selectors_cache) = {
+        let test = ParquetPredicateCacheTest::new_nested_nullable();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+        let projection = ProjectionMask::roots(&schema_descr, [0, 1]);
+
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(make_filter(&schema_descr))
+            .with_row_selection_policy(RowSelectionPolicy::Selectors);
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+        (total_rows, metrics.records_read_from_cache().unwrap())
+    };
+
+    eprintln!("Sparse struct filter:");
+    eprintln!("  Auto policy: rows={auto_rows}, cache={auto_cache}");
+    eprintln!("  Selectors policy: rows={selectors_rows}, cache={selectors_cache}");
+
+    // Both policies must return identical row counts
+    assert_eq!(
+        auto_rows, selectors_rows,
+        "Both policies must return same row count"
+    );
+
+    // Selectors policy reads exactly: filtered_rows × 3 leaf columns
+    assert_eq!(
+        selectors_cache,
+        selectors_rows * 3,
+        "Selectors policy: cache reads = rows × 3 columns"
+    );
+
+    // Auto policy reads more due to Mask strategy covering the range
+    assert!(
+        auto_cache >= selectors_cache,
+        "Auto policy should read >= Selectors policy"
+    );
+}
+
+/// Test struct field projections with a filter on id (non-struct column).
+///
+/// Filter: `id > 50` (filter mask includes only `id`)
+/// Tests various projections to verify cache behavior:
+/// - Only columns in (filter_mask ∩ projection) are read from cache
+/// - Since the filter only uses `id`, struct columns are never cached
+///
+/// Schema leaf columns: 0=id, 1=b.aa, 2=b.bb
+#[tokio::test]
+async fn test_struct_field_projections_filter_on_id() {
+    // Expected: 49 rows (id 51-99)
+    const EXPECTED_ROWS: usize = 49;
+
+    // Test cases: (projection_leaves, expected_cache_reads, description)
+    let test_cases: &[(&[usize], usize, &str)] = &[
+        (
+            &[0, 2],
+            EXPECTED_ROWS,
+            "[id, b.bb]: id in filter mask → cached",
+        ),
+        (
+            &[1, 2],
+            0,
+            "[b.aa, b.bb]: neither in filter mask → not cached",
+        ),
+        (
+            &[0, 1, 2],
+            EXPECTED_ROWS,
+            "[id, b]: id in filter mask → cached",
+        ),
+        (&[2], 0, "[b.bb]: not in filter mask → not cached"),
+        (&[0], EXPECTED_ROWS, "[id]: id in filter mask → cached"),
+    ];
+
+    for (projection_leaves, expected_cache, description) in test_cases {
+        let test = ParquetPredicateCacheTest::new_nested_nullable();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Filter on id only: id > 50
+        let filter_mask = ProjectionMask::leaves(&schema_descr, [0]); // only id
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            let id = batch.column(0).as_primitive::<Int64Type>();
+            gt(id, &Int64Array::new_scalar(50))
+        });
+
+        let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+
+        let cache_reads = metrics.records_read_from_cache().unwrap();
+        eprintln!(
+            "Filter id>50, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
+            projection_leaves, description
+        );
+
+        assert_eq!(
+            total_rows, EXPECTED_ROWS,
+            "Expected {EXPECTED_ROWS} rows for {description}"
+        );
+        assert_eq!(
+            cache_reads, *expected_cache,
+            "Cache reads mismatch for {description}"
+        );
+    }
+}
+
+/// Test struct field projections with a filter on bb (struct field).
+///
+/// Filter: `bb > 50` (filter mask includes `b` which means both b.aa and b.bb leaves)
+/// Tests various projections with both RowSelectionPolicy options.
+///
+/// Schema leaf columns: 0=id, 1=b.aa, 2=b.bb
+/// Filter mask: roots([1]) = leaves([1, 2])
+///
+/// Cache reads (Selectors policy) = rows × (filter_mask ∩ projection).len()
+#[tokio::test]
+async fn test_struct_field_projections_filter_on_bb() {
+    use parquet::arrow::arrow_reader::RowSelectionPolicy;
+
+    // Helper to create the filter: bb > 50
+    // Filter mask includes the entire struct `b` (both b.aa and b.bb)
+    fn make_filter(
+        schema_descr: &std::sync::Arc<parquet::schema::types::SchemaDescriptor>,
+    ) -> RowFilter {
+        let filter_mask = ProjectionMask::roots(schema_descr, [1]); // struct b (includes b.aa, b.bb)
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            let struct_col = batch.column(0).as_struct();
+            let bb = struct_col.column_by_name("bb").unwrap();
+            let bb = bb.as_primitive::<arrow_array::types::Int32Type>();
+            gt(bb, &arrow_array::Int32Array::new_scalar(50))
+        });
+        RowFilter::new(vec![Box::new(row_filter)])
+    }
+
+    // Test cases: (projection_leaves, cached_leaf_count, description)
+    // cached_leaf_count = number of leaves in (filter_mask ∩ projection)
+    // filter_mask = leaves [1, 2] (b.aa, b.bb)
+    let test_cases: &[(&[usize], usize, &str)] = &[
+        (&[0, 2], 1, "[id, b.bb]: b.bb in filter mask → 1 cached"),
+        (&[1, 2], 2, "[b.aa, b.bb]: both in filter mask → 2 cached"),
+        (
+            &[0, 1, 2],
+            2,
+            "[id, b]: b.aa, b.bb in filter mask → 2 cached",
+        ),
+        (&[2], 1, "[b.bb]: in filter mask → 1 cached"),
+        (&[0], 0, "[id]: not in filter mask → 0 cached"),
+    ];
+
+    for (projection_leaves, cached_leaf_count, description) in test_cases {
+        // Test with Selectors policy (exact cache reads)
+        let (selectors_rows, selectors_cache) = {
+            let test = ParquetPredicateCacheTest::new_nested_nullable();
+            let async_builder = test.async_builder().await;
+            let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+            let projection =
+                ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+            let async_builder = async_builder
+                .with_projection(projection)
+                .with_row_filter(make_filter(&schema_descr))
+                .with_row_selection_policy(RowSelectionPolicy::Selectors);
+
+            let metrics = ArrowReaderMetrics::enabled();
+            let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+            let mut total_rows = 0;
+            while let Some(batch) = stream.next().await {
+                total_rows += batch.expect("Error").num_rows();
+            }
+            (total_rows, metrics.records_read_from_cache().unwrap())
+        };
+
+        // Test with Auto policy
+        let (auto_rows, auto_cache) = {
+            let test = ParquetPredicateCacheTest::new_nested_nullable();
+            let async_builder = test.async_builder().await;
+            let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+            let projection =
+                ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+            let async_builder = async_builder
+                .with_projection(projection)
+                .with_row_filter(make_filter(&schema_descr))
+                .with_row_selection_policy(RowSelectionPolicy::default());
+
+            let metrics = ArrowReaderMetrics::enabled();
+            let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+            let mut total_rows = 0;
+            while let Some(batch) = stream.next().await {
+                total_rows += batch.expect("Error").num_rows();
+            }
+            (total_rows, metrics.records_read_from_cache().unwrap())
+        };
+
+        eprintln!(
+            "Filter bb>50, projection {:?}: Selectors(rows={}, cache={}), Auto(rows={}, cache={}) ({})",
+            projection_leaves, selectors_rows, selectors_cache, auto_rows, auto_cache, description
+        );
+
+        // Both policies must return identical row counts
+        assert_eq!(
+            selectors_rows, auto_rows,
+            "Row count mismatch between policies for {description}"
+        );
+
+        // Selectors policy: exact cache reads = rows × cached_leaf_count
+        let expected_selectors_cache = selectors_rows * cached_leaf_count;
+        assert_eq!(
+            selectors_cache, expected_selectors_cache,
+            "Selectors cache mismatch for {description}: expected {expected_selectors_cache}"
+        );
+
+        // Auto policy reads >= Selectors due to Mask strategy
+        assert!(
+            auto_cache >= selectors_cache,
+            "Auto should read >= Selectors for {description}"
+        );
+    }
+}
+
+/// Test that List->Struct fields are NOT cached (rep_level > 0).
+///
+/// Schema:
+/// - id: Int64 (leaf 0, rep_level=0 - CAN be cached)
+/// - list_col: List<Struct>
+///   - struct_field_a (leaf 1, rep_level=1 - NOT cached)
+///   - struct_field_b (leaf 2, rep_level=1 - NOT cached)
+///
+/// Filter: `id > 50` (filter mask includes only `id`)
+/// Expected: 49 rows (id 51-99)
+///
+/// Cache behavior:
+/// - Only `id` (leaf 0) is cached because rep_level=0
+/// - List struct fields (leaves 1,2) are NOT cached because rep_level > 0
+#[tokio::test]
+async fn test_list_struct_fields_not_cached_filter_on_id() {
+    const EXPECTED_ROWS: usize = 49;
+
+    // Test cases: (projection_leaves, expected_cache_reads, description)
+    // Filter mask = [id] (leaf 0), so only id can be cached
+    let test_cases: &[(&[usize], usize, &str)] = &[
+        (
+            &[0, 1, 2],
+            EXPECTED_ROWS,
+            "[id, list_col]: only id cached (rep_level=0)",
+        ),
+        (
+            &[1, 2],
+            0,
+            "[list_col]: nothing cached (rep_level > 0 for list fields)",
+        ),
+        (&[0], EXPECTED_ROWS, "[id]: id cached (rep_level=0)"),
+        (&[1], 0, "[struct_field_a]: not cached (rep_level > 0)"),
+        (&[2], 0, "[struct_field_b]: not cached (rep_level > 0)"),
+    ];
+
+    for (projection_leaves, expected_cache, description) in test_cases {
+        let test = ParquetPredicateCacheTest::new_list_struct();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Filter on id only: id > 50
+        let filter_mask = ProjectionMask::leaves(&schema_descr, [0]); // only id
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            let id = batch.column(0).as_primitive::<Int64Type>();
+            gt(id, &Int64Array::new_scalar(50))
+        });
+
+        let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+
+        let cache_reads = metrics.records_read_from_cache().unwrap();
+        eprintln!(
+            "List struct filter id>50, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
+            projection_leaves, description
+        );
+
+        assert_eq!(
+            total_rows, EXPECTED_ROWS,
+            "Expected {EXPECTED_ROWS} rows for {description}"
+        );
+        assert_eq!(
+            cache_reads, *expected_cache,
+            "Cache reads mismatch for {description}"
+        );
+    }
+}
+
+/// Test that filtering on List->Struct fields also results in 0 cache hits.
+///
+/// Filter: on struct_field_b (field inside list->struct)
+/// Filter mask: [list_col] (includes leaves 1,2)
+///
+/// Since all leaves in the filter mask have rep_level > 0, nothing is cached.
+#[tokio::test]
+async fn test_list_struct_fields_not_cached_filter_on_struct_field() {
+    // Filter on struct_field_b > 500 (i.e., rows where i*10 + j > 500, so i > 50)
+    // This selects rows with id >= 51, so ~49 rows
+
+    // Test cases: (projection_leaves, description)
+    // All should have 0 cache reads because filter mask leaves have rep_level > 0
+    let test_cases: &[(&[usize], &str)] = &[
+        (&[0, 1, 2], "[id, list_col]: filter mask has rep_level > 0"),
+        (&[1, 2], "[list_col]: filter mask has rep_level > 0"),
+        (
+            &[0],
+            "[id]: id not in filter mask, filter leaves have rep_level > 0",
+        ),
+        (&[1], "[struct_field_a]: rep_level > 0"),
+        (&[2], "[struct_field_b]: rep_level > 0"),
+    ];
+
+    for (projection_leaves, description) in test_cases {
+        let test = ParquetPredicateCacheTest::new_list_struct();
+        let async_builder = test.async_builder().await;
+        let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Filter on struct_field_b: check if any element has struct_field_b > 500
+        // Filter mask includes the entire list_col (leaves 1, 2)
+        let filter_mask = ProjectionMask::leaves(&schema_descr, [1, 2]);
+        let row_filter = ArrowPredicateFn::new(filter_mask, |batch: RecordBatch| {
+            // batch.column(0) is list_col
+            let list = batch.column(0).as_list::<i32>();
+
+            let result: Vec<bool> = (0..batch.num_rows())
+                .map(|row| {
+                    if list.is_null(row) {
+                        return false;
+                    }
+                    let struct_array = list.value(row);
+                    let struct_arr = struct_array.as_struct();
+                    let field_b = struct_arr
+                        .column_by_name("struct_field_b")
+                        .unwrap()
+                        .as_primitive::<arrow_array::types::Int32Type>();
+
+                    // Check if any element has struct_field_b > 500
+                    (0..field_b.len()).any(|i| !field_b.is_null(i) && field_b.value(i) > 500)
+                })
+                .collect();
+
+            Ok(arrow_array::BooleanArray::from(result))
+        });
+
+        let projection = ProjectionMask::leaves(&schema_descr, projection_leaves.iter().copied());
+        let async_builder = async_builder
+            .with_projection(projection)
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]));
+
+        let metrics = ArrowReaderMetrics::enabled();
+        let mut stream = async_builder.with_metrics(metrics.clone()).build().unwrap();
+        let mut total_rows = 0;
+        while let Some(batch) = stream.next().await {
+            total_rows += batch.expect("Error").num_rows();
+        }
+
+        let cache_reads = metrics.records_read_from_cache().unwrap();
+        eprintln!(
+            "List struct filter on struct_field_b, projection {:?}: rows={total_rows}, cache={cache_reads} ({})",
+            projection_leaves, description
+        );
+
+        // All cases should have 0 cache reads because filter mask leaves have rep_level > 0
+        assert_eq!(
+            cache_reads, 0,
+            "Expected 0 cache reads for {description}, got {cache_reads}"
+        );
+
+        // Should have some rows (those where any struct_field_b > 500)
+        assert!(
+            total_rows > 0,
+            "Expected some rows for {description}, got {total_rows}"
+        );
+    }
+}
 
 // -- Begin test infrastructure --
@@ -131,6 +587,26 @@ impl ParquetPredicateCacheTest {
         }
     }
 
+    /// Create a new test file with nullable nested struct.
+    ///
+    /// See [`NESTED_NULLABLE_TEST_FILE_DATA`] for data details.
+    fn new_nested_nullable() -> Self {
+        Self {
+            bytes: NESTED_NULLABLE_TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
+    /// Create a new test file with a List containing Struct elements.
+    ///
+    /// See [`LIST_STRUCT_TEST_FILE_DATA`] for data details.
+    fn new_list_struct() -> Self {
+        Self {
+            bytes: LIST_STRUCT_TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
     /// Set the expected number of records read from the cache
     fn with_expected_records_read_from_cache(
         mut self,
@@ -240,6 +716,75 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });
 
+/// Build a ParquetFile with a nullable nested struct for testing row filters with definition levels.
+///
+/// Schema:
+/// * `id: Int64`
+/// * `b: Struct { aa: String (nullable), bb: Int32 (nullable) }` (nullable struct)
+///
+/// Null patterns:
+/// - Every 3rd row (i % 3 == 0): aa is null
+/// - Every 5th row (i % 5 == 0): bb is null
+/// - Every 7th row (i % 7 == 0): entire struct is null (overrides field nulls)
+///
+/// The non-null values are:
+/// - aa = "v{i}"
+/// - bb = i as i32
+///
+/// Where i is the row index from 0 to 99
+static NESTED_NULLABLE_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    use arrow_array::Int32Array;
+    use arrow_buffer::NullBuffer;
+    use arrow_schema::Fields;
+
+    const NUM_ROWS: usize = 100;
+
+    // id column
+    let id: Int64Array = (0..NUM_ROWS as i64).collect();
+
+    // aa: String column - null every 3rd row
+    let aa: StringArray = (0..NUM_ROWS)
+        .map(|i| {
+            if i % 3 == 0 {
+                None
+            } else {
+                Some(format!("v{i}"))
+            }
+        })
+        .collect();
+
+    // bb: Int32 column - null every 5th row
+    let bb: Int32Array = (0..NUM_ROWS)
+        .map(|i| if i % 5 == 0 { None } else { Some(i as i32) })
+        .collect();
+
+    // Struct null buffer - null every 7th row
+    let struct_nulls: Vec<bool> = (0..NUM_ROWS).map(|i| i % 7 != 0).collect();
+    let struct_null_buffer = NullBuffer::from(struct_nulls);
+
+    // Create struct with null buffer
+    let b = StructArray::new(
+        Fields::from(vec![
+            Field::new("aa", DataType::Utf8, true),
+            Field::new("bb", DataType::Int32, true),
+        ]),
+        vec![Arc::new(aa) as ArrayRef, Arc::new(bb) as ArrayRef],
+        Some(struct_null_buffer),
+    );
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("id", Arc::new(id) as ArrayRef),
+        ("b", Arc::new(b) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 /// Build a ParquetFile with a
 ///
 /// * string column `a`
@@ -276,15 +821,81 @@ static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });
 
+/// Build a ParquetFile with a List containing Struct elements.
+///
+/// Schema:
+/// * `id: Int64` (leaf 0, rep_level=0 - CAN be cached)
+/// * `list_col: List<Struct>`
+///   - struct_field_a (leaf 1, rep_level=1 - NOT cached)
+///   - struct_field_b (leaf 2, rep_level=1 - NOT cached)
+///
+/// Data:
+/// - 100 rows (id 0..99)
+/// - Each row has (i % 3 + 1) list elements (1-3 elements per row)
+/// - struct_field_a = "val_{i}_{j}" where i=row, j=element index
+/// - struct_field_b = i * 10 + j
+static LIST_STRUCT_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
+
+    const NUM_ROWS: usize = 100;
+
+    // id column
+    let id: Int64Array = (0..NUM_ROWS as i64).collect();
+
+    // Build list of structs
+    let struct_fields = vec![
+        Field::new("struct_field_a", DataType::Utf8, true),
+        Field::new("struct_field_b", DataType::Int32, true),
+    ];
+    let mut list_builder = ListBuilder::new(StructBuilder::new(
+        struct_fields.clone(),
+        vec![
+            Box::new(StringBuilder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
+            Box::new(Int32Builder::new()) as Box<dyn arrow_array::builder::ArrayBuilder>,
+        ],
+    ));
+
+    for i in 0..NUM_ROWS {
+        let num_elements = (i % 3) + 1; // 1, 2, or 3 elements
+        for j in 0..num_elements {
+            let struct_builder = list_builder.values();
+            struct_builder
+                .field_builder::<StringBuilder>(0)
+                .unwrap()
+                .append_value(format!("val_{}_{}", i, j));
+            struct_builder
+                .field_builder::<Int32Builder>(1)
+                .unwrap()
+                .append_value((i * 10 + j) as i32);
+            struct_builder.append(true);
+        }
+        list_builder.append(true);
+    }
+
+    let list_col = list_builder.finish();
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("id", Arc::new(id) as ArrayRef),
+        ("list_col", Arc::new(list_col) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 trait ArrowReaderBuilderExt {
     /// Applies the following:
     /// 1. a projection selecting the "a" and "b" column
     /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
     fn add_project_ab_and_filter_b(self) -> Self;
 
-    /// Adds a row filter that projects the nested leaf column "b.aa" and
+    /// Adds a row filter that projects the nested ROOT column "b" and
     /// returns true for all rows.
-    fn add_nested_filter(self) -> Self;
+    fn add_nested_root_filter(self) -> Self;
 }
 
 impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
@@ -306,14 +917,13 @@ impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
         .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
     }
 
-    fn add_nested_filter(self) -> Self {
+    fn add_nested_root_filter(self) -> Self {
         let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
 
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+        // Project the ROOT struct column "b", not just leaf "b.aa"
+        let root_mask = ProjectionMask::roots(&schema_descr, [1]); // column index 1 = "b"
 
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+        let always_true = ArrowPredicateFn::new(root_mask.clone(), |batch: RecordBatch| {
             Ok(arrow_array::BooleanArray::from(vec![
                 true;
                 batch.num_rows()
             ]))
         });
         let row_filter = RowFilter::new(vec![Box::new(always_true)]);
 
-        self.with_projection(nested_leaf_mask)
-            .with_row_filter(row_filter)
+        self.with_projection(root_mask).with_row_filter(row_filter)
     }
 }