diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 6dcf05ccf8ad..d5e36fbcb486 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -15,18 +15,22 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arrow_schema::{DataType, Fields, SchemaBuilder}; use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; +use crate::arrow::array_reader::cached_array_reader::CacheRole; +use crate::arrow::array_reader::cached_array_reader::CachedArrayReader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; +use crate::arrow::array_reader::row_group_cache::RowGroupCache; use crate::arrow::array_reader::{ make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader, }; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::{ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; use crate::basic::Type as PhysicalType; @@ -34,14 +38,74 @@ use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, In use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; +/// Builder for [`CacheOptions`] +#[derive(Debug, Clone)] +pub struct CacheOptionsBuilder<'a> { + /// Projection mask to apply to the cache + pub projection_mask: &'a ProjectionMask, + /// Cache to use for storing row groups + pub cache: Arc>, +} + +impl<'a> CacheOptionsBuilder<'a> { + /// create a new cache options builder + pub fn new(projection_mask: &'a ProjectionMask, cache: Arc>) -> Self { + Self { + projection_mask, + cache, + } + } + + /// Return a new 
[`CacheOptions`] for producing (populating) the cache + pub fn producer(self) -> CacheOptions<'a> { + CacheOptions { + projection_mask: self.projection_mask, + cache: self.cache, + role: CacheRole::Producer, + } + } + + /// Return a new [`CacheOptions`] for consuming (reading) the cache + pub fn consumer(self) -> CacheOptions<'a> { + CacheOptions { + projection_mask: self.projection_mask, + cache: self.cache, + role: CacheRole::Consumer, + } + } +} + +/// Cache options containing projection mask, cache, and role +#[derive(Clone)] +pub struct CacheOptions<'a> { + pub projection_mask: &'a ProjectionMask, + pub cache: Arc>, + pub role: CacheRole, +} + /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader pub struct ArrayReaderBuilder<'a> { + /// Source of row group data row_groups: &'a dyn RowGroups, + /// Optional cache options for the array reader + cache_options: Option<&'a CacheOptions<'a>>, + /// Metrics handle cloned into each [`CachedArrayReader`] built for a cached column + metrics: &'a ArrowReaderMetrics, } impl<'a> ArrayReaderBuilder<'a> { - pub fn new(row_groups: &'a dyn RowGroups) -> Self { - Self { row_groups } + pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self { + Self { + row_groups, + cache_options: None, + metrics, + } + } + + /// Add cache options to the builder + pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self { + self.cache_options = cache_options; + self } /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader. @@ -69,7 +133,26 @@ impl<'a> ArrayReaderBuilder<'a> { mask: &ProjectionMask, ) -> Result>> { match field.field_type { - ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask), + ParquetFieldType::Primitive { col_idx, .. } => { + let Some(reader) = self.build_primitive_reader(field, mask)? 
else { + return Ok(None); + }; + let Some(cache_options) = self.cache_options.as_ref() else { + return Ok(Some(reader)); + }; + + if cache_options.projection_mask.leaf_included(col_idx) { + Ok(Some(Box::new(CachedArrayReader::new( + reader, + Arc::clone(&cache_options.cache), + col_idx, + cache_options.role, + self.metrics.clone(), // cheap clone + )))) + } else { + Ok(Some(reader)) + } + } ParquetFieldType::Group { .. } => match &field.arrow_type { DataType::Map(_, _) => self.build_map_reader(field, mask), DataType::Struct(_) => self.build_struct_reader(field, mask), @@ -375,7 +458,8 @@ mod tests { ) .unwrap(); - let array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs new file mode 100644 index 000000000000..0e837782faf5 --- /dev/null +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -0,0 +1,762 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
[`CachedArrayReader`] wrapper around [`ArrayReader`] + +use crate::arrow::array_reader::row_group_cache::BatchID; +use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::errors::Result; +use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; +use arrow_buffer::BooleanBufferBuilder; +use arrow_schema::DataType as ArrowType; +use std::any::Any; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +/// Role of the cached array reader +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CacheRole { + /// Producer role: inserts data into the cache during filter phase + Producer, + /// Consumer role: removes consumed data from the cache during output building phase + Consumer, +} + +/// A cached wrapper around an ArrayReader that avoids duplicate decoding +/// when the same column appears in both filter predicates and output projection. +/// +/// This reader acts as a transparent layer over the inner reader, using a cache +/// to avoid redundant work when the same data is needed multiple times. +/// +/// The reader can operate in two roles: +/// - Producer: During filter phase, inserts decoded data into the cache +/// - Consumer: During output building, consumes and removes data from the cache +/// +/// This means the memory consumption of the cache has three stages: +/// 1. During the filter phase, the memory increases as the cache is populated +/// 2. It peaks when filters are built. +/// 3. It decreases as the cached data is consumed. 
+/// +/// ```text +/// ▲ +/// │ ╭─╮ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │╱ ╲ +/// └─────────────╲──────► Time +/// │ │ │ +/// Filter Peak Consume +/// Phase (Built) (Decrease) +/// ``` +pub struct CachedArrayReader { + /// The underlying array reader + inner: Box, + /// Shared cache for this row group + shared_cache: Arc>, + /// Column index for cache key generation + column_idx: usize, + /// Current logical position in the data stream for this reader (for cache key generation) + outer_position: usize, + /// Current position in `inner` + inner_position: usize, + /// Batch size for the cache + batch_size: usize, + /// Boolean buffer builder to track selections for the next consume_batch() + selections: BooleanBufferBuilder, + /// Role of this reader (Producer or Consumer) + role: CacheRole, + /// Local cache to store batches between read_records and consume_batch calls + /// This ensures data is available even if the shared cache evicts items + local_cache: HashMap, + /// Statistics to report on the Cache behavior + metrics: ArrowReaderMetrics, +} + +impl CachedArrayReader { + /// Creates a new cached array reader with the specified role + pub fn new( + inner: Box, + cache: Arc>, + column_idx: usize, + role: CacheRole, + metrics: ArrowReaderMetrics, + ) -> Self { + let batch_size = cache.lock().unwrap().batch_size(); + + Self { + inner, + shared_cache: cache, + column_idx, + outer_position: 0, + inner_position: 0, + batch_size, + selections: BooleanBufferBuilder::new(0), + role, + local_cache: HashMap::new(), + metrics, + } + } + + fn get_batch_id_from_position(&self, row_id: usize) -> BatchID { + BatchID { + val: row_id / self.batch_size, + } + } + + /// Loads the batch with the given ID (a batch index; its first row offset is `batch_id.val * batch_size`) from the inner + /// reader + /// + /// After this call the required batch will be available in + /// `self.local_cache` and may also be stored in `self.shared_cache` (nothing is cached when the inner reader has no records left). 
+ /// + fn fetch_batch(&mut self, batch_id: BatchID) -> Result { + let first_row_offset = batch_id.val * self.batch_size; + if self.inner_position < first_row_offset { + let to_skip = first_row_offset - self.inner_position; + let skipped = self.inner.skip_records(to_skip)?; + assert_eq!(skipped, to_skip); + self.inner_position += skipped; + } + + let read = self.inner.read_records(self.batch_size)?; + + // If there are no remaining records (EOF), return immediately without + // attempting to cache an empty batch. This prevents inserting zero-length + // arrays into the cache which can later cause panics when slicing. + if read == 0 { + return Ok(0); + } + + let array = self.inner.consume_batch()?; + + // Store in both shared cache and local cache + // The shared cache is used to reuse results between readers + // The local cache ensures data is available for our consume_batch call + let _cached = + self.shared_cache + .lock() + .unwrap() + .insert(self.column_idx, batch_id, array.clone()); + // Note: if the shared cache is full (_cached == false), we continue without caching + // The local cache will still store the data for this reader's use + + self.local_cache.insert(batch_id, array); + + self.inner_position += read; + Ok(read) + } + + /// Remove batches from cache that have been completely consumed + /// This is only called for Consumer role readers + fn cleanup_consumed_batches(&mut self) { + let current_batch_id = self.get_batch_id_from_position(self.outer_position); + + // Remove batches that are at least one batch behind the current position + // This ensures we don't remove batches that might still be needed for the current batch + // We can safely remove batch_id if current_batch_id > batch_id + 1 + if current_batch_id.val > 1 { + let mut cache = self.shared_cache.lock().unwrap(); + for batch_id_to_remove in 0..(current_batch_id.val - 1) { + cache.remove( + self.column_idx, + BatchID { + val: batch_id_to_remove, + }, + ); + } + } + } +} + +impl 
ArrayReader for CachedArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + self.inner.get_data_type() + } + + fn read_records(&mut self, num_records: usize) -> Result { + let mut read = 0; + while read < num_records { + let batch_id = self.get_batch_id_from_position(self.outer_position); + + // Check local cache first + let cached = if let Some(array) = self.local_cache.get(&batch_id) { + Some(array.clone()) + } else { + // If not in local cache, i.e., we are consumer, check shared cache + let cache_content = self + .shared_cache + .lock() + .unwrap() + .get(self.column_idx, batch_id); + if let Some(array) = cache_content.as_ref() { + // Store in local cache for later use in consume_batch + self.local_cache.insert(batch_id, array.clone()); + } + cache_content + }; + + match cached { + Some(array) => { + let array_len = array.len(); + if array_len + batch_id.val * self.batch_size > self.outer_position { + // the cache batch has some records that we can select + let v = array_len + batch_id.val * self.batch_size - self.outer_position; + let select_cnt = std::cmp::min(num_records - read, v); + read += select_cnt; + self.metrics.increment_cache_reads(select_cnt); + self.outer_position += select_cnt; + self.selections.append_n(select_cnt, true); + } else { + // this is last batch and we have used all records from it + break; + } + } + None => { + let read_from_inner = self.fetch_batch(batch_id)?; + // Reached end-of-file, no more records to read + if read_from_inner == 0 { + break; + } + self.metrics.increment_inner_reads(read_from_inner); + let select_from_this_batch = std::cmp::min( + num_records - read, + self.inner_position - self.outer_position, + ); + read += select_from_this_batch; + self.outer_position += select_from_this_batch; + self.selections.append_n(select_from_this_batch, true); + if read_from_inner < self.batch_size { + // this is last batch from inner reader + break; + } + } + } + } + Ok(read) + } + + 
fn skip_records(&mut self, num_records: usize) -> Result { + let mut skipped = 0; + while skipped < num_records { + let size = std::cmp::min(num_records - skipped, self.batch_size); + skipped += size; + self.selections.append_n(size, false); + self.outer_position += size; + } + Ok(num_records) + } + + fn consume_batch(&mut self) -> Result { + let row_count = self.selections.len(); + if row_count == 0 { + return Ok(new_empty_array(self.inner.get_data_type())); + } + + let start_position = self.outer_position - row_count; + + let selection_buffer = self.selections.finish(); + + let start_batch = start_position / self.batch_size; + let end_batch = (start_position + row_count - 1) / self.batch_size; + + let mut selected_arrays = Vec::new(); + for batch_id in start_batch..=end_batch { + let batch_start = batch_id * self.batch_size; + let batch_end = batch_start + self.batch_size - 1; + let batch_id = self.get_batch_id_from_position(batch_start); + + // Calculate the overlap between the start_position and the batch + let overlap_start = start_position.max(batch_start); + let overlap_end = (start_position + row_count - 1).min(batch_end); + + if overlap_start > overlap_end { + continue; + } + + let selection_start = overlap_start - start_position; + let selection_length = overlap_end - overlap_start + 1; + let mask = selection_buffer.slice(selection_start, selection_length); + + if mask.count_set_bits() == 0 { + continue; + } + + let mask_array = BooleanArray::from(mask); + // Read from local cache instead of shared cache to avoid cache eviction issues + let cached = self + .local_cache + .get(&batch_id) + .expect("data must be already cached in the read_records call, this is a bug"); + let cached = cached.slice(overlap_start - batch_start, selection_length); + let filtered = arrow_select::filter::filter(&cached, &mask_array)?; + selected_arrays.push(filtered); + } + + self.selections = BooleanBufferBuilder::new(0); + + // Only remove batches from local buffer that are 
completely behind current position + // Keep the current batch and any future batches as they might still be needed + let current_batch_id = self.get_batch_id_from_position(self.outer_position); + self.local_cache + .retain(|batch_id, _| batch_id.val >= current_batch_id.val); + + // For consumers, cleanup batches that have been completely consumed + // This reduces the memory usage of the shared cache + if self.role == CacheRole::Consumer { + self.cleanup_consumed_batches(); + } + + match selected_arrays.len() { + 0 => Ok(new_empty_array(self.inner.get_data_type())), + 1 => Ok(selected_arrays.into_iter().next().unwrap()), + _ => Ok(arrow_select::concat::concat( + &selected_arrays + .iter() + .map(|a| a.as_ref()) + .collect::>(), + )?), + } + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None // we don't allow nullable parent for now. + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::row_group_cache::RowGroupCache; + use crate::arrow::array_reader::ArrayReader; + use arrow_array::{ArrayRef, Int32Array}; + use std::sync::{Arc, Mutex}; + + // Mock ArrayReader for testing + struct MockArrayReader { + data: Vec, + position: usize, + records_to_consume: usize, + data_type: ArrowType, + } + + impl MockArrayReader { + fn new(data: Vec) -> Self { + Self { + data, + position: 0, + records_to_consume: 0, + data_type: ArrowType::Int32, + } + } + } + + impl ArrayReader for MockArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_read = std::cmp::min(batch_size, remaining); + self.records_to_consume += to_read; + Ok(to_read) + } + + fn consume_batch(&mut self) -> Result { + let start = self.position; + let end = start + self.records_to_consume; + let slice = &self.data[start..end]; + 
self.position = end; + self.records_to_consume = 0; + Ok(Arc::new(Int32Array::from(slice.to_vec()))) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_skip = std::cmp::min(num_records, remaining); + self.position += to_skip; + Ok(to_skip) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } + } + + #[test] + fn test_cached_reader_basic() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + // Read 3 records + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + + // Read 3 more records + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 2); + } + + #[test] + fn test_read_skip_pattern() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + let read1 = cached_reader.read_records(2).unwrap(); + assert_eq!(read1, 2); + + let array1 = cached_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 2); + let int32_array = array1.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2]); + + let skipped = cached_reader.skip_records(2).unwrap(); + assert_eq!(skipped, 2); + 
+ let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 1); + + // Consume it (should be the 5th element after skipping 3,4) + let array2 = cached_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 1); + let int32_array = array2.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[5]); + } + + #[test] + fn test_multiple_reads_before_consume() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + // Multiple reads should accumulate + let read1 = cached_reader.read_records(2).unwrap(); + assert_eq!(read1, 2); + + let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 1); + + // Consume should return all accumulated records + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + } + + #[test] + fn test_eof_behavior() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + // Try to read more than available + let read1 = cached_reader.read_records(5).unwrap(); + assert_eq!(read1, 3); // Should only get 3 records (all available) + + let array1 = cached_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Further reads should return 0 + let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 0); + + let array2 = cached_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 0); + } + + #[test] 
+ fn test_cache_sharing() { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + + // First reader - populate cache + let mock_reader1 = MockArrayReader::new(vec![1, 2, 3, 4, 5]); + let mut cached_reader1 = CachedArrayReader::new( + Box::new(mock_reader1), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + cached_reader1.read_records(3).unwrap(); + let array1 = cached_reader1.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Second reader with different column index should not interfere + let mock_reader2 = MockArrayReader::new(vec![10, 20, 30, 40, 50]); + let mut cached_reader2 = CachedArrayReader::new( + Box::new(mock_reader2), + cache.clone(), + 1, + CacheRole::Consumer, + metrics.clone(), + ); + + cached_reader2.read_records(2).unwrap(); + let array2 = cached_reader2.consume_batch().unwrap(); + assert_eq!(array2.len(), 2); + + // Verify the second reader got its own data, not from cache + let int32_array = array2.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[10, 20]); + } + + #[test] + fn test_consumer_removes_batches() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut consumer_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // Read first batch (positions 0-2, batch 0) + let read1 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read1, 3); + assert_eq!(consumer_reader.outer_position, 3); + // Check that batch 0 is in cache after read_records + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + let array1 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // After first consume_batch, batch 0 should still be in 
cache + // (current_batch_id = 3/3 = 1, cleanup only happens if current_batch_id > 1) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + // Read second batch (positions 3-5, batch 1) + let read2 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read2, 3); + assert_eq!(consumer_reader.outer_position, 6); + let array2 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 3); + + // After second consume_batch, batch 0 should be removed + // (current_batch_id = 6/3 = 2, cleanup removes batches 0..(2-1) = 0..1, so removes batch 0) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some()); + + // Read third batch (positions 6-8, batch 2) + let read3 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read3, 3); + assert_eq!(consumer_reader.outer_position, 9); + let array3 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array3.len(), 3); + + // After third consume_batch, batches 0 and 1 should be removed + // (current_batch_id = 9/3 = 3, cleanup removes batches 0..(3-1) = 0..2, so removes batches 0 and 1) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 2 }).is_some()); + } + + #[test] + fn test_producer_keeps_batches() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut producer_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Producer, + metrics, + ); + + // Read first batch (positions 0-2) + let read1 = producer_reader.read_records(3).unwrap(); + assert_eq!(read1, 3); + let array1 = producer_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Verify 
batch 0 is in cache + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + // Read second batch (positions 3-5) - producer should NOT remove batch 0 + let read2 = producer_reader.read_records(3).unwrap(); + assert_eq!(read2, 3); + let array2 = producer_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 3); + + // Verify both batch 0 and batch 1 are still present (no removal for producer) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some()); + } + + #[test] + fn test_local_cache_protects_against_eviction() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // Read records which should populate both shared and local cache + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + + // Verify data is in both caches + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + assert!(cached_reader.local_cache.contains_key(&BatchID { val: 0 })); + + // Simulate cache eviction by manually removing from shared cache + cache.lock().unwrap().remove(0, BatchID { val: 0 }); + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + + // Even though shared cache was evicted, consume_batch should still work + // because data is preserved in local cache + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + + // Local cache should be cleared after consume_batch + assert!(cached_reader.local_cache.is_empty()); + } + + #[test] + fn test_local_cache_is_cleared_properly() { + let 
metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, 0))); // Batch size 3, shared cache budget of 0 bytes + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // With a 0-byte budget the shared-cache insert is presumably rejected for any non-empty array, so reads rely on the local cache + let records_read = cached_reader.read_records(1).unwrap(); + assert_eq!(records_read, 1); + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 1); + + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + } + + #[test] + fn test_batch_id_calculation_with_incremental_reads() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + + // Create a producer to populate cache + let mut producer = CachedArrayReader::new( + Box::new(MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + // Populate cache with first batch (1, 2, 3) + producer.read_records(3).unwrap(); + producer.consume_batch().unwrap(); + + // Now create a consumer that will try to read from cache + let mut consumer = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // - We want to read 4 records starting from position 0 + // - First 3 records (positions 0-2) should come from cache (batch 0) + // - The 4th record (position 3) should come from the next batch + let records_read = consumer.read_records(4).unwrap(); + assert_eq!(records_read, 4); + + let array = consumer.consume_batch().unwrap(); + assert_eq!(array.len(), 4); + + let int32_array = 
array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3, 4]); + } +} diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 66c4f30b3c29..e28c93cf624d 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -249,6 +249,7 @@ mod tests { use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; use crate::arrow::array_reader::ArrayReaderBuilder; + use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask}; use crate::file::properties::WriterProperties; @@ -563,7 +564,8 @@ mod tests { ) .unwrap(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index d6e325b49450..5b0ccd874f9e 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -33,6 +33,7 @@ mod builder; mod byte_array; mod byte_array_dictionary; mod byte_view_array; +mod cached_array_reader; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -40,13 +41,14 @@ mod list_array; mod map_array; mod null_array; mod primitive_array; +mod row_group_cache; mod struct_array; #[cfg(test)] mod test_util; // Note that this crate is public under the `experimental` feature flag. 
-pub use builder::ArrayReaderBuilder; +pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder}; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks @@ -58,6 +60,7 @@ pub use list_array::ListArrayReader; pub use map_array::MapArrayReader; pub use null_array::NullArrayReader; pub use primitive_array::PrimitiveArrayReader; +pub use row_group_cache::RowGroupCache; pub use struct_array::StructArrayReader; /// Reads Parquet data into Arrow Arrays. diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs new file mode 100644 index 000000000000..ef726e16495f --- /dev/null +++ b/parquet/src/arrow/array_reader/row_group_cache.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, ArrayRef}; +use arrow_schema::DataType; +use std::collections::HashMap; + +/// Starting row ID for this batch +/// +/// The `BatchID` is used to identify batches of rows within a row group. 
+/// +/// The row_index in the id is relative to the rows being read from the +/// underlying column reader (which might already have a RowSelection applied) +/// +/// The `BatchID` for any particular row is `row_index / batch_size`. The +/// integer division ensures that rows in the same batch share the same +/// `BatchID`, which can be calculated quickly from the row index +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct BatchID { + pub val: usize, +} + +/// Cache key that uniquely identifies a batch within a row group +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CacheKey { + /// Column index in the row group + pub column_idx: usize, + /// Starting row ID for this batch + pub batch_id: BatchID, +} + +fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize { + match array.data_type() { + // TODO: this is a temporary workaround. It's very difficult to measure the actual memory usage of one StringViewArray, + // because the underlying buffer is shared with multiple StringViewArrays. + DataType::Utf8View => { + use arrow_array::cast::AsArray; + let array = array.as_string_view(); + array.len() * 16 + array.total_buffer_bytes_used() + std::mem::size_of_val(array) + } + _ => array.get_array_memory_size(), + } +} + +/// Row group cache that stores decoded arrow arrays at batch granularity +/// +/// This cache is designed to avoid duplicate decoding when the same column +/// appears in both filter predicates and output projection. 
+#[derive(Debug)] +pub struct RowGroupCache { + /// Cache storage mapping (column_idx, batch_id) -> ArrayRef + cache: HashMap, + /// Cache granularity + batch_size: usize, + /// Maximum cache size in bytes + max_cache_bytes: usize, + /// Current cache size in bytes + current_cache_size: usize, +} + +impl RowGroupCache { + /// Creates a new empty row group cache + pub fn new(batch_size: usize, max_cache_bytes: usize) -> Self { + Self { + cache: HashMap::new(), + batch_size, + max_cache_bytes, + current_cache_size: 0, + } + } + + /// Inserts an array into the cache for the given column and batch ID + /// Returns true if the array was inserted, false if it would exceed the cache size limit. Panics if an entry already exists for the same column and batch ID + pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, array: ArrayRef) -> bool { + let array_size = get_array_memory_size_for_cache(&array); + + // Check if adding this array would exceed the cache size limit + if self.current_cache_size + array_size > self.max_cache_bytes { + return false; // Cache is full, don't insert + } + + let key = CacheKey { + column_idx, + batch_id, + }; + + let existing = self.cache.insert(key, array); + assert!(existing.is_none()); + self.current_cache_size += array_size; + true + } + + /// Retrieves a cached array for the given column and batch ID + /// Returns None if not found in cache + pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option { + let key = CacheKey { + column_idx, + batch_id, + }; + self.cache.get(&key).cloned() + } + + /// Gets the batch size for this cache + pub fn batch_size(&self) -> usize { + self.batch_size + } + + /// Removes a cached array for the given column and batch ID + /// Returns true if the entry was found and removed, false otherwise + pub fn remove(&mut self, column_idx: usize, batch_id: BatchID) -> bool { + let key = CacheKey { + column_idx, + batch_id, + }; + if let Some(array) = self.cache.remove(&key) { + self.current_cache_size -= get_array_memory_size_for_cache(&array); + true + } else { + 
false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::{ArrayRef, Int32Array}; + use std::sync::Arc; + + #[test] + fn test_cache_basic_operations() { + let mut cache = RowGroupCache::new(1000, usize::MAX); + + // Create test array + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + + // Test insert and get + let batch_id = BatchID { val: 0 }; + assert!(cache.insert(0, batch_id, array.clone())); + let retrieved = cache.get(0, batch_id); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().len(), 5); + + // Test miss + let miss = cache.get(1, batch_id); + assert!(miss.is_none()); + + // Test different row_id + let miss = cache.get(0, BatchID { val: 1000 }); + assert!(miss.is_none()); + } + + #[test] + fn test_cache_remove() { + let mut cache = RowGroupCache::new(1000, usize::MAX); + + // Create test arrays + let array1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6])); + + // Insert arrays + assert!(cache.insert(0, BatchID { val: 0 }, array1.clone())); + assert!(cache.insert(0, BatchID { val: 1000 }, array2.clone())); + assert!(cache.insert(1, BatchID { val: 0 }, array1.clone())); + + // Verify they're there + assert!(cache.get(0, BatchID { val: 0 }).is_some()); + assert!(cache.get(0, BatchID { val: 1000 }).is_some()); + assert!(cache.get(1, BatchID { val: 0 }).is_some()); + + // Remove one entry + let removed = cache.remove(0, BatchID { val: 0 }); + assert!(removed); + assert!(cache.get(0, BatchID { val: 0 }).is_none()); + + // Other entries should still be there + assert!(cache.get(0, BatchID { val: 1000 }).is_some()); + assert!(cache.get(1, BatchID { val: 0 }).is_some()); + + // Try to remove non-existent entry + let not_removed = cache.remove(0, BatchID { val: 0 }); + assert!(!not_removed); + + // Remove remaining entries + assert!(cache.remove(0, BatchID { val: 1000 })); + assert!(cache.remove(1, BatchID { val: 0 })); + + // Cache 
should be empty + assert!(cache.get(0, BatchID { val: 1000 }).is_none()); + assert!(cache.get(1, BatchID { val: 0 }).is_none()); + } +} diff --git a/parquet/src/arrow/arrow_reader/metrics.rs b/parquet/src/arrow/arrow_reader/metrics.rs new file mode 100644 index 000000000000..05c7a5180193 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/metrics.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [ArrowReaderMetrics] for collecting metrics about the Arrow reader + +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +/// This enum represents the state of Arrow reader metrics collection. +/// +/// The inner metrics are stored in an `Arc` +/// so cloning the `ArrowReaderMetrics` enum will not clone the inner metrics. +/// +/// To access metrics, create an `ArrowReaderMetrics` via [`ArrowReaderMetrics::enabled()`] +/// and configure the `ArrowReaderBuilder` with a clone. +#[derive(Debug, Clone)] +pub enum ArrowReaderMetrics { + /// Metrics are not collected (default) + Disabled, + /// Metrics are collected and stored in an `Arc`. + /// + /// Create this via [`ArrowReaderMetrics::enabled()`]. 
+ Enabled(Arc), +} + +impl ArrowReaderMetrics { + /// Creates a new instance of [`ArrowReaderMetrics::Disabled`] + pub fn disabled() -> Self { + Self::Disabled + } + + /// Creates a new instance of [`ArrowReaderMetrics::Enabled`] + pub fn enabled() -> Self { + Self::Enabled(Arc::new(ArrowReaderMetricsInner::new())) + } + + /// Predicate Cache: number of records read directly from the inner reader + /// + /// This is the total number of records read from the inner reader (that is + /// actually decoding). It measures the amount of work that could not be + /// avoided with caching. + /// + /// It returns the number of records read across all columns, so if you read + /// 2 columns each with 100 records, this will return 200. + /// + /// + /// Returns None if metrics are disabled. + pub fn records_read_from_inner(&self) -> Option { + match self { + Self::Disabled => None, + Self::Enabled(inner) => Some( + inner + .records_read_from_inner + .load(std::sync::atomic::Ordering::Relaxed), + ), + } + } + + /// Predicate Cache: number of records read from the cache + /// + /// This is the total number of records read from the cache (without actually + /// decoding). It measures the amount of work that was avoided with caching. + /// + /// It returns the number of records read across all columns, so if you read + /// 2 columns each with 100 records from the cache, this will return 200. + /// + /// Returns None if metrics are disabled. 
+ pub fn records_read_from_cache(&self) -> Option { + match self { + Self::Disabled => None, + Self::Enabled(inner) => Some( + inner + .records_read_from_cache + .load(std::sync::atomic::Ordering::Relaxed), + ), + } + } + + /// Increments the count of records read from the inner reader + pub(crate) fn increment_inner_reads(&self, count: usize) { + let Self::Enabled(inner) = self else { + return; + }; + inner + .records_read_from_inner + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Increments the count of records read from the cache + pub(crate) fn increment_cache_reads(&self, count: usize) { + let Self::Enabled(inner) = self else { + return; + }; + + inner + .records_read_from_cache + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } +} + +/// Holds the actual metrics for the Arrow reader. +/// +/// Please see [`ArrowReaderMetrics`] for the public interface. +#[derive(Debug)] +pub struct ArrowReaderMetricsInner { + // Metrics for Predicate Cache + /// Total number of records read from the inner reader (uncached) + records_read_from_inner: AtomicUsize, + /// Total number of records read from previously cached pages + records_read_from_cache: AtomicUsize, +} + +impl ArrowReaderMetricsInner { + /// Creates a new instance of `ArrowReaderMetricsInner` + pub(crate) fn new() -> Self { + Self { + records_read_from_inner: AtomicUsize::new(0), + records_read_from_cache: AtomicUsize::new(0), + } + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d4a3e11e2c46..3d20fa0a220c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -42,9 +42,11 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::schema::types::SchemaDescriptor; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; mod 
filter; +pub mod metrics; mod read_plan; mod selection; pub mod statistics; @@ -116,6 +118,10 @@ pub struct ArrowReaderBuilder { pub(crate) limit: Option, pub(crate) offset: Option, + + pub(crate) metrics: ArrowReaderMetrics, + + pub(crate) max_predicate_cache_size: usize, } impl Debug for ArrowReaderBuilder { @@ -132,6 +138,7 @@ impl Debug for ArrowReaderBuilder { .field("selection", &self.selection) .field("limit", &self.limit) .field("offset", &self.offset) + .field("metrics", &self.metrics) .finish() } } @@ -150,6 +157,8 @@ impl ArrowReaderBuilder { selection: None, limit: None, offset: None, + metrics: ArrowReaderMetrics::Disabled, + max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size } } @@ -300,6 +309,65 @@ impl ArrowReaderBuilder { ..self } } + + /// Specify metrics collection during reading + /// + /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a + /// clone of the provided metrics to the builder. + /// + /// For example: + /// + /// ```rust + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// # use arrow_array::{Int32Array, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; + /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; + /// # use parquet::arrow::ArrowWriter; + /// # let mut file: Vec = Vec::with_capacity(1024); + /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); + /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); + /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); + /// # writer.write(&batch).unwrap(); + /// # writer.close().unwrap(); + /// # let file = Bytes::from(file); + /// // Create metrics object to pass into the reader + /// let metrics = ArrowReaderMetrics::enabled(); + /// let reader = 
ParquetRecordBatchReaderBuilder::try_new(file).unwrap() + /// // Configure the builder to use the metrics by passing a clone + /// .with_metrics(metrics.clone()) + /// // Build the reader + /// .build().unwrap(); + /// // .. read data from the reader .. + /// + /// // check the metrics + /// assert!(metrics.records_read_from_inner().is_some()); + /// ``` + pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self { + Self { metrics, ..self } + } + + /// Set the maximum size (per row group) of the predicate cache in bytes for + /// the async decoder. + /// + /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use + /// unlimited cache size. + /// + /// This cache is used to store decoded arrays that are used in + /// predicate evaluation ([`Self::with_row_filter`]). + /// + /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See + /// [this ticket] for more details and alternatives. + /// + /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html + /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000 + pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self { + Self { + max_predicate_cache_size, + ..self + } + } } /// Options that control how metadata is read for a parquet file @@ -771,23 +839,37 @@ impl ParquetRecordBatchReaderBuilder { /// /// Note: this will eagerly evaluate any `RowFilter` before returning pub fn build(self) -> Result { + let Self { + input, + metadata, + schema: _, + fields, + batch_size: _, + row_groups, + projection, + mut filter, + selection, + limit, + offset, + metrics, + // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000 + max_predicate_cache_size: _, + } = self; + // Try to avoid allocate large buffer let batch_size = self .batch_size - .min(self.metadata.file_metadata().num_rows() as usize); + .min(metadata.file_metadata().num_rows() as 
usize); - let row_groups = self - .row_groups - .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect()); + let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect()); let reader = ReaderRowGroups { - reader: Arc::new(self.input.0), - metadata: self.metadata, + reader: Arc::new(input.0), + metadata, row_groups, }; - let mut filter = self.filter; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -797,20 +879,23 @@ impl ParquetRecordBatchReaderBuilder { break; } - let array_reader = ArrayReaderBuilder::new(&reader) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; + let mut cache_projection = predicate.projection().clone(); + cache_projection.intersect(&projection); + + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .build_array_reader(fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } } - let array_reader = ArrayReaderBuilder::new(&reader) - .build_array_reader(self.fields.as_deref(), &self.projection)?; + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .build_array_reader(fields.as_deref(), &projection)?; let read_plan = plan_builder .limited(reader.num_rows()) - .with_offset(self.offset) - .with_limit(self.limit) + .with_offset(offset) + .with_limit(limit) .build_limited() .build(); @@ -1005,7 +1090,9 @@ impl ParquetRecordBatchReader { batch_size: usize, selection: Option, ) -> Result { - let array_reader = ArrayReaderBuilder::new(row_groups) + // note metrics are not supported in this API + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(row_groups, &metrics) .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; let read_plan = 
ReadPlanBuilder::new(batch_size) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e56..229eae4c5bb6 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -441,6 +441,59 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } + + /// Expands the selection to align with batch boundaries. + /// This is needed when using cached array readers to ensure that + /// the cached data covers full batches. + #[cfg(feature = "async")] + pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self { + if batch_size == 0 { + return self.clone(); + } + + let mut expanded_ranges = Vec::new(); + let mut row_offset = 0; + + for selector in &self.selectors { + if selector.skip { + row_offset += selector.row_count; + } else { + let start = row_offset; + let end = row_offset + selector.row_count; + + // Expand start to batch boundary + let expanded_start = (start / batch_size) * batch_size; + // Expand end to batch boundary + let expanded_end = end.div_ceil(batch_size) * batch_size; + let expanded_end = expanded_end.min(total_rows); + + expanded_ranges.push(expanded_start..expanded_end); + row_offset += selector.row_count; + } + } + + // Sort ranges by start position + expanded_ranges.sort_by_key(|range| range.start); + + // Merge overlapping or consecutive ranges + let mut merged_ranges: Vec> = Vec::new(); + for range in expanded_ranges { + if let Some(last) = merged_ranges.last_mut() { + if range.start <= last.end { + // Overlapping or consecutive - merge them + last.end = last.end.max(range.end); + } else { + // No overlap - add new range + merged_ranges.push(range); + } + } else { + // First range + merged_ranges.push(range); + } + } + + Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows) + } } impl From> for RowSelection { diff --git 
a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 611d6999e07e..eea6176b766b 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -26,7 +26,7 @@ use std::fmt::Formatter; use std::io::SeekFrom; use std::ops::Range; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; @@ -38,7 +38,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; -use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups}; +use crate::arrow::array_reader::{ + ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups, +}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, @@ -61,6 +63,7 @@ pub use metadata::*; #[cfg(feature = "object_store")] mod store; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] @@ -510,6 +513,8 @@ impl ParquetRecordBatchStreamBuilder { fields: self.fields, limit: self.limit, offset: self.offset, + metrics: self.metrics, + max_predicate_cache_size: self.max_predicate_cache_size, }; // Ensure schema of ParquetRecordBatchStream respects projection, and does @@ -560,6 +565,12 @@ struct ReaderFactory { /// Offset to apply to the next offset: Option, + + /// Metrics + metrics: ArrowReaderMetrics, + + /// Maximum size of the predicate cache + max_predicate_cache_size: usize, } impl ReaderFactory @@ -588,6 +599,16 @@ where .filter(|index| !index.is_empty()) .map(|x| x[row_group_idx].as_slice()); + // Reuse columns that are selected and used by the filters + let cache_projection = match self.compute_cache_projection(&projection) { + Some(projection) => projection, + None => 
ProjectionMask::none(meta.columns().len()), + }; + let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new( + batch_size, + self.max_predicate_cache_size, + ))); + let mut row_group = InMemoryRowGroup { // schema: meta.schema_descr_ptr(), row_count: meta.num_rows() as usize, @@ -597,11 +618,16 @@ where metadata: self.metadata.as_ref(), }; + let cache_options_builder = + CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone()); + let filter = self.filter.as_mut(); let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); // Update selection based on any filters if let Some(filter) = filter { + let cache_options = cache_options_builder.clone().producer(); + for predicate in filter.predicates.iter_mut() { if !plan_builder.selects_any() { return Ok((self, None)); // ruled out entire row group @@ -609,11 +635,20 @@ where // (pre) Fetch only the columns that are selected by the predicate let selection = plan_builder.selection(); + // Fetch predicate columns; expand selection only for cached predicate columns + let cache_mask = Some(&cache_projection); row_group - .fetch(&mut self.input, predicate.projection(), selection) + .fetch( + &mut self.input, + predicate.projection(), + selection, + batch_size, + cache_mask, + ) .await?; - let array_reader = ArrayReaderBuilder::new(&row_group) + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) .build_array_reader(self.fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; @@ -656,18 +691,69 @@ where } // fetch the pages needed for decoding row_group - .fetch(&mut self.input, &projection, plan_builder.selection()) + // Final projection fetch shouldn't expand selection for cache; pass None + .fetch( + &mut self.input, + &projection, + plan_builder.selection(), + batch_size, + None, + ) .await?; let plan = plan_builder.build(); - let array_reader = 
ArrayReaderBuilder::new(&row_group) + let cache_options = cache_options_builder.consumer(); + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) .build_array_reader(self.fields.as_deref(), &projection)?; let reader = ParquetRecordBatchReader::new(array_reader, plan); Ok((self, Some(reader))) } + + /// Compute which columns are used in filters and the final (output) projection + fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option { + let filters = self.filter.as_ref()?; + let mut cache_projection = filters.predicates.first()?.projection().clone(); + for predicate in filters.predicates.iter() { + cache_projection.union(predicate.projection()); + } + cache_projection.intersect(projection); + self.exclude_nested_columns_from_cache(&cache_projection) + } + + /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns) + fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option { + let schema = self.metadata.file_metadata().schema_descr(); + let num_leaves = schema.num_columns(); + + // Count how many leaves each root column has + let num_roots = schema.root_schema().get_fields().len(); + let mut root_leaf_counts = vec![0usize; num_roots]; + for leaf_idx in 0..num_leaves { + let root_idx = schema.get_column_root_idx(leaf_idx); + root_leaf_counts[root_idx] += 1; + } + + // Keep only leaves whose root has exactly one leaf (non-nested) + let mut included_leaves = Vec::new(); + for leaf_idx in 0..num_leaves { + if mask.leaf_included(leaf_idx) { + let root_idx = schema.get_column_root_idx(leaf_idx); + if root_leaf_counts[root_idx] == 1 { + included_leaves.push(leaf_idx); + } + } + } + + if included_leaves.is_empty() { + None + } else { + Some(ProjectionMask::leaves(schema, included_leaves)) + } + } } enum StreamState { @@ -897,9 +983,13 @@ impl InMemoryRowGroup<'_> { input: &mut T, projection: &ProjectionMask, selection: 
Option<&RowSelection>, + batch_size: usize, + cache_mask: Option<&ProjectionMask>, ) -> Result<()> { let metadata = self.metadata.row_group(self.row_group_idx); if let Some((selection, offset_index)) = selection.zip(self.offset_index) { + let expanded_selection = + selection.expand_to_batch_boundaries(batch_size, self.row_count); // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` let mut page_start_offsets: Vec> = vec![]; @@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> { _ => (), } - ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + // Expand selection to batch boundaries only for cached columns + let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); + if use_expanded { + ranges.extend( + expanded_selection.scan_ranges(&offset_index[idx].page_locations), + ); + } else { + ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + } page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); ranges @@ -1883,6 +1981,8 @@ mod tests { filter: None, limit: None, offset: None, + metrics: ArrowReaderMetrics::disabled(), + max_predicate_cache_size: 0, }; let mut skip = true; @@ -2286,6 +2386,77 @@ mod tests { assert_eq!(requests.lock().unwrap().len(), 3); } + #[tokio::test] + async fn test_cache_projection_excludes_nested_columns() { + use arrow_array::{ArrayRef, StringArray}; + + // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }` + let a = StringArray::from_iter_values(["r1", "r2"]); + let b = StructArray::from(vec![ + ( + Arc::new(Field::new("aa", DataType::Utf8, true)), + Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef, + ), + ( + Arc::new(Field::new("bb", DataType::Utf8, true)), + Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef, + ), + ]); + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", 
b.data_type().clone(), true), + ])); + + let mut buf = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap(); + let batch = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ]) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Load Parquet metadata + let data: Bytes = buf.into(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let metadata = Arc::new(metadata); + + // Build a RowFilter whose predicate projects a leaf under the nested root `b` + // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa) + let parquet_schema = metadata.file_metadata().schema_descr(); + let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]); + + let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| { + Ok(arrow_array::BooleanArray::from(vec![ + true; + batch.num_rows() + ])) + }); + let filter = RowFilter::new(vec![Box::new(always_true)]); + + // Construct a ReaderFactory and compute cache projection + let reader_factory = ReaderFactory { + metadata: Arc::clone(&metadata), + fields: None, + input: TestReader::new(data), + filter: Some(filter), + limit: None, + offset: None, + metrics: ArrowReaderMetrics::disabled(), + max_predicate_cache_size: 0, + }; + + // Provide an output projection that also selects the same nested leaf + let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask); + + // Expect None since nested columns should be excluded from cache projection + assert!(cache_projection.is_none()); + } + #[tokio::test] async fn empty_offset_index_doesnt_panic_in_read_row_group() { use tokio::fs::File; @@ -2386,4 +2557,53 @@ mod tests { let result = reader.try_collect::>().await.unwrap(); assert_eq!(result.len(), 1); } + + #[tokio::test] + async fn test_cached_array_reader_sparse_offset_error() { + use futures::TryStreamExt; + + use 
crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector}; + use arrow_array::{BooleanArray, RecordBatch}; + + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let async_reader = TestReader::new(data); + + // Enable page index so the fetch logic loads only required pages + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); + + // Skip the first 22 rows (entire first Parquet page) and then select the + // next 3 rows (22, 23, 24). This means the fetch step will not include + // the first page starting at file offset 0. + let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]); + + // Trivial predicate on column 0 that always returns `true`. Using the + // same column in both predicate and projection activates the caching + // layer (Producer/Consumer pattern). + let parquet_schema = builder.parquet_schema(); + let proj = ProjectionMask::leaves(parquet_schema, vec![0]); + let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }); + let filter = RowFilter::new(vec![Box::new(always_true)]); + + // Build the stream with batch size 8 so the cache reads whole batches + // that straddle the requested row range (rows 0-7, 8-15, 16-23, …). + let stream = builder + .with_batch_size(8) + .with_projection(proj) + .with_row_selection(selection) + .with_row_filter(filter) + .build() + .unwrap(); + + // Collecting the stream must succeed (the result is unwrapped): hitting the + // sparse column chunk offset error this scenario targets fails the test. 
+ let _result: Vec<_> = stream.try_collect().await.unwrap(); + } } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 33010f480898..72626d70e0e5 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -276,6 +276,13 @@ impl ProjectionMask { Self { mask: None } } + /// Create a [`ProjectionMask`] which selects no columns + pub fn none(len: usize) -> Self { + Self { + mask: Some(vec![false; len]), + } + } + /// Create a [`ProjectionMask`] which selects only the specified leaf columns /// /// Note: repeated or out of order indices will not impact the final mask diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 48d732f17f21..8d72d1def17a 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -42,6 +42,8 @@ mod bad_data; #[cfg(feature = "crc")] mod checksum; mod int96_stats_roundtrip; +#[cfg(feature = "async")] +mod predicate_cache; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs new file mode 100644 index 000000000000..44d43113cbf5 --- /dev/null +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the predicate cache in the Parquet Arrow reader + +use arrow::array::ArrayRef; +use arrow::array::Int64Array; +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{RecordBatch, StringViewArray}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; +use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::properties::WriterProperties; +use std::ops::Range; +use std::sync::Arc; +use std::sync::LazyLock; + +#[tokio::test] +async fn test_default_read() { + // The cache is not used without predicates, so we expect 0 records read from cache (checked for both the sync and the async reader) + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0); + let sync_builder = test.sync_builder(); + test.run_sync(sync_builder); + let async_builder = test.async_builder().await; + test.run_async(async_builder).await; +} + +#[tokio::test] +async fn test_async_cache_with_filters() { + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49); + let async_builder = test.async_builder().await; + let
async_builder = test.add_project_ab_and_filter_b(async_builder); + test.run_async(async_builder).await; +} + +#[tokio::test] +async fn test_sync_cache_with_filters() { + let test = ParquetPredicateCacheTest::new() + // The sync reader does not use the cache. See https://github.com/apache/arrow-rs/issues/8000 + .with_expected_records_read_from_cache(0); + + let sync_builder = test.sync_builder(); + let sync_builder = test.add_project_ab_and_filter_b(sync_builder); + test.run_sync(sync_builder); +} + +#[tokio::test] +async fn test_cache_disabled_with_filters() { + // Expect no records to be read from cache, because the cache is disabled (max predicate cache size set to 0) + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0); + let sync_builder = test.sync_builder().with_max_predicate_cache_size(0); + let sync_builder = test.add_project_ab_and_filter_b(sync_builder); + test.run_sync(sync_builder); + + let async_builder = test.async_builder().await.with_max_predicate_cache_size(0); + let async_builder = test.add_project_ab_and_filter_b(async_builder); + test.run_async(async_builder).await; +} + +// -- Begin test infrastructure -- + +/// A test parquet file, plus the number of records expected to be read from the cache +struct ParquetPredicateCacheTest { + bytes: Bytes, + expected_records_read_from_cache: usize, +} +impl ParquetPredicateCacheTest { + /// Create a new `ParquetPredicateCacheTest` backed by a parquet file with: + /// 3 columns: "a", "b", "c" + /// + /// 2 row groups, each with 200 rows + /// each data page has 100 rows + /// + /// Values of column "a" are 0..=399 + /// Values of column "b" are 400..=799 + /// Values of column "c" alternate between short strings (under 12 bytes) and strings longer than 12 bytes + fn new() -> Self { + Self { + bytes: TEST_FILE_DATA.clone(), + expected_records_read_from_cache: 0, + } + } + + /// Set the expected number of records read from the cache + fn with_expected_records_read_from_cache( + mut self, + expected_records_read_from_cache: usize, + ) -> Self { + self.expected_records_read_from_cache = expected_records_read_from_cache; + self + } + + ///
Return a [`ParquetRecordBatchReaderBuilder`] for reading this file + fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder { + let reader = self.bytes.clone(); + ParquetRecordBatchReaderBuilder::try_new_with_options(reader, ArrowReaderOptions::default()) + .expect("ParquetRecordBatchReaderBuilder") + } + + /// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file + async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder { + let reader = TestReader::new(self.bytes.clone()); + ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default()) + .await + .unwrap() + } + + /// Configure the given [`ArrowReaderBuilder`] for reading the file with + /// + /// 1. a projection selecting the "a" and "b" columns + /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group) + fn add_project_ab_and_filter_b( + &self, + builder: ArrowReaderBuilder, + ) -> ArrowReaderBuilder { + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + // "b" > 575 and "b" < 625, i.e. the 49 values 576..=624 + let row_filter = ArrowPredicateFn::new( + ProjectionMask::columns(&schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_575 = Int64Array::new_scalar(575); + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + and(>(column, &scalar_575)?, <(column, &scalar_625)?)
+ }, + ); + + builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter)])) + } + + /// Build the reader from the specified builder, reading all batches from it, + /// and asserts the number of records read from the cache matches the expectation + fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder) { + let metrics = ArrowReaderMetrics::enabled(); + + let reader = builder.with_metrics(metrics.clone()).build().unwrap(); + for batch in reader { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + self.verify_metrics(metrics) + } + + /// Build the stream from the specified builder, reading all batches from it, + /// and asserts the number of records read from the cache matches the expectation + async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder) { + let metrics = ArrowReaderMetrics::enabled(); + + let mut stream = builder.with_metrics(metrics.clone()).build().unwrap(); + while let Some(batch) = stream.next().await { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + self.verify_metrics(metrics) + } + + fn verify_metrics(&self, metrics: ArrowReaderMetrics) { + let Self { + bytes: _, + expected_records_read_from_cache, + } = self; + + let read_from_cache = metrics + .records_read_from_cache() + .expect("Metrics enabled, so should have metrics"); + + assert_eq!( + &read_from_cache, expected_records_read_from_cache, + "Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}" + ); + } +} + +/// Create a parquet file in memory for testing. See [`ParquetPredicateCacheTest::new`] for details.
+static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + // Input batch has 400 rows, with 3 columns: "a", "b", "c" + // Note c is a different type (so the data page sizes will be different) + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // Since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 rows + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) +}); + +/// Copy-pasted in-memory implementation of the `AsyncFileReader` trait for testing purposes 🤮 +/// TODO put this in a common place +#[derive(Clone)] +struct TestReader { + data: Bytes, + metadata: Option>, +} + +impl TestReader { + fn new(data: Bytes) -> Self { + Self { + data, + metadata: Default::default(), + } + } +} + +impl AsyncFileReader for TestReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + let range = range.clone(); + futures::future::ready(Ok(self + .data + .slice(range.start as usize..range.end as usize))) + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) ->
BoxFuture<'a, parquet::errors::Result>> { + let metadata_reader = + ParquetMetaDataReader::new().with_page_indexes(options.is_some_and(|o| o.page_index())); + self.metadata = Some(Arc::new( + metadata_reader.parse_and_finish(&self.data).unwrap(), + )); + futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() + } +}