diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 05557069aa7d..f601ac7cefdc 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -78,6 +78,7 @@ base64 = { version = "0.22", default-features = false, features = ["std"] } criterion = { version = "0.5", default-features = false, features = ["async_futures"] } snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } +insta = "1.43.1" brotli = { version = "8.0", default-features = false, features = ["std"] } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 6dcf05ccf8ad..d5e36fbcb486 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -15,18 +15,22 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arrow_schema::{DataType, Fields, SchemaBuilder}; use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; +use crate::arrow::array_reader::cached_array_reader::CacheRole; +use crate::arrow::array_reader::cached_array_reader::CachedArrayReader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; +use crate::arrow::array_reader::row_group_cache::RowGroupCache; use crate::arrow::array_reader::{ make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader, }; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::{ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; use crate::basic::Type as PhysicalType; @@ -34,14 +38,74 @@ use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, In use crate::errors::{ParquetError, Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; +/// Builder for [`CacheOptions`] +#[derive(Debug, Clone)] +pub struct CacheOptionsBuilder<'a> { + /// Projection mask to apply to the cache + pub projection_mask: &'a ProjectionMask, + /// Cache to use for storing row groups + pub cache: Arc>, +} + +impl<'a> CacheOptionsBuilder<'a> { + /// create a new cache options builder + pub fn new(projection_mask: &'a ProjectionMask, cache: Arc>) -> Self { + Self { + projection_mask, + cache, + } + } + + /// Return a new [`CacheOptions`] for producing (populating) the cache + pub fn producer(self) -> CacheOptions<'a> { + CacheOptions { + projection_mask: self.projection_mask, + cache: self.cache, + role: CacheRole::Producer, + } + } + + /// return a new [`CacheOptions`] for consuming (reading) the cache + pub fn consumer(self) -> CacheOptions<'a> { + CacheOptions { + projection_mask: self.projection_mask, + cache: self.cache, + role: CacheRole::Consumer, + } + } +} + +/// Cache options containing projection mask, cache, and role +#[derive(Clone)] +pub struct CacheOptions<'a> { + pub projection_mask: &'a ProjectionMask, + pub cache: Arc>, + pub role: CacheRole, +} + /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader pub struct ArrayReaderBuilder<'a> { + /// Source of row group data row_groups: &'a dyn RowGroups, + /// Optional cache 
options for the array reader + cache_options: Option<&'a CacheOptions<'a>>, + /// metrics + metrics: &'a ArrowReaderMetrics, } impl<'a> ArrayReaderBuilder<'a> { - pub fn new(row_groups: &'a dyn RowGroups) -> Self { - Self { row_groups } + pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self { + Self { + row_groups, + cache_options: None, + metrics, + } + } + + /// Add cache options to the builder + pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self { + self.cache_options = cache_options; + self } /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader. @@ -69,7 +133,26 @@ impl<'a> ArrayReaderBuilder<'a> { mask: &ProjectionMask, ) -> Result>> { match field.field_type { - ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask), + ParquetFieldType::Primitive { col_idx, .. } => { + let Some(reader) = self.build_primitive_reader(field, mask)? else { + return Ok(None); + }; + let Some(cache_options) = self.cache_options.as_ref() else { + return Ok(Some(reader)); + }; + + if cache_options.projection_mask.leaf_included(col_idx) { + Ok(Some(Box::new(CachedArrayReader::new( + reader, + Arc::clone(&cache_options.cache), + col_idx, + cache_options.role, + self.metrics.clone(), // cheap clone + )))) + } else { + Ok(Some(reader)) + } + } ParquetFieldType::Group { .. } => match &field.arrow_type { DataType::Map(_, _) => self.build_map_reader(field, mask), DataType::Struct(_) => self.build_struct_reader(field, mask), @@ -375,7 +458,8 @@ mod tests { ) .unwrap(); - let array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs new file mode 100644 index 000000000000..0e837782faf5 --- /dev/null +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -0,0 +1,762 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
[`CachedArrayReader`] wrapper around [`ArrayReader`] + +use crate::arrow::array_reader::row_group_cache::BatchID; +use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader}; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use crate::errors::Result; +use arrow_array::{new_empty_array, ArrayRef, BooleanArray}; +use arrow_buffer::BooleanBufferBuilder; +use arrow_schema::DataType as ArrowType; +use std::any::Any; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +/// Role of the cached array reader +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CacheRole { + /// Producer role: inserts data into the cache during filter phase + Producer, + /// Consumer role: removes consumed data from the cache during output building phase + Consumer, +} + +/// A cached wrapper around an ArrayReader that avoids duplicate decoding +/// when the same column appears in both filter predicates and output projection. +/// +/// This reader acts as a transparent layer over the inner reader, using a cache +/// to avoid redundant work when the same data is needed multiple times. +/// +/// The reader can operate in two roles: +/// - Producer: During filter phase, inserts decoded data into the cache +/// - Consumer: During output building, consumes and removes data from the cache +/// +/// This means the memory consumption of the cache has two stages: +/// 1. During the filter phase, the memory increases as the cache is populated +/// 2. It peaks when filters are built. +/// 3. It decreases as the cached data is consumed. +/// +/// ```text +/// ▲ +/// │ ╭─╮ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │ ╱ ╲ +/// │╱ ╲ +/// └─────────────╲──────► Time +/// │ │ │ +/// Filter Peak Consume +/// Phase (Built) (Decrease) +/// ``` +pub struct CachedArrayReader { + /// The underlying array reader + inner: Box, + /// Shared cache for this row group + shared_cache: Arc>, + /// Column index for cache key generation + column_idx: usize, + /// Current logical position in the data stream for this reader (for cache key generation) + outer_position: usize, + /// Current position in `inner` + inner_position: usize, + /// Batch size for the cache + batch_size: usize, + /// Boolean buffer builder to track selections for the next consume_batch() + selections: BooleanBufferBuilder, + /// Role of this reader (Producer or Consumer) + role: CacheRole, + /// Local cache to store batches between read_records and consume_batch calls + /// This ensures data is available even if the shared cache evicts items + local_cache: HashMap, + /// Statistics to report on the Cache behavior + metrics: ArrowReaderMetrics, +} + +impl CachedArrayReader { + /// Creates a new cached array reader with the specified role + pub fn new( + inner: Box, + cache: Arc>, + column_idx: usize, + role: CacheRole, + metrics: ArrowReaderMetrics, + ) -> Self { + let batch_size = cache.lock().unwrap().batch_size(); + + Self { + inner, + shared_cache: cache, + column_idx, + outer_position: 0, + inner_position: 0, + batch_size, + selections: BooleanBufferBuilder::new(0), + role, + local_cache: HashMap::new(), + metrics, + } + } + + fn get_batch_id_from_position(&self, row_id: usize) -> BatchID { + BatchID { + val: row_id / self.batch_size, + } + } + + /// Loads the batch with the given ID (first row offset) from the inner + /// reader + /// + /// After this call the required batch will be available in + /// `self.local_cache` and may also be stored in `self.shared_cache`. 
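+ /// (If the inner reader lags behind the batch's first row it is skipped forward first; up to `batch_size` records are then decoded, stored in `self.local_cache`, and offered to the shared cache.)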
+ /// + fn fetch_batch(&mut self, batch_id: BatchID) -> Result { + let first_row_offset = batch_id.val * self.batch_size; + if self.inner_position < first_row_offset { + let to_skip = first_row_offset - self.inner_position; + let skipped = self.inner.skip_records(to_skip)?; + assert_eq!(skipped, to_skip); + self.inner_position += skipped; + } + + let read = self.inner.read_records(self.batch_size)?; + + // If there are no remaining records (EOF), return immediately without + // attempting to cache an empty batch. This prevents inserting zero-length + // arrays into the cache which can later cause panics when slicing. + if read == 0 { + return Ok(0); + } + + let array = self.inner.consume_batch()?; + + // Store in both shared cache and local cache + // The shared cache is used to reuse results between readers + // The local cache ensures data is available for our consume_batch call + let _cached = + self.shared_cache + .lock() + .unwrap() + .insert(self.column_idx, batch_id, array.clone()); + // Note: if the shared cache is full (_cached == false), we continue without caching + // The local cache will still store the data for this reader's use + + self.local_cache.insert(batch_id, array); + + self.inner_position += read; + Ok(read) + } + + /// Remove batches from cache that have been completely consumed + /// This is only called for Consumer role readers + fn cleanup_consumed_batches(&mut self) { + let current_batch_id = self.get_batch_id_from_position(self.outer_position); + + // Remove batches that are at least one batch behind the current position + // This ensures we don't remove batches that might still be needed for the current batch + // We can safely remove batch_id if current_batch_id > batch_id + 1 + if current_batch_id.val > 1 { + let mut cache = self.shared_cache.lock().unwrap(); + for batch_id_to_remove in 0..(current_batch_id.val - 1) { + cache.remove( + self.column_idx, + BatchID { + val: batch_id_to_remove, + }, + ); + } + } + } +} + +impl ArrayReader for CachedArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + self.inner.get_data_type() + } + + fn read_records(&mut self, num_records: usize) -> Result { + let mut read = 0; + while read < num_records { + let batch_id = self.get_batch_id_from_position(self.outer_position); + + // Check local cache first + let cached = if let Some(array) = self.local_cache.get(&batch_id) { + Some(array.clone()) + } else { + // If not in local cache, i.e., we are consumer, check shared cache + let cache_content = self + .shared_cache + .lock() + .unwrap() + .get(self.column_idx, batch_id); + if let Some(array) = cache_content.as_ref() { + // Store in local cache for later use in consume_batch + self.local_cache.insert(batch_id, array.clone()); + } + cache_content + }; + + match cached { + Some(array) => { + let array_len = array.len(); + if array_len + batch_id.val * self.batch_size > self.outer_position { + // the cache batch has some records that we can select + let v = array_len + batch_id.val * self.batch_size - self.outer_position; + let select_cnt = std::cmp::min(num_records - read, v); + read += select_cnt; + self.metrics.increment_cache_reads(select_cnt); + self.outer_position += select_cnt; + self.selections.append_n(select_cnt, true); + } else { + // this is last batch and we have used all records from it + break; + } + } + None => { + let read_from_inner = self.fetch_batch(batch_id)?; + // Reached end-of-file, no more records to read + if read_from_inner == 0 { + break; + } + 
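// Cache miss: count the rows that had to be decoded by the inner reader +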
self.metrics.increment_inner_reads(read_from_inner); + let select_from_this_batch = std::cmp::min( + num_records - read, + self.inner_position - self.outer_position, + ); + read += select_from_this_batch; + self.outer_position += select_from_this_batch; + self.selections.append_n(select_from_this_batch, true); + if read_from_inner < self.batch_size { + // this is last batch from inner reader + break; + } + } + } + } + Ok(read) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let mut skipped = 0; + while skipped < num_records { + let size = std::cmp::min(num_records - skipped, self.batch_size); + skipped += size; + self.selections.append_n(size, false); + self.outer_position += size; + } + Ok(num_records) + } + + fn consume_batch(&mut self) -> Result { + let row_count = self.selections.len(); + if row_count == 0 { + return Ok(new_empty_array(self.inner.get_data_type())); + } + + let start_position = self.outer_position - row_count; + + let selection_buffer = self.selections.finish(); + + let start_batch = start_position / self.batch_size; + let end_batch = (start_position + row_count - 1) / self.batch_size; + + let mut selected_arrays = Vec::new(); + for batch_id in start_batch..=end_batch { + let batch_start = batch_id * self.batch_size; + let batch_end = batch_start + self.batch_size - 1; + let batch_id = self.get_batch_id_from_position(batch_start); + + // Calculate the overlap between the start_position and the batch + let overlap_start = start_position.max(batch_start); + let overlap_end = (start_position + row_count - 1).min(batch_end); + + if overlap_start > overlap_end { + continue; + } + + let selection_start = overlap_start - start_position; + let selection_length = overlap_end - overlap_start + 1; + let mask = selection_buffer.slice(selection_start, selection_length); + + if mask.count_set_bits() == 0 { + continue; + } + + let mask_array = BooleanArray::from(mask); + // Read from local cache instead of shared cache to avoid cache eviction issues + let cached = self + .local_cache + .get(&batch_id) + .expect("data must be already cached in the read_records call, this is a bug"); + let cached = cached.slice(overlap_start - batch_start, selection_length); + let filtered = arrow_select::filter::filter(&cached, &mask_array)?; + selected_arrays.push(filtered); + } + + self.selections = BooleanBufferBuilder::new(0); + + // Only remove batches from local buffer that are completely behind current position + // Keep the current batch and any future batches as they might still be needed + let current_batch_id = self.get_batch_id_from_position(self.outer_position); + self.local_cache + .retain(|batch_id, _| batch_id.val >= current_batch_id.val); + + // For consumers, cleanup batches that have been completely consumed + // This reduces the memory usage of the shared cache + if self.role == CacheRole::Consumer { + self.cleanup_consumed_batches(); + } + + match selected_arrays.len() { + 0 => Ok(new_empty_array(self.inner.get_data_type())), + 1 => Ok(selected_arrays.into_iter().next().unwrap()), + _ => Ok(arrow_select::concat::concat( + &selected_arrays + .iter() + .map(|a| a.as_ref()) + .collect::>(), + )?), + } + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None // we don't allow nullable parent for now. 
+ } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::row_group_cache::RowGroupCache; + use crate::arrow::array_reader::ArrayReader; + use arrow_array::{ArrayRef, Int32Array}; + use std::sync::{Arc, Mutex}; + + // Mock ArrayReader for testing + struct MockArrayReader { + data: Vec, + position: usize, + records_to_consume: usize, + data_type: ArrowType, + } + + impl MockArrayReader { + fn new(data: Vec) -> Self { + Self { + data, + position: 0, + records_to_consume: 0, + data_type: ArrowType::Int32, + } + } + } + + impl ArrayReader for MockArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_read = std::cmp::min(batch_size, remaining); + self.records_to_consume += to_read; + Ok(to_read) + } + + fn consume_batch(&mut self) -> Result { + let start = self.position; + let end = start + self.records_to_consume; + let slice = &self.data[start..end]; + self.position = end; + self.records_to_consume = 0; + Ok(Arc::new(Int32Array::from(slice.to_vec()))) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_skip = std::cmp::min(num_records, remaining); + self.position += to_skip; + Ok(to_skip) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } + } + + #[test] + fn test_cached_reader_basic() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Producer, + metrics, + ); + + // Read 3 records + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + + // Read 3 more records + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 2); + } + + #[test] + fn test_read_skip_pattern() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + let read1 = cached_reader.read_records(2).unwrap(); + assert_eq!(read1, 2); + + let array1 = cached_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 2); + let int32_array = array1.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2]); + + let skipped = cached_reader.skip_records(2).unwrap(); + assert_eq!(skipped, 2); + + let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 1); + + // Consume it (should be the 5th element after skipping 3,4) + let array2 = cached_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 1); + let int32_array = array2.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[5]); + } + + #[test] + fn test_multiple_reads_before_consume() { + let metrics = 
ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + // Multiple reads should accumulate + let read1 = cached_reader.read_records(2).unwrap(); + assert_eq!(read1, 2); + + let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 1); + + // Consume should return all accumulated records + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + } + + #[test] + fn test_eof_behavior() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + // Try to read more than available + let read1 = cached_reader.read_records(5).unwrap(); + assert_eq!(read1, 3); // Should only get 3 records (all available) + + let array1 = cached_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Further reads should return 0 + let read2 = cached_reader.read_records(1).unwrap(); + assert_eq!(read2, 0); + + let array2 = cached_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 0); + } + + #[test] + fn test_cache_sharing() { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5 + + // First reader - populate cache + let mock_reader1 = MockArrayReader::new(vec![1, 2, 3, 4, 5]); + let mut cached_reader1 = CachedArrayReader::new( + Box::new(mock_reader1), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + cached_reader1.read_records(3).unwrap(); + let array1 = cached_reader1.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Second reader with different column index should not interfere + let mock_reader2 = MockArrayReader::new(vec![10, 20, 30, 40, 50]); + let mut cached_reader2 = CachedArrayReader::new( + Box::new(mock_reader2), + cache.clone(), + 1, + CacheRole::Consumer, + metrics.clone(), + ); + + cached_reader2.read_records(2).unwrap(); + let array2 = cached_reader2.consume_batch().unwrap(); + assert_eq!(array2.len(), 2); + + // Verify the second reader got its own data, not from cache + let int32_array = array2.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[10, 20]); + } + + #[test] + fn test_consumer_removes_batches() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut consumer_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // Read first batch (positions 0-2, batch 0) + let read1 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read1, 3); + assert_eq!(consumer_reader.outer_position, 3); + // Check that batch 0 is in cache after read_records + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + let array1 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // After first 
consume_batch, batch 0 should still be in cache + // (current_batch_id = 3/3 = 1, cleanup only happens if current_batch_id > 1) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + // Read second batch (positions 3-5, batch 1) + let read2 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read2, 3); + assert_eq!(consumer_reader.outer_position, 6); + let array2 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 3); + + // After second consume_batch, batch 0 should be removed + // (current_batch_id = 6/3 = 2, cleanup removes batches 0..(2-1) = 0..1, so removes batch 0) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some()); + + // Read third batch (positions 6-8, batch 2) + let read3 = consumer_reader.read_records(3).unwrap(); + assert_eq!(read3, 3); + assert_eq!(consumer_reader.outer_position, 9); + let array3 = consumer_reader.consume_batch().unwrap(); + assert_eq!(array3.len(), 3); + + // After third consume_batch, batches 0 and 1 should be removed + // (current_batch_id = 9/3 = 3, cleanup removes batches 0..(3-1) = 0..2, so removes batches 0 and 1) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_none()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 2 }).is_some()); + } + + #[test] + fn test_producer_keeps_batches() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut producer_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Producer, + metrics, + ); + + // Read first batch (positions 0-2) + let read1 = producer_reader.read_records(3).unwrap(); + assert_eq!(read1, 3); + let array1 = producer_reader.consume_batch().unwrap(); + assert_eq!(array1.len(), 3); + + // Verify batch 0 is in cache + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + + // Read second batch (positions 3-5) - producer should NOT remove batch 0 + let read2 = producer_reader.read_records(3).unwrap(); + assert_eq!(read2, 3); + let array2 = producer_reader.consume_batch().unwrap(); + assert_eq!(array2.len(), 3); + + // Verify both batch 0 and batch 1 are still present (no removal for producer) + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some()); + } + + #[test] + fn test_local_cache_protects_against_eviction() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // Read records which should populate both shared and local cache + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + + // Verify data is in both caches + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some()); + assert!(cached_reader.local_cache.contains_key(&BatchID { val: 0 })); + + // Simulate cache eviction by manually removing from shared cache + cache.lock().unwrap().remove(0, BatchID { val: 0 }); + assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none()); + + // 
Even though shared cache was evicted, consume_batch should still work + // because data is preserved in local cache + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3]); + + // Local cache should be cleared after consume_batch + assert!(cached_reader.local_cache.is_empty()); + } + + #[test] + fn test_local_cache_is_cleared_properly() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, 0))); // Batch size 3, cache 0 + let mut cached_reader = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // Read records which should populate both shared and local cache + let records_read = cached_reader.read_records(1).unwrap(); + assert_eq!(records_read, 1); + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 1); + + let records_read = cached_reader.read_records(3).unwrap(); + assert_eq!(records_read, 3); + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 3); + } + + #[test] + fn test_batch_id_calculation_with_incremental_reads() { + let metrics = ArrowReaderMetrics::disabled(); + let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + + // Create a producer to populate cache + let mut producer = CachedArrayReader::new( + Box::new(MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])), + cache.clone(), + 0, + CacheRole::Producer, + metrics.clone(), + ); + + // Populate cache with first batch (1, 2, 3) + producer.read_records(3).unwrap(); + producer.consume_batch().unwrap(); + + // Now create a consumer that will try to read from cache + let mut consumer = CachedArrayReader::new( + Box::new(mock_reader), + cache.clone(), + 0, + CacheRole::Consumer, + metrics, + ); + + // - We want to read 4 records starting from position 0 + // - First 3 records (positions 0-2) should come from cache (batch 0) + // - The 4th record (position 3) should come from the next batch + let records_read = consumer.read_records(4).unwrap(); + assert_eq!(records_read, 4); + + let array = consumer.consume_batch().unwrap(); + assert_eq!(array.len(), 4); + + let int32_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(int32_array.values(), &[1, 2, 3, 4]); + } +} diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 66c4f30b3c29..e28c93cf624d 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -249,6 +249,7 @@ mod tests { use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; use crate::arrow::array_reader::ArrayReaderBuilder; + use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask}; use crate::file::properties::WriterProperties; @@ -563,7 +564,8 @@ mod tests { ) .unwrap(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git 
a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index d6e325b49450..5b0ccd874f9e 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -33,6 +33,7 @@ mod builder; mod byte_array; mod byte_array_dictionary; mod byte_view_array; +mod cached_array_reader; mod empty_array; mod fixed_len_byte_array; mod fixed_size_list_array; @@ -40,13 +41,14 @@ mod list_array; mod map_array; mod null_array; mod primitive_array; +mod row_group_cache; mod struct_array; #[cfg(test)] mod test_util; // Note that this crate is public under the `experimental` feature flag. -pub use builder::ArrayReaderBuilder; +pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder}; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks @@ -58,6 +60,7 @@ pub use list_array::ListArrayReader; pub use map_array::MapArrayReader; pub use null_array::NullArrayReader; pub use primitive_array::PrimitiveArrayReader; +pub use row_group_cache::RowGroupCache; pub use struct_array::StructArrayReader; /// Reads Parquet data into Arrow Arrays. diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs new file mode 100644 index 000000000000..ef726e16495f --- /dev/null +++ b/parquet/src/arrow/array_reader/row_group_cache.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, ArrayRef}; +use arrow_schema::DataType; +use std::collections::HashMap; + +/// Starting row ID for this batch +/// +/// The `BatchID` is used to identify batches of rows within a row group. +/// +/// The row_index in the id are relative to the rows being read from the +/// underlying column reader (which might already have a RowSelection applied) +/// +/// The `BatchID` for any particular row is `row_index / batch_size`. The +/// integer division ensures that rows in the same batch share the same +/// the BatchID which can be calculated quickly from the row index +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct BatchID { + pub val: usize, +} + +/// Cache key that uniquely identifies a batch within a row group +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CacheKey { + /// Column index in the row group + pub column_idx: usize, + /// Starting row ID for this batch + pub batch_id: BatchID, +} + +fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize { + match array.data_type() { + // TODO: this is temporary workaround. It's very difficult to measure the actual memory usage of one StringViewArray, + // because the underlying buffer is shared with multiple StringViewArrays. 
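+ // As an approximation, charge 16 bytes per view plus the total bytes of all referenced data buffers (and the array struct itself).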
+ DataType::Utf8View => { + use arrow_array::cast::AsArray; + let array = array.as_string_view(); + array.len() * 16 + array.total_buffer_bytes_used() + std::mem::size_of_val(array) + } + _ => array.get_array_memory_size(), + } +} + +/// Row group cache that stores decoded arrow arrays at batch granularity +/// +/// This cache is designed to avoid duplicate decoding when the same column +/// appears in both filter predicates and output projection. +#[derive(Debug)] +pub struct RowGroupCache { + /// Cache storage mapping (column_idx, row_id) -> ArrayRef + cache: HashMap, + /// Cache granularity + batch_size: usize, + /// Maximum cache size in bytes + max_cache_bytes: usize, + /// Current cache size in bytes + current_cache_size: usize, +} + +impl RowGroupCache { + /// Creates a new empty row group cache + pub fn new(batch_size: usize, max_cache_bytes: usize) -> Self { + Self { + cache: HashMap::new(), + batch_size, + max_cache_bytes, + current_cache_size: 0, + } + } + + /// Inserts an array into the cache for the given column and starting row ID + /// Returns true if the array was inserted, false if it would exceed the cache size limit + pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, array: ArrayRef) -> bool { + let array_size = get_array_memory_size_for_cache(&array); + + // Check if adding this array would exceed the cache size limit + if self.current_cache_size + array_size > self.max_cache_bytes { + return false; // Cache is full, don't insert + } + + let key = CacheKey { + column_idx, + batch_id, + }; + + let existing = self.cache.insert(key, array); + assert!(existing.is_none()); + self.current_cache_size += array_size; + true + } + + /// Retrieves a cached array for the given column and row ID + /// Returns None if not found in cache + pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option { + let key = CacheKey { + column_idx, + batch_id, + }; + self.cache.get(&key).cloned() + } + + /// Gets the batch size for this cache + pub fn batch_size(&self) -> usize { + self.batch_size + } + + /// Removes a cached array for the given column and row ID + /// Returns true if the entry was found and removed, false otherwise + pub fn remove(&mut self, column_idx: usize, batch_id: BatchID) -> bool { + let key = CacheKey { + column_idx, + batch_id, + }; + if let Some(array) = self.cache.remove(&key) { + self.current_cache_size -= get_array_memory_size_for_cache(&array); + true + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::{ArrayRef, Int32Array}; + use std::sync::Arc; + + #[test] + fn test_cache_basic_operations() { + let mut cache = RowGroupCache::new(1000, usize::MAX); + + // Create test array + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + + // Test insert and get + let batch_id = BatchID { val: 0 }; + assert!(cache.insert(0, batch_id, array.clone())); + let retrieved = cache.get(0, batch_id); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().len(), 5); + + // Test miss + let miss = cache.get(1, batch_id); + assert!(miss.is_none()); + + // Test different row_id + let miss = cache.get(0, BatchID { val: 1000 }); + assert!(miss.is_none()); + } + + #[test] + fn test_cache_remove() { + let mut cache = RowGroupCache::new(1000, usize::MAX); + + // Create test arrays + let array1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6])); + + // Insert arrays + assert!(cache.insert(0, BatchID { val: 0 }, array1.clone())); + 
assert!(cache.insert(0, BatchID { val: 1000 }, array2.clone())); + assert!(cache.insert(1, BatchID { val: 0 }, array1.clone())); + + // Verify they're there + assert!(cache.get(0, BatchID { val: 0 }).is_some()); + assert!(cache.get(0, BatchID { val: 1000 }).is_some()); + assert!(cache.get(1, BatchID { val: 0 }).is_some()); + + // Remove one entry + let removed = cache.remove(0, BatchID { val: 0 }); + assert!(removed); + assert!(cache.get(0, BatchID { val: 0 }).is_none()); + + // Other entries should still be there + assert!(cache.get(0, BatchID { val: 1000 }).is_some()); + assert!(cache.get(1, BatchID { val: 0 }).is_some()); + + // Try to remove non-existent entry + let not_removed = cache.remove(0, BatchID { val: 0 }); + assert!(!not_removed); + + // Remove remaining entries + assert!(cache.remove(0, BatchID { val: 1000 })); + assert!(cache.remove(1, BatchID { val: 0 })); + + // Cache should be empty + assert!(cache.get(0, BatchID { val: 1000 }).is_none()); + assert!(cache.get(1, BatchID { val: 0 }).is_none()); + } +} diff --git a/parquet/src/arrow/arrow_reader/metrics.rs b/parquet/src/arrow/arrow_reader/metrics.rs new file mode 100644 index 000000000000..05c7a5180193 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/metrics.rs @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [ArrowReaderMetrics] for collecting metrics about the Arrow reader + +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +/// This enum represents the state of Arrow reader metrics collection. +/// +/// The inner metrics are stored in an `Arc` +/// so cloning the `ArrowReaderMetrics` enum will not clone the inner metrics. +/// +/// To access metrics, create an `ArrowReaderMetrics` via [`ArrowReaderMetrics::enabled()`] +/// and configure the `ArrowReaderBuilder` with a clone. +#[derive(Debug, Clone)] +pub enum ArrowReaderMetrics { + /// Metrics are not collected (default) + Disabled, + /// Metrics are collected and stored in an `Arc`. + /// + /// Create this via [`ArrowReaderMetrics::enabled()`]. + Enabled(Arc), +} + +impl ArrowReaderMetrics { + /// Creates a new instance of [`ArrowReaderMetrics::Disabled`] + pub fn disabled() -> Self { + Self::Disabled + } + + /// Creates a new instance of [`ArrowReaderMetrics::Enabled`] + pub fn enabled() -> Self { + Self::Enabled(Arc::new(ArrowReaderMetricsInner::new())) + } + + /// Predicate Cache: number of records read directly from the inner reader + /// + /// This is the total number of records read from the inner reader (that is + /// actually decoding). It measures the amount of work that could not be + /// avoided with caching. + /// + /// It returns the number of records read across all columns, so if you read + /// 2 columns each with 100 records, this will return 200. 
+ /// + /// + /// Returns None if metrics are disabled. + pub fn records_read_from_inner(&self) -> Option { + match self { + Self::Disabled => None, + Self::Enabled(inner) => Some( + inner + .records_read_from_inner + .load(std::sync::atomic::Ordering::Relaxed), + ), + } + } + + /// Predicate Cache: number of records read from the cache + /// + /// This is the total number of records read from the cache actually + /// decoding). It measures the amount of work that was avoided with caching. + /// + /// It returns the number of records read across all columns, so if you read + /// 2 columns each with 100 records from the cache, this will return 200. + /// + /// Returns None if metrics are disabled. + pub fn records_read_from_cache(&self) -> Option { + match self { + Self::Disabled => None, + Self::Enabled(inner) => Some( + inner + .records_read_from_cache + .load(std::sync::atomic::Ordering::Relaxed), + ), + } + } + + /// Increments the count of records read from the inner reader + pub(crate) fn increment_inner_reads(&self, count: usize) { + let Self::Enabled(inner) = self else { + return; + }; + inner + .records_read_from_inner + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } + + /// Increments the count of records read from the cache + pub(crate) fn increment_cache_reads(&self, count: usize) { + let Self::Enabled(inner) = self else { + return; + }; + + inner + .records_read_from_cache + .fetch_add(count, std::sync::atomic::Ordering::Relaxed); + } +} + +/// Holds the actual metrics for the Arrow reader. +/// +/// Please see [`ArrowReaderMetrics`] for the public interface. +#[derive(Debug)] +pub struct ArrowReaderMetricsInner { + // Metrics for Predicate Cache + /// Total number of records read from the inner reader (uncached) + records_read_from_inner: AtomicUsize, + /// Total number of records read from previously cached pages + records_read_from_cache: AtomicUsize, +} + +impl ArrowReaderMetricsInner { + /// Creates a new instance of `ArrowReaderMetricsInner` + pub(crate) fn new() -> Self { + Self { + records_read_from_inner: AtomicUsize::new(0), + records_read_from_cache: AtomicUsize::new(0), + } + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d4a3e11e2c46..3d20fa0a220c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -42,9 +42,11 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::schema::types::SchemaDescriptor; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; mod filter; +pub mod metrics; mod read_plan; mod selection; pub mod statistics; @@ -116,6 +118,10 @@ pub struct ArrowReaderBuilder { pub(crate) limit: Option, pub(crate) offset: Option, + + pub(crate) metrics: ArrowReaderMetrics, + + pub(crate) max_predicate_cache_size: usize, } impl Debug for ArrowReaderBuilder { @@ -132,6 +138,7 @@ impl Debug for ArrowReaderBuilder { .field("selection", &self.selection) .field("limit", &self.limit) .field("offset", &self.offset) + .field("metrics", &self.metrics) .finish() } } @@ -150,6 +157,8 @@ impl ArrowReaderBuilder { selection: None, limit: None, offset: None, + metrics: ArrowReaderMetrics::Disabled, + max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size } } @@ -300,6 +309,65 @@ impl ArrowReaderBuilder { ..self } } + + /// Specify metrics collection during reading + /// + /// To 
access the metrics, create an [`ArrowReaderMetrics`] and pass a + /// clone of the provided metrics to the builder. + /// + /// For example: + /// + /// ```rust + /// # use std::sync::Arc; + /// # use bytes::Bytes; + /// # use arrow_array::{Int32Array, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; + /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; + /// # use parquet::arrow::ArrowWriter; + /// # let mut file: Vec = Vec::with_capacity(1024); + /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); + /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); + /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); + /// # writer.write(&batch).unwrap(); + /// # writer.close().unwrap(); + /// # let file = Bytes::from(file); + /// // Create metrics object to pass into the reader + /// let metrics = ArrowReaderMetrics::enabled(); + /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap() + /// // Configure the builder to use the metrics by passing a clone + /// .with_metrics(metrics.clone()) + /// // Build the reader + /// .build().unwrap(); + /// // .. read data from the reader .. + /// + /// // check the metrics + /// assert!(metrics.records_read_from_inner().is_some()); + /// ``` + pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self { + Self { metrics, ..self } + } + + /// Set the maximum size (per row group) of the predicate cache in bytes for + /// the async decoder. + /// + /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use + /// unlimited cache size. + /// + /// This cache is used to store decoded arrays that are used in + /// predicate evaluation ([`Self::with_row_filter`]). + /// + /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See + /// [this ticket] for more details and alternatives. 
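+ /// For example, `builder.with_max_predicate_cache_size(usize::MAX)` removes the limit entirely, while `0` effectively disables the shared predicate cache.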
+ /// + /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html + /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000 + pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self { + Self { + max_predicate_cache_size, + ..self + } + } } /// Options that control how metadata is read for a parquet file @@ -771,23 +839,37 @@ impl ParquetRecordBatchReaderBuilder { /// /// Note: this will eagerly evaluate any `RowFilter` before returning pub fn build(self) -> Result { + let Self { + input, + metadata, + schema: _, + fields, + batch_size: _, + row_groups, + projection, + mut filter, + selection, + limit, + offset, + metrics, + // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000 + max_predicate_cache_size: _, + } = self; + // Try to avoid allocate large buffer let batch_size = self .batch_size - .min(self.metadata.file_metadata().num_rows() as usize); + .min(metadata.file_metadata().num_rows() as usize); - let row_groups = self - .row_groups - .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect()); + let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect()); let reader = ReaderRowGroups { - reader: Arc::new(self.input.0), - metadata: self.metadata, + reader: Arc::new(input.0), + metadata, row_groups, }; - let mut filter = self.filter; - let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection); + let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -797,20 +879,23 @@ impl ParquetRecordBatchReaderBuilder { break; } - let array_reader = ArrayReaderBuilder::new(&reader) - .build_array_reader(self.fields.as_deref(), predicate.projection())?; + let mut cache_projection = predicate.projection().clone(); + cache_projection.intersect(&projection); + + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .build_array_reader(fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } } - let array_reader = ArrayReaderBuilder::new(&reader) - .build_array_reader(self.fields.as_deref(), &self.projection)?; + let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .build_array_reader(fields.as_deref(), &projection)?; let read_plan = plan_builder .limited(reader.num_rows()) - .with_offset(self.offset) - .with_limit(self.limit) + .with_offset(offset) + .with_limit(limit) .build_limited() .build(); @@ -1005,7 +1090,9 @@ impl ParquetRecordBatchReader { batch_size: usize, selection: Option, ) -> Result { - let array_reader = ArrayReaderBuilder::new(row_groups) + // note metrics are not supported in this API + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(row_groups, &metrics) .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; let read_plan = ReadPlanBuilder::new(batch_size) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e56..229eae4c5bb6 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -441,6 +441,59 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } + + /// Expands the selection to align with batch boundaries. 
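+ /// For example, with `batch_size = 100` a selector covering rows 150..250 is expanded to cover rows 100..300 (capped at `total_rows`).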
+ /// This is needed when using cached array readers to ensure that + /// the cached data covers full batches. + #[cfg(feature = "async")] + pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self { + if batch_size == 0 { + return self.clone(); + } + + let mut expanded_ranges = Vec::new(); + let mut row_offset = 0; + + for selector in &self.selectors { + if selector.skip { + row_offset += selector.row_count; + } else { + let start = row_offset; + let end = row_offset + selector.row_count; + + // Expand start to batch boundary + let expanded_start = (start / batch_size) * batch_size; + // Expand end to batch boundary + let expanded_end = end.div_ceil(batch_size) * batch_size; + let expanded_end = expanded_end.min(total_rows); + + expanded_ranges.push(expanded_start..expanded_end); + row_offset += selector.row_count; + } + } + + // Sort ranges by start position + expanded_ranges.sort_by_key(|range| range.start); + + // Merge overlapping or consecutive ranges + let mut merged_ranges: Vec> = Vec::new(); + for range in expanded_ranges { + if let Some(last) = merged_ranges.last_mut() { + if range.start <= last.end { + // Overlapping or consecutive - merge them + last.end = last.end.max(range.end); + } else { + // No overlap - add new range + merged_ranges.push(range); + } + } else { + // First range + merged_ranges.push(range); + } + } + + Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows) + } } impl From> for RowSelection { diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 611d6999e07e..eea6176b766b 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -26,7 +26,7 @@ use std::fmt::Formatter; use std::io::SeekFrom; use std::ops::Range; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; @@ -38,7 +38,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Fields, Schema, SchemaRef}; -use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups}; +use crate::arrow::array_reader::{ + ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups, +}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, @@ -61,6 +63,7 @@ pub use metadata::*; #[cfg(feature = "object_store")] mod store; +use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] @@ -510,6 +513,8 @@ impl ParquetRecordBatchStreamBuilder { fields: self.fields, limit: self.limit, offset: self.offset, + metrics: self.metrics, + max_predicate_cache_size: self.max_predicate_cache_size, }; // Ensure schema of ParquetRecordBatchStream respects projection, and does @@ -560,6 +565,12 @@ struct ReaderFactory { /// Offset to apply to the next offset: Option, + + /// Metrics + metrics: ArrowReaderMetrics, + + /// Maximum size of the predicate cache + max_predicate_cache_size: usize, } impl ReaderFactory @@ -588,6 +599,16 @@ where .filter(|index| !index.is_empty()) .map(|x| x[row_group_idx].as_slice()); + // Reuse columns that are selected and used by the filters + let cache_projection = match self.compute_cache_projection(&projection) { + Some(projection) => projection, + None => ProjectionMask::none(meta.columns().len()), + }; + let 
row_group_cache = Arc::new(Mutex::new(RowGroupCache::new( + batch_size, + self.max_predicate_cache_size, + ))); + let mut row_group = InMemoryRowGroup { // schema: meta.schema_descr_ptr(), row_count: meta.num_rows() as usize, @@ -597,11 +618,16 @@ where metadata: self.metadata.as_ref(), }; + let cache_options_builder = + CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone()); + let filter = self.filter.as_mut(); let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); // Update selection based on any filters if let Some(filter) = filter { + let cache_options = cache_options_builder.clone().producer(); + for predicate in filter.predicates.iter_mut() { if !plan_builder.selects_any() { return Ok((self, None)); // ruled out entire row group @@ -609,11 +635,20 @@ where // (pre) Fetch only the columns that are selected by the predicate let selection = plan_builder.selection(); + // Fetch predicate columns; expand selection only for cached predicate columns + let cache_mask = Some(&cache_projection); row_group - .fetch(&mut self.input, predicate.projection(), selection) + .fetch( + &mut self.input, + predicate.projection(), + selection, + batch_size, + cache_mask, + ) .await?; - let array_reader = ArrayReaderBuilder::new(&row_group) + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) .build_array_reader(self.fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; @@ -656,18 +691,69 @@ where } // fetch the pages needed for decoding row_group - .fetch(&mut self.input, &projection, plan_builder.selection()) + // Final projection fetch shouldn't expand selection for cache; pass None + .fetch( + &mut self.input, + &projection, + plan_builder.selection(), + batch_size, + None, + ) .await?; let plan = plan_builder.build(); - let array_reader = ArrayReaderBuilder::new(&row_group) + let cache_options = cache_options_builder.consumer(); + let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_cache_options(Some(&cache_options)) .build_array_reader(self.fields.as_deref(), &projection)?; let reader = ParquetRecordBatchReader::new(array_reader, plan); Ok((self, Some(reader))) } + + /// Compute which columns are used in filters and the final (output) projection + fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option { + let filters = self.filter.as_ref()?; + let mut cache_projection = filters.predicates.first()?.projection().clone(); + for predicate in filters.predicates.iter() { + cache_projection.union(predicate.projection()); + } + cache_projection.intersect(projection); + self.exclude_nested_columns_from_cache(&cache_projection) + } + + /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. 
nested columns) + fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option { + let schema = self.metadata.file_metadata().schema_descr(); + let num_leaves = schema.num_columns(); + + // Count how many leaves each root column has + let num_roots = schema.root_schema().get_fields().len(); + let mut root_leaf_counts = vec![0usize; num_roots]; + for leaf_idx in 0..num_leaves { + let root_idx = schema.get_column_root_idx(leaf_idx); + root_leaf_counts[root_idx] += 1; + } + + // Keep only leaves whose root has exactly one leaf (non-nested) + let mut included_leaves = Vec::new(); + for leaf_idx in 0..num_leaves { + if mask.leaf_included(leaf_idx) { + let root_idx = schema.get_column_root_idx(leaf_idx); + if root_leaf_counts[root_idx] == 1 { + included_leaves.push(leaf_idx); + } + } + } + + if included_leaves.is_empty() { + None + } else { + Some(ProjectionMask::leaves(schema, included_leaves)) + } + } } enum StreamState { @@ -897,9 +983,13 @@ impl InMemoryRowGroup<'_> { input: &mut T, projection: &ProjectionMask, selection: Option<&RowSelection>, + batch_size: usize, + cache_mask: Option<&ProjectionMask>, ) -> Result<()> { let metadata = self.metadata.row_group(self.row_group_idx); if let Some((selection, offset_index)) = selection.zip(self.offset_index) { + let expanded_selection = + selection.expand_to_batch_boundaries(batch_size, self.row_count); // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` let mut page_start_offsets: Vec> = vec![]; @@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> { _ => (), } - ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + // Expand selection to batch boundaries only for cached columns + let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false); + if use_expanded { + ranges.extend( + expanded_selection.scan_ranges(&offset_index[idx].page_locations), + ); + } else { + ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); + } page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); ranges @@ -1883,6 +1981,8 @@ mod tests { filter: None, limit: None, offset: None, + metrics: ArrowReaderMetrics::disabled(), + max_predicate_cache_size: 0, }; let mut skip = true; @@ -2286,6 +2386,77 @@ mod tests { assert_eq!(requests.lock().unwrap().len(), 3); } + #[tokio::test] + async fn test_cache_projection_excludes_nested_columns() { + use arrow_array::{ArrayRef, StringArray}; + + // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }` + let a = StringArray::from_iter_values(["r1", "r2"]); + let b = StructArray::from(vec![ + ( + Arc::new(Field::new("aa", DataType::Utf8, true)), + Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef, + ), + ( + Arc::new(Field::new("bb", DataType::Utf8, true)), + Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef, + ), + ]); + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", b.data_type().clone(), true), + ])); + + let mut buf = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap(); + let batch = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ]) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Load Parquet metadata + let data: Bytes = buf.into(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let metadata = 
Arc::new(metadata); + + // Build a RowFilter whose predicate projects a leaf under the nested root `b` + // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa) + let parquet_schema = metadata.file_metadata().schema_descr(); + let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]); + + let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| { + Ok(arrow_array::BooleanArray::from(vec![ + true; + batch.num_rows() + ])) + }); + let filter = RowFilter::new(vec![Box::new(always_true)]); + + // Construct a ReaderFactory and compute cache projection + let reader_factory = ReaderFactory { + metadata: Arc::clone(&metadata), + fields: None, + input: TestReader::new(data), + filter: Some(filter), + limit: None, + offset: None, + metrics: ArrowReaderMetrics::disabled(), + max_predicate_cache_size: 0, + }; + + // Provide an output projection that also selects the same nested leaf + let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask); + + // Expect None since nested columns should be excluded from cache projection + assert!(cache_projection.is_none()); + } + #[tokio::test] async fn empty_offset_index_doesnt_panic_in_read_row_group() { use tokio::fs::File; @@ -2386,4 +2557,53 @@ mod tests { let result = reader.try_collect::>().await.unwrap(); assert_eq!(result.len(), 1); } + + #[tokio::test] + async fn test_cached_array_reader_sparse_offset_error() { + use futures::TryStreamExt; + + use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector}; + use arrow_array::{BooleanArray, RecordBatch}; + + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let async_reader = TestReader::new(data); + + // Enable page index so the fetch logic loads only required pages + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); + + // Skip the first 22 rows (entire first Parquet page) and then select the + // next 3 rows (22, 23, 24). This means the fetch step will not include + // the first page starting at file offset 0. + let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]); + + // Trivial predicate on column 0 that always returns `true`. Using the + // same column in both predicate and projection activates the caching + // layer (Producer/Consumer pattern). + let parquet_schema = builder.parquet_schema(); + let proj = ProjectionMask::leaves(parquet_schema, vec![0]); + let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| { + Ok(BooleanArray::from(vec![true; batch.num_rows()])) + }); + let filter = RowFilter::new(vec![Box::new(always_true)]); + + // Build the stream with batch size 8 so the cache reads whole batches + // that straddle the requested row range (rows 0-7, 8-15, 16-23, …). + let stream = builder + .with_batch_size(8) + .with_projection(proj) + .with_row_selection(selection) + .with_row_filter(filter) + .build() + .unwrap(); + + // Collecting the stream should fail with the sparse column chunk offset + // error we want to reproduce. 
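For reference, the failure mode exercised here comes from the batch-boundary expansion that the predicate cache applies to the row selection. A minimal std-only sketch of that rounding, with `expanded_rows` and `demo_rows_22_to_25` as illustrative helpers (not parquet APIs) and an arbitrary total row count:

```rust
/// Round a selected row range outwards to whole batches, mirroring the
/// arithmetic in `RowSelection::expand_to_batch_boundaries` (sketch only).
fn expanded_rows(
    selected: std::ops::Range<usize>,
    batch_size: usize,
    total_rows: usize,
) -> std::ops::Range<usize> {
    // the start rounds down to a batch boundary, the end rounds up,
    // clamped to the number of rows available
    let start = (selected.start / batch_size) * batch_size;
    let end = (selected.end.div_ceil(batch_size) * batch_size).min(total_rows);
    start..end
}

fn demo_rows_22_to_25() {
    // skip(22) + select(3) selects rows 22..25. With a batch size of 8 the
    // cache stores whole batches, so the selection is widened to 16..32, and
    // rows 16..22 live in the first data page, which the fetch step skipped.
    assert_eq!(expanded_rows(22..25, 8, 10_000), 16..32);
}
```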
+ let _result: Vec<_> = stream.try_collect().await.unwrap(); + } } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 33010f480898..72626d70e0e5 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -276,6 +276,13 @@ impl ProjectionMask { Self { mask: None } } + /// Create a [`ProjectionMask`] which selects no columns + pub fn none(len: usize) -> Self { + Self { + mask: Some(vec![false; len]), + } + } + /// Create a [`ProjectionMask`] which selects only the specified leaf columns /// /// Note: repeated or out of order indices will not impact the final mask diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index 7e2b149ad3fb..61af21a68ec1 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -48,11 +48,12 @@ pub trait Length { /// Generates [`Read`]ers to read chunks of a Parquet data source. /// /// The Parquet reader uses [`ChunkReader`] to access Parquet data, allowing -/// multiple decoders to read concurrently from different locations in the same file. +/// multiple decoders to read concurrently from different locations in the same +/// file. /// -/// The trait provides: -/// * random access (via [`Self::get_bytes`]) -/// * sequential (via [`Self::get_read`]) +/// The trait functions both as a reader and a factory for readers. +/// * random access via [`Self::get_bytes`] +/// * sequential access via the reader returned via factory method [`Self::get_read`] /// /// # Provided Implementations /// * [`File`] for reading from local file system diff --git a/parquet/tests/arrow_reader/io/async_reader.rs b/parquet/tests/arrow_reader/io/async_reader.rs new file mode 100644 index 000000000000..f2d3ce07234b --- /dev/null +++ b/parquet/tests/arrow_reader/io/async_reader.rs @@ -0,0 +1,430 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Tests for the async reader ([`ParquetRecordBatchStreamBuilder`]) + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::Result; +use parquet::file::metadata::ParquetMetaData; +use std::ops::Range; +use std::sync::Arc; + +#[tokio::test] +async fn test_read_entire_file() { + // read entire file without any filtering or projection + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7346 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_group() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()) + .await + // read only second row group + .with_row_groups(vec![1]); + + // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0. + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_column() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b". Should see no IO for columns "a" or "c". 
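These expectations are checked with insta inline snapshots. A stripped-down illustration of the mechanism, using a plain `Vec<String>` as a stand-in for the operation log; in the real tests this body sits inside a `#[test]`, and the literal after `@` is normally produced by running the test and accepting the output with `cargo insta review`, so treat the one below as illustrative:

```rust
fn inline_snapshot_style() {
    let log = vec![
        "Get Provided Metadata".to_string(),
        "Event: Reader Built".to_string(),
    ];
    // Compares the Debug representation of `log` against the inline literal;
    // on a mismatch insta prints a diff and offers to update the literal.
    insta::assert_debug_snapshot!(log, @r#"
    [
        "Get Provided Metadata",
        "Event: Reader Built",
    ]
    "#);
}
```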
+ insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_row_selection() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) of row group 1) + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(175), + RowSelector::select(50), + ])); + + // Expect to see only data IO for one page for each column for each row group + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + " Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_limit() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // a limit of 125 rows should only fetch the first two data pages (DataPage(0) and DataPage(1)) from row group 0 + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .with_limit(125); + + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter() { + // Values from column "b" range 400..799 + // filter "b" > 575 and < than 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Expect to see I/O for column b in both row groups to evaluate filter, + // then a single pages for the "a" column in each row group + 
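The expected pages follow from the fixture layout (two 200-row row groups, 100-row data pages). A small sketch of the arithmetic; `page_of` and `demo_selected_pages` are illustrative helpers, not parquet APIs:

```rust
/// Map a global row index to (row group index, data page index) for the
/// 200-rows-per-group, 100-rows-per-page layout produced by `test_file()`.
fn page_of(row: usize) -> (usize, usize) {
    (row / 200, (row % 200) / 100)
}

fn demo_selected_pages() {
    // With b = 400 + row, the predicate 575 < b < 625 keeps roughly rows
    // 176..=224, which start on DataPage(1) of row group 0 ...
    assert_eq!(page_of(176), (0, 1));
    // ... and end on DataPage(0) of row group 1, so only those two pages of
    // column "a" are fetched once the filter has run.
    assert_eq!(page_of(224), (1, 0));
}
```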
insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter_no_page_index() { + // Values from column "b" range 400..799 + // Apply a filter "b" > 575 and than 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let options = test_options().with_page_index(false); + let builder = async_builder(&test_file, options).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Since we don't have the page index, expect to see: + // 1. I/O for all pages of column b to evaluate the filter + // 2. IO for all pages of column a as the reader doesn't know where the page + // boundaries are so needs to scan them. + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_multiple_row_filter() { + // Values in column "a" range 0..399 + // Values in column "b" range 400..799 + // First filter: "a" > 175 (last data page in Row Group 0) + // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) + // Read column "c" + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["c"])) + .with_row_filter(filter_a_175_b_625(&schema_descr)); + + // Expect that we will see + // 1. IO for all pages of column A (to evaluate the first filter) + // 2. IO for pages of column b that passed the first filter (to evaluate the second filter) + // 3. 
IO after reader is built only for column c for the rows that passed both filters + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + " Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + " Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter_all() { + // Apply a filter that filters out all rows + + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_false(&schema_descr)); + + // Expect to see reads for column "b" to evaluate the filter, but no reads + // for column "a" as no rows pass the filter + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +/// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file +async fn async_builder( + test_file: &TestParquetFile, + options: ArrowReaderOptions, +) -> ParquetRecordBatchStreamBuilder { + let parquet_meta_data = if options.page_index() { + Arc::clone(test_file.parquet_metadata()) + } else { + // strip out the page index from the metadata + let metadata = test_file + .parquet_metadata() + .as_ref() + .clone() + .into_builder() + .set_column_index(None) + .set_offset_index(None) + .build(); + Arc::new(metadata) + }; + + let reader = RecordingAsyncFileReader { + bytes: test_file.bytes().clone(), + ops: Arc::clone(test_file.ops()), + parquet_meta_data, + }; + + ParquetRecordBatchStreamBuilder::new_with_options(reader, options) + .await + .unwrap() +} + +/// Build the reader from the specified builder and read all batches from it, +/// and return the operations log. 
+async fn run( + test_file: &TestParquetFile, + builder: ParquetRecordBatchStreamBuilder, +) -> Vec { + let ops = test_file.ops(); + ops.add_entry(LogEntry::event("Builder Configured")); + let mut stream = builder.build().unwrap(); + ops.add_entry(LogEntry::event("Reader Built")); + while let Some(batch) = stream.next().await { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + ops.snapshot() +} + +struct RecordingAsyncFileReader { + bytes: Bytes, + ops: Arc, + parquet_meta_data: Arc, +} + +impl AsyncFileReader for RecordingAsyncFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + let ops = Arc::clone(&self.ops); + let data = self + .bytes + .slice(range.start as usize..range.end as usize) + .clone(); + + // translate to usize from u64 + let logged_range = Range { + start: range.start as usize, + end: range.end as usize, + }; + async move { + ops.add_entry_for_range(&logged_range); + Ok(data) + } + .boxed() + } + + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + let ops = Arc::clone(&self.ops); + let datas = ranges + .iter() + .map(|range| { + self.bytes + .slice(range.start as usize..range.end as usize) + .clone() + }) + .collect::>(); + // translate to usize from u64 + let logged_ranges = ranges + .into_iter() + .map(|r| Range { + start: r.start as usize, + end: r.end as usize, + }) + .collect::>(); + + async move { + ops.add_entry_for_ranges(&logged_ranges); + Ok(datas) + } + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + _options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, Result>> { + let ops = Arc::clone(&self.ops); + let parquet_meta_data = Arc::clone(&self.parquet_meta_data); + async move { + ops.add_entry(LogEntry::GetProvidedMetadata); + Ok(parquet_meta_data) + } + .boxed() + } +} diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs new file mode 100644 index 000000000000..b31f295755b0 --- /dev/null +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -0,0 +1,703 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for IO read patterns in the Parquet Reader +//! +//! Each test: +//! 1. Creates a temporary Parquet file with a known row group structure +//! 2. Reads data from that file using the Arrow Parquet Reader, recording the IO operations +//! 3. Asserts the expected IO patterns based on the read operations +//! +//! Note this module contains test infrastructure only. The actual tests are in the +//! sub-modules [`sync_reader`] and [`async_reader`]. +//! +//! Key components: +//! - [`TestParquetFile`] - Represents a Parquet file and its layout +//! - [`OperationLog`] - Records IO operations performed on the file +//! 
- [`LogEntry`] - Represents a single IO operation in the log + +mod sync_reader; + +#[cfg(feature = "async")] +mod async_reader; + +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringViewArray}; +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, +}; +use parquet::arrow::{ArrowWriter, ProjectionMask}; +use parquet::data_type::AsBytes; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetOffsetIndex}; +use parquet::file::properties::WriterProperties; +use parquet::file::FOOTER_SIZE; +use parquet::format::PageLocation; +use parquet::schema::types::SchemaDescriptor; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::ops::Range; +use std::sync::{Arc, LazyLock, Mutex}; + +/// Create a new `TestParquetFile` with: +/// 3 columns: "a", "b", "c" +/// +/// 2 row groups, each with 200 rows +/// each data page has 100 rows +/// +/// Values of column "a" are 0..399 +/// Values of column "b" are 400..799 +/// Values of column "c" are alternating strings of length 12 and longer +fn test_file() -> TestParquetFile { + TestParquetFile::new(TEST_FILE_DATA.clone()) +} + +/// Default options for tests +/// +/// Note these tests use the PageIndex to reduce IO +fn test_options() -> ArrowReaderOptions { + ArrowReaderOptions::default().with_page_index(true) +} + +/// Return a row filter that evaluates "b > 575" AND "b < 625" +/// +/// last data page in Row Group 0 and first DataPage in Row Group 1 +fn filter_b_575_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "b" > 575 and "b" < 625 + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_575 = Int64Array::new_scalar(575); + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + and(>(column, &scalar_575)?, <(column, &scalar_625)?) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Filter a > 175 and b < 625 +/// First filter: "a" > 175 (last data page in Row Group 0) +/// Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) +fn filter_a_175_b_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "a" > 175 and "b" < 625 + let predicate_a = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_175 = Int64Array::new_scalar(175); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_175) + }, + ); + + let predicate_b = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + lt(column, &scalar_625) + }, + ); + + RowFilter::new(vec![Box::new(predicate_a), Box::new(predicate_b)]) +} + +/// Filter FALSE (no rows) with b +/// Entirely filters out both row groups +/// Note it selects "b" +fn filter_b_false(schema_descr: &SchemaDescriptor) -> RowFilter { + // "false" + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let result = + BooleanArray::from_iter(std::iter::repeat_n(Some(false), batch.num_rows())); + Ok(result) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Create a parquet file in memory for testing. 
See [`test_file`] for details. +static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + // Input batch has 400 rows, with 3 columns: "a", "b", "c" + // Note c is a different types (so the data page sizes will be different) + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) +}); + +/// A test parquet file and its layout. +struct TestParquetFile { + bytes: Bytes, + /// The operation log for IO operations performed on this file + ops: Arc, + /// The (pre-parsed) parquet metadata for this file + parquet_metadata: Arc, +} + +impl TestParquetFile { + /// Create a new `TestParquetFile` with the specified temporary directory and path + /// and determines the row group layout. + fn new(bytes: Bytes) -> Self { + // Read the parquet file to determine its layout + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + bytes.clone(), + ArrowReaderOptions::default().with_page_index(true), + ) + .unwrap(); + + let parquet_metadata = Arc::clone(builder.metadata()); + + let offset_index = parquet_metadata + .offset_index() + .expect("Parquet metadata should have a page index"); + + let row_groups = TestRowGroups::new(&parquet_metadata, offset_index); + + // figure out the footer location in the file + let footer_location = bytes.len() - FOOTER_SIZE..bytes.len(); + let footer = bytes.slice(footer_location.clone()); + let footer: &[u8; FOOTER_SIZE] = footer + .as_bytes() + .try_into() // convert to a fixed size array + .unwrap(); + + // figure out the metadata location + let footer = ParquetMetaDataReader::decode_footer_tail(footer).unwrap(); + let metadata_len = footer.metadata_length(); + let metadata_location = footer_location.start - metadata_len..footer_location.start; + + let ops = Arc::new(OperationLog::new( + footer_location, + metadata_location, + row_groups, + )); + + TestParquetFile { + bytes, + ops, + parquet_metadata, + } + } + + /// Return the internal bytes of the parquet file + fn bytes(&self) -> &Bytes { + &self.bytes + } + + /// Return the operation log for this file + fn ops(&self) -> &Arc { + &self.ops + } + + /// Return the parquet metadata for this file + fn parquet_metadata(&self) -> &Arc { + &self.parquet_metadata + } +} + +/// Information about a column chunk +#[derive(Debug)] +struct TestColumnChunk { + /// The name of the column + name: String, + + /// The location of the entire column chunk in the file including dictionary pages + /// and data pages. 
+ location: Range, + + /// The offset of the start of of the dictionary page if any + dictionary_page_location: Option, + + /// The location of the data pages in the file + page_locations: Vec, +} + +/// Information about the pages in a single row group +#[derive(Debug)] +struct TestRowGroup { + /// Maps column_name -> Information about the column chunk + columns: BTreeMap, +} + +/// Information about all the row groups in a Parquet file, extracted from its metadata +#[derive(Debug)] +struct TestRowGroups { + /// List of row groups, each containing information about its columns and page locations + row_groups: Vec, +} + +impl TestRowGroups { + fn new(parquet_metadata: &ParquetMetaData, offset_index: &ParquetOffsetIndex) -> Self { + let row_groups = parquet_metadata + .row_groups() + .iter() + .enumerate() + .map(|(rg_index, rg_meta)| { + let columns = rg_meta + .columns() + .iter() + .enumerate() + .map(|(col_idx, col_meta)| { + let column_name = col_meta.column_descr().name().to_string(); + let page_locations = + offset_index[rg_index][col_idx].page_locations().to_vec(); + let dictionary_page_location = col_meta.dictionary_page_offset(); + + // We can find the byte range of the entire column chunk + let (start_offset, length) = col_meta.byte_range(); + let start_offset = start_offset as usize; + let end_offset = start_offset + length as usize; + + TestColumnChunk { + name: column_name.clone(), + location: start_offset..end_offset, + dictionary_page_location, + page_locations, + } + }) + .map(|test_column_chunk| { + // make key=value pairs to insert into the BTreeMap + (test_column_chunk.name.clone(), test_column_chunk) + }) + .collect::>(); + TestRowGroup { columns } + }) + .collect(); + + Self { row_groups } + } + + fn iter(&self) -> impl Iterator { + self.row_groups.iter() + } +} + +/// Type of data read +#[derive(Debug, PartialEq)] +enum PageType { + /// The data page with the specified index + Data { + data_page_index: usize, + }, + Dictionary, + /// Multiple pages read together + Multi { + /// Was the dictionary page included? 
+ dictionary_page: bool, + /// The data pages included + data_page_indices: Vec, + }, +} + +impl Display for PageType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PageType::Data { data_page_index } => { + write!(f, "DataPage({data_page_index})") + } + PageType::Dictionary => write!(f, "DictionaryPage"), + PageType::Multi { + dictionary_page, + data_page_indices, + } => { + let dictionary_page = if *dictionary_page { + "dictionary_page: true, " + } else { + "" + }; + write!( + f, + "MultiPage({dictionary_page}data_pages: {data_page_indices:?})", + ) + } + } + } +} + +/// Read single logical data object (data page or dictionary page) +/// in one or more requests +#[derive(Debug)] +struct ReadInfo { + row_group_index: usize, + column_name: String, + range: Range, + read_type: PageType, + /// Number of distinct requests (function calls) that were used + num_requests: usize, +} + +impl Display for ReadInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + row_group_index, + column_name, + range, + read_type, + num_requests, + } = self; + + // If the average read size is less than 10 bytes, assume it is the thrift + // decoder reading the page headers and add an annotation + let annotation = if (range.len() / num_requests) < 10 { + " [header]" + } else { + " [data]" + }; + + // align the read type to 20 characters for better readability, not sure why + // this does not work inline with write! macro below + write!( + f, + "Row Group {row_group_index}, column '{column_name}': {:15} ({:10}, {:8}){annotation}", + // convert to strings so alignment works + format!("{read_type}"), + format!("{} bytes", range.len()), + format!("{num_requests} requests"), + ) + } +} + +/// Store structured entries in the log to make it easier to combine multiple entries +#[derive(Debug)] +enum LogEntry { + /// Read the footer (last 8 bytes) of the parquet file + ReadFooter(Range), + /// Read the metadata of the parquet file + ReadMetadata(Range), + /// Access previously parsed metadata + GetProvidedMetadata, + /// Read a single logical data object + ReadData(ReadInfo), + /// Read one or more logical data objects in a single operation + ReadMultipleData(Vec), + /// Not known where the read came from + Unknown(Range), + /// A user defined event + Event(String), +} + +impl LogEntry { + fn event(event: impl Into) -> Self { + LogEntry::Event(event.into()) + } + + /// Appends a string representation of this log entry to the output vector + fn append_string(&self, output: &mut Vec, indent: usize) { + let indent_str = " ".repeat(indent); + match self { + LogEntry::ReadFooter(range) => { + output.push(format!("{indent_str}Footer: {} bytes", range.len())) + } + LogEntry::ReadMetadata(range) => { + output.push(format!("{indent_str}Metadata: {}", range.len())) + } + LogEntry::GetProvidedMetadata => { + output.push(format!("{indent_str}Get Provided Metadata")) + } + LogEntry::ReadData(read_info) => output.push(format!("{indent_str}{read_info}")), + LogEntry::ReadMultipleData(read_infos) => { + output.push(format!("{indent_str}Read Multi:")); + for read_info in read_infos { + let new_indent = indent + 2; + read_info.append_string(output, new_indent); + } + } + LogEntry::Unknown(range) => { + output.push(format!("{indent_str}UNKNOWN: {range:?} (maybe Page Index)")) + } + LogEntry::Event(event) => output.push(format!("Event: {event}")), + } + } +} + +#[derive(Debug)] +struct OperationLog { + /// The operations performed on the file + ops: Mutex>, + + /// 
Footer location in the parquet file + footer_location: Range, + + /// Metadata location in the parquet file + metadata_location: Range, + + /// Information about the row group layout in the parquet file, used to + /// translate read operations into human understandable IO operations + /// Path to the parquet file + row_groups: TestRowGroups, +} + +impl OperationLog { + fn new( + footer_location: Range, + metadata_location: Range, + row_groups: TestRowGroups, + ) -> Self { + OperationLog { + ops: Mutex::new(Vec::new()), + metadata_location, + footer_location, + row_groups, + } + } + + /// Add an operation to the log + fn add_entry(&self, entry: LogEntry) { + let mut ops = self.ops.lock().unwrap(); + ops.push(entry); + } + + /// Adds an entry to the operation log for the interesting object that is + /// accessed by the specified range + /// + /// This function checks the ranges in order against possible locations + /// and adds the appropriate operation to the log for the first match found. + fn add_entry_for_range(&self, range: &Range) { + self.add_entry(self.entry_for_range(range)); + } + + /// Adds entries to the operation log for each interesting object that is + /// accessed by the specified range + /// + /// It behaves the same as [`add_entry_for_range`] but for multiple ranges. + fn add_entry_for_ranges<'a>(&self, ranges: impl IntoIterator>) { + let entries = ranges + .into_iter() + .map(|range| self.entry_for_range(range)) + .collect::>(); + self.add_entry(LogEntry::ReadMultipleData(entries)); + } + + /// Create an appropriate LogEntry for the specified range + fn entry_for_range(&self, range: &Range) -> LogEntry { + let start = range.start as i64; + let end = range.end as i64; + + // figure out what logical part of the file this range corresponds to + if self.metadata_location.contains(&range.start) + || self.metadata_location.contains(&(range.end - 1)) + { + return LogEntry::ReadMetadata(range.clone()); + } + + if self.footer_location.contains(&range.start) + || self.footer_location.contains(&(range.end - 1)) + { + return LogEntry::ReadFooter(range.clone()); + } + + // Search for the location in each column chunk. + // + // The actual parquet reader must in general decode the page headers + // and determine the byte ranges of the pages. However, for this test + // we assume the following layout: + // + // ```text + // (Dictionary Page) + // (Data Page) + // ... + // (Data Page) + // ``` + // + // We also assume that `self.page_locations` holds the location of all + // data pages, so any read operation that overlaps with a data page + // location is considered a read of that page, and any other read must + // be a dictionary page read. + for (row_group_index, row_group) in self.row_groups.iter().enumerate() { + for (column_name, test_column_chunk) in &row_group.columns { + // Check if the range overlaps with any data page locations + let page_locations = test_column_chunk.page_locations.iter(); + + // What data pages does this range overlap with? 
+ let mut data_page_indices = vec![]; + + for (data_page_index, page_location) in page_locations.enumerate() { + let page_offset = page_location.offset; + let page_end = page_offset + page_location.compressed_page_size as i64; + + // if the range fully contains the page, consider it a read of that page + if start >= page_offset && end <= page_end { + let read_info = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Data { data_page_index }, + num_requests: 1, + }; + return LogEntry::ReadData(read_info); + } + + // if the range overlaps with the page, add it to the list of overlapping pages + if start < page_end && end > page_offset { + data_page_indices.push(data_page_index); + } + } + + // was the dictionary page read? + let mut dictionary_page = false; + + // Check if the range overlaps with the dictionary page location + if let Some(dict_page_offset) = test_column_chunk.dictionary_page_location { + let dict_page_end = dict_page_offset + test_column_chunk.location.len() as i64; + if start >= dict_page_offset && end < dict_page_end { + let read_info = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Dictionary, + num_requests: 1, + }; + + return LogEntry::ReadData(read_info); + } + + // if the range overlaps with the dictionary page, add it to the list of overlapping pages + if start < dict_page_end && end > dict_page_offset { + dictionary_page = true; + } + } + + // If we can't find a page, but the range overlaps with the + // column chunk location, use the column chunk location + let column_byte_range = &test_column_chunk.location; + if column_byte_range.contains(&range.start) + && column_byte_range.contains(&(range.end - 1)) + { + let read_data_entry = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Multi { + data_page_indices, + dictionary_page, + }, + num_requests: 1, + }; + + return LogEntry::ReadData(read_data_entry); + } + } + } + + // If we reach here, the range does not match any known logical part of the file + LogEntry::Unknown(range.clone()) + } + + // Combine entries in the log that are similar to reduce noise in the log. + fn coalesce_entries(&self) { + let mut ops = self.ops.lock().unwrap(); + + // Coalesce entries with the same read type + let prev_ops = std::mem::take(&mut *ops); + for entry in prev_ops { + let Some(last) = ops.last_mut() else { + ops.push(entry); + continue; + }; + + let LogEntry::ReadData(ReadInfo { + row_group_index: last_rg_index, + column_name: last_column_name, + range: last_range, + read_type: last_read_type, + num_requests: last_num_reads, + }) = last + else { + // If the last entry is not a ReadColumnChunk, just push it + ops.push(entry); + continue; + }; + + // If the entry is not a ReadColumnChunk, just push it + let LogEntry::ReadData(ReadInfo { + row_group_index, + column_name, + range, + read_type, + num_requests: num_reads, + }) = &entry + else { + ops.push(entry); + continue; + }; + + // Combine the entries if they are the same and this read is less than 10b. + // + // This heuristic is used to combine small reads (typically 1-2 + // byte) made by the thrift decoder when reading the data/dictionary + // page headers. 
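A std-only sketch of this coalescing rule, merging a read into the previous entry when it is at most 10 bytes and touches it, while counting the raw requests; `coalesce` and `demo_coalesce` are illustrative names, not part of the test harness:

```rust
fn coalesce(reads: &[std::ops::Range<usize>]) -> Vec<(std::ops::Range<usize>, usize)> {
    let mut out: Vec<(std::ops::Range<usize>, usize)> = Vec::new();
    for r in reads {
        match out.last_mut() {
            // merge small reads that overlap or touch the previous entry
            Some((range, requests))
                if r.len() <= 10 && r.start <= range.end && r.end >= range.start =>
            {
                range.start = range.start.min(r.start);
                range.end = range.end.max(r.end);
                *requests += 1;
            }
            _ => out.push((r.clone(), 1)),
        }
    }
    out
}

fn demo_coalesce() {
    // three tiny thrift header reads followed by one large data page read
    let reads = [100..102, 102..104, 104..105, 105..1700];
    assert_eq!(coalesce(&reads), vec![(100..105, 3), (105..1700, 1)]);
    // the "(17 bytes, 17 requests) [header]" lines in the snapshots are the
    // same idea applied to real page header reads: the merged entry averages
    // under 10 bytes per request, so it is annotated as a header read
}
```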
+ if *row_group_index != *last_rg_index + || column_name != last_column_name + || read_type != last_read_type + || (range.start > last_range.end) + || (range.end < last_range.start) + || range.len() > 10 + { + ops.push(entry); + continue; + } + // combine + *last_range = last_range.start.min(range.start)..last_range.end.max(range.end); + *last_num_reads += num_reads; + } + } + + /// return a snapshot of the current operations in the log. + fn snapshot(&self) -> Vec { + self.coalesce_entries(); + let ops = self.ops.lock().unwrap(); + let mut actual = vec![]; + let indent = 0; + ops.iter() + .for_each(|s| s.append_string(&mut actual, indent)); + actual + } +} diff --git a/parquet/tests/arrow_reader/io/sync_reader.rs b/parquet/tests/arrow_reader/io/sync_reader.rs new file mode 100644 index 000000000000..685f251a9e2b --- /dev/null +++ b/parquet/tests/arrow_reader/io/sync_reader.rs @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the sync reader - [`ParquetRecordBatchReaderBuilder`] + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; + +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ + ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, +}; +use parquet::arrow::ProjectionMask; +use parquet::file::reader::{ChunkReader, Length}; +use std::io::Read; +use std::sync::Arc; + +#[test] +fn test_read_entire_file() { + // read entire file without any filtering or projection + let test_file = test_file(); + // Expect to see IO for all data pages for each row group and column + let builder = sync_builder(&test_file, test_options()); + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 
bytes , 1 requests) [data]", + "Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + "Row Group 0, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_group() { + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()).with_row_groups(vec![1]); // read only second row group + + // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0. + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_column() { + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b". Should see no IO for columns "a" or "c". + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_column_no_page_index() { + let test_file = test_file(); + let options = test_options().with_page_index(false); + let builder = sync_builder(&test_file, options); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b", should see no IO for columns "a" or "c". 
+ // + // Note that we need to read all data page headers to find the pages for column b + // so there are many more small reads than in the test_read_single_column test above + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'b': DictionaryPage (17 bytes , 17 requests) [header]", + "Row Group 0, column 'b': DictionaryPage (1600 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (20 bytes , 20 requests) [header]", + "Row Group 0, column 'b': DataPage(0) (93 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (20 bytes , 20 requests) [header]", + "Row Group 0, column 'b': DataPage(1) (106 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (17 bytes , 17 requests) [header]", + "Row Group 1, column 'b': DictionaryPage (1600 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (20 bytes , 20 requests) [header]", + "Row Group 1, column 'b': DataPage(0) (93 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (20 bytes , 20 requests) [header]", + "Row Group 1, column 'b': DataPage(1) (106 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_row_selection() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) of row group 1) + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection( + // read both "a" and "b" + ProjectionMask::columns(&schema_descr, ["a", "b"]), + ) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(175), + RowSelector::select(50), + ])); + + // Expect to see only data IO for one page for each column for each row group + // Note the data page headers for all pages need to be read to find the correct pages + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_limit() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // a limit of 125 rows should only fetch the first two data pages (DataPage(0) and DataPage(1)) from row group 0 + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .with_limit(125); + + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder 
Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_row_filter() { + // Values from column "b" range 400..799 + // filter "b" > 575 and < 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection( + // read both "a" and "b" + ProjectionMask::columns(&schema_descr, ["a", "b"]), + ) + // "b" > 575 and "b" < 625 + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Expect to see I/O for column b in both row groups and then reading just a + // single pages for a in each row group + // + // Note there is significant IO that happens during the construction of the + // reader (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_multiple_row_filter() { + // Values in column "a" range 0..399 + // Values in column "b" range 400..799 + // First filter: "a" > 175 (last data page in Row Group 0) + // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) + // Read column "c" + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection( + ProjectionMask::columns(&schema_descr, ["c"]), // read "c" + ) + // a > 175 and b < 625 + .with_row_filter(filter_a_175_b_625(&schema_descr)); + + // Expect that we will see + // 1. IO for all pages of column A + // 2. IO for pages of column b that passed 1. + // 3. 
IO after reader is built only for column c + // + // Note there is significant IO that happens during the construction of the + // reader (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + "Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + "Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_row_filter_all() { + // Apply a filter that entirely filters out rows based on a predicate from one column + // should not read any data pages for any other column + + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_false(&schema_descr)); + + // Expect to see the Footer and Metadata, then I/O for column b + // in both row groups but then nothing for column "a" + // since the row filter entirely filters out all rows. 
+ // + // Note that all IO that happens during the construction of the reader + // (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + ] + "#); +} + +/// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file +fn sync_builder( + test_file: &TestParquetFile, + options: ArrowReaderOptions, +) -> ParquetRecordBatchReaderBuilder { + let reader = RecordingChunkReader { + inner: test_file.bytes().clone(), + ops: Arc::clone(test_file.ops()), + }; + ParquetRecordBatchReaderBuilder::try_new_with_options(reader, options) + .expect("ParquetRecordBatchReaderBuilder") +} + +/// build the reader, and read all batches from it, returning the recorded IO operations +fn run( + test_file: &TestParquetFile, + builder: ParquetRecordBatchReaderBuilder, +) -> Vec { + let ops = test_file.ops(); + ops.add_entry(LogEntry::event("Builder Configured")); + let reader = builder.build().unwrap(); + ops.add_entry(LogEntry::event("Reader Built")); + for batch in reader { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + ops.snapshot() +} + +/// Records IO operations on an in-memory chunk reader +struct RecordingChunkReader { + inner: Bytes, + ops: Arc, +} + +impl Length for RecordingChunkReader { + fn len(&self) -> u64 { + self.inner.len() as u64 + } +} + +impl ChunkReader for RecordingChunkReader { + type T = RecordingStdIoReader; + + fn get_read(&self, start: u64) -> parquet::errors::Result { + let reader = RecordingStdIoReader { + start: start as usize, + inner: self.inner.clone(), + ops: Arc::clone(&self.ops), + }; + Ok(reader) + } + + fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result { + let start = start as usize; + let range = start..start + length; + self.ops.add_entry_for_range(&range); + Ok(self.inner.slice(start..start + length)) + } +} + +/// Wrapper around a `Bytes` object that implements `Read` +struct RecordingStdIoReader { + /// current offset in the inner `Bytes` that this reader is reading from + start: usize, + inner: Bytes, + ops: Arc, +} + +impl Read for RecordingStdIoReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let remain = self.inner.len() - self.start; + let start = self.start; + let read_length = buf.len().min(remain); + let read_range = start..start + read_length; + + self.ops.add_entry_for_range(&read_range); + + buf.copy_from_slice(self.inner.slice(read_range).as_ref()); + // Update the inner position + self.start += read_length; + Ok(read_length) + } +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 48d732f17f21..510d62786077 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -42,6 +42,9 @@ mod bad_data; #[cfg(feature = "crc")] mod checksum; mod int96_stats_roundtrip; +mod io; +#[cfg(feature = "async")] +mod predicate_cache; mod statistics; // returns a struct array 
with columns "int32_col", "float32_col" and "float64_col" with the specified values @@ -334,9 +337,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch { Field::new("u64", DataType::UInt64, true), ])); let v8: Vec = (start..end).collect(); - let v16: Vec = (start as _..end as _).collect(); - let v32: Vec = (start as _..end as _).collect(); - let v64: Vec = (start as _..end as _).collect(); + let v16: Vec = (start as _..end as u16).collect(); + let v32: Vec = (start as _..end as u32).collect(); + let v64: Vec = (start as _..end as u64).collect(); RecordBatch::try_new( schema, vec![ diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs new file mode 100644 index 000000000000..44d43113cbf5 --- /dev/null +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test for predicate cache in Parquet Arrow reader + +use arrow::array::ArrayRef; +use arrow::array::Int64Array; +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{RecordBatch, StringViewArray}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; +use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::properties::WriterProperties; +use std::ops::Range; +use std::sync::Arc; +use std::sync::LazyLock; + +#[tokio::test] +async fn test_default_read() { + // The cache is not used without predicates, so we expect 0 records read from cache + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0); + let sync_builder = test.sync_builder(); + test.run_sync(sync_builder); + let async_builder = test.async_builder().await; + test.run_async(async_builder).await; +} + +#[tokio::test] +async fn test_async_cache_with_filters() { + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49); + let async_builder = test.async_builder().await; + let async_builder = test.add_project_ab_and_filter_b(async_builder); + test.run_async(async_builder).await; +} + +#[tokio::test] +async fn test_sync_cache_with_filters() { + let test = ParquetPredicateCacheTest::new() + // The sync reader does not use the cache. 
See https://github.com/apache/arrow-rs/issues/8000
+        .with_expected_records_read_from_cache(0);
+
+    let sync_builder = test.sync_builder();
+    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    test.run_sync(sync_builder);
+}
+
+#[tokio::test]
+async fn test_cache_disabled_with_filters() {
+    // Expect no records to be read from the cache, because the cache is disabled
+    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
+    let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
+    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    test.run_sync(sync_builder);
+
+    let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
+    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    test.run_async(async_builder).await;
+}
+
+// -- Begin test infrastructure --
+
+/// A test parquet file
+struct ParquetPredicateCacheTest {
+    bytes: Bytes,
+    expected_records_read_from_cache: usize,
+}
+impl ParquetPredicateCacheTest {
+    /// Create a new `ParquetPredicateCacheTest` with:
+    /// 3 columns: "a", "b", "c"
+    ///
+    /// 2 row groups, each with 200 rows
+    /// each data page has 100 rows
+    ///
+    /// Values of column "a" are 0..399
+    /// Values of column "b" are 400..799
+    /// Values of column "c" are alternating strings of length 12 and longer
+    fn new() -> Self {
+        Self {
+            bytes: TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
+    /// Set the expected number of records read from the cache
+    fn with_expected_records_read_from_cache(
+        mut self,
+        expected_records_read_from_cache: usize,
+    ) -> Self {
+        self.expected_records_read_from_cache = expected_records_read_from_cache;
+        self
+    }
+
+    /// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file
+    fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder<Bytes> {
+        let reader = self.bytes.clone();
+        ParquetRecordBatchReaderBuilder::try_new_with_options(reader, ArrowReaderOptions::default())
+            .expect("ParquetRecordBatchReaderBuilder")
+    }
+
+    /// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file
+    async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder<TestReader> {
+        let reader = TestReader::new(self.bytes.clone());
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
+            .await
+            .unwrap()
+    }
+
+    /// Return the given builder configured with
+    ///
+    /// 1. a projection selecting the "a" and "b" columns
+    /// 2. a row_filter applied to "b": 575 < "b" < 625 (selects 1 data page from each row group)
+    fn add_project_ab_and_filter_b<T>(
+        &self,
+        builder: ArrowReaderBuilder<T>,
+    ) -> ArrowReaderBuilder<T> {
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // "b" > 575 and "b" < 625
+        let row_filter = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            |batch: RecordBatch| {
+                let scalar_575 = Int64Array::new_scalar(575);
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
+            },
+        );
+
+        builder
+            .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+            .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
+    }
+
+    /// Build the reader from the specified builder, read all batches from it,
+    /// and assert the expected cache metrics
+    fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
+        let metrics = ArrowReaderMetrics::enabled();
+
+        let reader = builder.with_metrics(metrics.clone()).build().unwrap();
+        for batch in reader {
+            match batch {
+                Ok(_) => {}
+                Err(e) => panic!("Error reading batch: {e}"),
+            }
+        }
+        self.verify_metrics(metrics)
+    }
+
+    /// Build the reader from the specified builder, read all batches from it,
+    /// and assert the expected cache metrics
+    async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder<TestReader>) {
+        let metrics = ArrowReaderMetrics::enabled();
+
+        let mut stream = builder.with_metrics(metrics.clone()).build().unwrap();
+        while let Some(batch) = stream.next().await {
+            match batch {
+                Ok(_) => {}
+                Err(e) => panic!("Error reading batch: {e}"),
+            }
+        }
+        self.verify_metrics(metrics)
+    }
+
+    fn verify_metrics(&self, metrics: ArrowReaderMetrics) {
+        let Self {
+            bytes: _,
+            expected_records_read_from_cache,
+        } = self;
+
+        let read_from_cache = metrics
+            .records_read_from_cache()
+            .expect("Metrics enabled, so should have metrics");
+
+        assert_eq!(
+            &read_from_cache, expected_records_read_from_cache,
+            "Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}"
+        );
+    }
+}
+
+/// Create a parquet file in memory for testing. See [`test_file`] for details.
+static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    // Input batch has 400 rows, with 3 columns: "a", "b", "c"
+    // Note c is a different type (so the data page sizes will be different)
+    let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+    let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+    let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+        if i % 2 == 0 {
+            format!("string_{i}")
+        } else {
+            format!("A string larger than 12 bytes and thus not inlined {i}")
+        }
+    })));
+
+    let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+    let mut output = Vec::new();
+
+    let writer_options = WriterProperties::builder()
+        .set_max_row_group_size(200)
+        .set_data_page_row_count_limit(100)
+        .build();
+    let mut writer =
+        ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
+
+    // Since the limits are only enforced on batch boundaries, write the input
+    // batch in chunks of 50
+    let mut row_remain = input_batch.num_rows();
+    while row_remain > 0 {
+        let chunk_size = row_remain.min(50);
+        let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
+        writer.write(&chunk).unwrap();
+        row_remain -= chunk_size;
+    }
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
+/// Copy-paste version of the `AsyncFileReader` test implementation 🤮
+/// TODO put this in a common place
+#[derive(Clone)]
+struct TestReader {
+    data: Bytes,
+    metadata: Option<Arc<ParquetMetaData>>,
+}
+
+impl TestReader {
+    fn new(data: Bytes) -> Self {
+        Self {
+            data,
+            metadata: Default::default(),
+        }
+    }
+}
+
+impl AsyncFileReader for TestReader {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let range = range.clone();
+        futures::future::ready(Ok(self
+            .data
+            .slice(range.start as usize..range.end as usize)))
+        .boxed()
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let metadata_reader =
+            ParquetMetaDataReader::new().with_page_indexes(options.is_some_and(|o| o.page_index()));
+        self.metadata = Some(Arc::new(
+            metadata_reader.parse_and_finish(&self.data).unwrap(),
+        ));
+        futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
+    }
+}
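
For reference (not part of the diff above): a minimal sketch of how an application might observe the predicate cache through the `ArrowReaderMetrics` API exercised in these tests. The helper name `read_with_cache_metrics` and its generic bounds are illustrative assumptions; `with_row_filter`, `with_metrics`, `with_max_predicate_cache_size`, and `records_read_from_cache` are the builder and metrics calls used in the test code above.

```rust
// Sketch only: mirrors the test setup above, assuming an Int64 column named "b".
use arrow::compute::kernels::cmp::gt;
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{Int64Array, RecordBatch};
use futures::StreamExt;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

async fn read_with_cache_metrics<R>(input: R) -> parquet::errors::Result<usize>
where
    R: AsyncFileReader + Unpin + Send + 'static,
{
    let builder = ParquetRecordBatchStreamBuilder::new(input).await?;
    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

    // Predicate on "b": pages decoded while evaluating it can be cached and
    // reused when the final projection also selects "b"
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::columns(&schema_descr, ["b"]),
        |batch: RecordBatch| {
            let scalar_575 = Int64Array::new_scalar(575);
            gt(batch.column(0).as_primitive::<Int64Type>(), &scalar_575)
        },
    );

    let metrics = ArrowReaderMetrics::enabled();
    let mut stream = builder
        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .with_metrics(metrics.clone())
        // .with_max_predicate_cache_size(0) would disable the cache entirely
        .build()?;

    // Drain the stream; a real application would use the batches
    while let Some(batch) = stream.next().await {
        batch?;
    }

    // records_read_from_cache() returns None when metrics were not enabled
    Ok(metrics.records_read_from_cache().unwrap_or(0))
}
```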