diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 51ae0045c79..b54822c8d96 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -1981,6 +1981,79 @@ struct FullZipDecodeDetails { max_visible_def: u16, } +/// Describes where FullZip byte ranges should be read from. +/// +/// FullZip decoding always needs a list of byte ranges, but those bytes can come +/// from two different places: +/// - Remote I/O (normal path): ranges are fetched from the underlying `EncodingsIo`. +/// - A prefetched full page (full scan fast path): the entire page has already been +/// loaded once and ranges should be sliced from memory. +/// +/// This abstraction keeps scheduling code focused on "which ranges are needed" +/// instead of "how bytes are fetched", and it lets full-page scans avoid the +/// two-stage rep-index -> data I/O pipeline. +#[derive(Debug, Clone)] +enum FullZipReadSource { + /// Fetch ranges from the storage backend through the encoding I/O interface. + Remote(Arc), + /// Slice ranges from an already-loaded FullZip page buffer. + PrefetchedPage { base_offset: u64, data: LanceBuffer }, +} + +impl FullZipReadSource { + /// Materialize the requested ranges as decode-ready `LanceBuffer`s. + /// + /// The returned buffers preserve the input range order. + fn fetch( + &self, + ranges: &[Range], + priority: u64, + ) -> BoxFuture<'static, Result>> { + match self { + Self::Remote(io) => { + let io = io.clone(); + let ranges = ranges.to_vec(); + async move { + let data = io.submit_request(ranges, priority).await?; + Ok(data + .into_iter() + .map(|bytes| LanceBuffer::from_bytes(bytes, 1)) + .collect::>()) + } + .boxed() + } + Self::PrefetchedPage { base_offset, data } => { + let base_offset = *base_offset; + let data = data.clone(); + let page_end = base_offset + data.len() as u64; + std::future::ready( + ranges + .iter() + .map(|range| { + if range.start > range.end + || range.start < base_offset + || range.end > page_end + { + return Err(Error::Internal { + message: format!( + "Requested range {:?} is outside page range {}..{}", + range, base_offset, page_end + ), + location: location!(), + }); + } + let start = (range.start - base_offset) as usize; + let len = (range.end - range.start) as usize; + Ok(data.slice_with_length(start, len)) + }) + .collect::>>(), + ) + .boxed() + } + } + } +} + /// A scheduler for full-zip encoded data /// /// When the data type has a fixed-width then we simply need to map from @@ -1991,6 +2064,7 @@ struct FullZipDecodeDetails { #[derive(Debug)] pub struct FullZipScheduler { data_buf_position: u64, + data_buf_size: u64, rep_index: Option, priority: u64, rows_in_page: u64, @@ -1998,7 +2072,7 @@ pub struct FullZipScheduler { details: Arc, /// Cached state containing the decoded repetition index cached_state: Option>, - /// Whether to enable caching of repetition indices + /// Whether repetition index metadata should be cached during initialize. enable_cache: bool, } @@ -2010,10 +2084,7 @@ impl FullZipScheduler { layout: &pb21::FullZipLayout, decompressors: &dyn DecompressionStrategy, ) -> Result { - // We don't need the data_buf_size because either the data type is - // fixed-width (and we can tell size from rows_in_page) or it is not - // and we have a repetition index. - let (data_buf_position, _) = buffer_offsets_and_sizes[0]; + let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0]; let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| { let num_reps = rows_in_page + 1; let bytes_per_rep = len / num_reps; @@ -2081,16 +2152,51 @@ impl FullZipScheduler { }); Ok(Self { data_buf_position, + data_buf_size, rep_index, details, priority, rows_in_page, bits_per_offset, cached_state: None, - enable_cache: false, // Default to false, will be set later + enable_cache: false, }) } + fn covers_entire_page(ranges: &[Range], rows_in_page: u64) -> bool { + if ranges.is_empty() { + return false; + } + let mut expected_start = 0; + for range in ranges { + if range.start != expected_start || range.end > rows_in_page || range.end < range.start + { + return false; + } + expected_start = range.end; + } + expected_start == rows_in_page + } + + fn create_page_load_task( + read_source: FullZipReadSource, + byte_ranges: Vec>, + priority: u64, + num_rows: u64, + details: Arc, + bits_per_offset: u8, + ) -> PageLoadTask { + let load_task = async move { + let data = read_source.fetch(&byte_ranges, priority).await?; + Self::create_decoder(details, data, num_rows, bits_per_offset) + } + .boxed(); + PageLoadTask { + decoder_fut: load_task, + num_rows, + } + } + /// Creates a decoder from the loaded data fn create_decoder( details: Arc, @@ -2101,12 +2207,6 @@ impl FullZipScheduler { match &details.value_decompressor { PerValueDecompressor::Fixed(decompressor) => { let bits_per_value = decompressor.bits_per_value(); - if bits_per_value == 0 { - return Err(lance_core::Error::Internal { - message: "Invalid encoding: bits_per_value must be greater than 0".into(), - location: location!(), - }); - } if bits_per_value % 8 != 0 { return Err(lance_core::Error::NotSupported { source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(), @@ -2116,6 +2216,13 @@ impl FullZipScheduler { let bytes_per_value = bits_per_value / 8; let total_bytes_per_value = bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word(); + if total_bytes_per_value == 0 { + return Err(lance_core::Error::Internal { + message: "Invalid encoding: per-row byte width must be greater than 0" + .into(), + location: location!(), + }); + } Ok(Box::new(FixedFullZipDecoder { details, data, @@ -2204,41 +2311,6 @@ impl FullZipScheduler { .collect() } - /// Resolves byte ranges from repetition index (either from cache or disk) - async fn resolve_byte_ranges( - data_buf_position: u64, - ranges: &[Range], - io: &Arc, - rep_index: &FullZipRepIndexDetails, - cached_state: Option<&Arc>, - priority: u64, - ) -> Result>> { - if let Some(cached_state) = cached_state { - // Use cached repetition index - Ok(Self::extract_byte_ranges_from_cached( - &cached_state.rep_index_buffer, - ranges, - rep_index.bytes_per_value, - data_buf_position, - )) - } else { - // Load from disk - let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index); - let rep_data = io.submit_request(rep_ranges, priority).await?; - let rep_buffer = LanceBuffer::concat( - &rep_data - .into_iter() - .map(|d| LanceBuffer::from_bytes(d, 1)) - .collect::>(), - ); - Ok(Self::extract_byte_ranges_from_pairs( - rep_buffer, - rep_index.bytes_per_value, - data_buf_position, - )) - } - } - /// Schedules ranges in the presence of a repetition index fn schedule_ranges_rep( &self, @@ -2246,39 +2318,69 @@ impl FullZipScheduler { io: &Arc, rep_index: FullZipRepIndexDetails, ) -> Result> { - // Copy necessary fields to avoid lifetime issues + let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); let data_buf_position = self.data_buf_position; - let cached_state = self.cached_state.clone(); let priority = self.priority; let details = self.details.clone(); let bits_per_offset = self.bits_per_offset; - let ranges = ranges.to_vec(); - let io_clone = io.clone(); - let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); - let load_task = async move { - // Step 1: Resolve byte ranges from repetition index - let byte_ranges = Self::resolve_byte_ranges( + if Self::covers_entire_page(ranges, self.rows_in_page) { + let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size); + let page_data = io.submit_single(full_range.clone(), priority); + let load_task = async move { + let page_data = page_data.await?; + let source = FullZipReadSource::PrefetchedPage { + base_offset: full_range.start, + data: LanceBuffer::from_bytes(page_data, 1), + }; + let read_ranges = vec![full_range]; + let data = source.fetch(&read_ranges, priority).await?; + Self::create_decoder(details, data, num_rows, bits_per_offset) + } + .boxed(); + let page_load_task = PageLoadTask { + decoder_fut: load_task, + num_rows, + }; + return Ok(vec![page_load_task]); + } + + if let Some(cached_state) = &self.cached_state { + let byte_ranges = Self::extract_byte_ranges_from_cached( + &cached_state.rep_index_buffer, + ranges, + rep_index.bytes_per_value, data_buf_position, - &ranges, - &io_clone, - &rep_index, - cached_state.as_ref(), + ); + let page_load_task = Self::create_page_load_task( + FullZipReadSource::Remote(io.clone()), + byte_ranges, priority, - ) - .await?; - - // Step 2: Load data - let data = io_clone.submit_request(byte_ranges, priority).await?; - let data = data - .into_iter() - .map(|d| LanceBuffer::from_bytes(d, 1)) - .collect::>(); - - // Step 3: Calculate total rows - let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + num_rows, + details, + bits_per_offset, + ); + return Ok(vec![page_load_task]); + } - // Step 4: Create decoder + let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index); + let rep_data = io.submit_request(rep_ranges, priority); + let io_clone = io.clone(); + let load_task = async move { + let rep_data = rep_data.await?; + let rep_buffer = LanceBuffer::concat( + &rep_data + .into_iter() + .map(|d| LanceBuffer::from_bytes(d, 1)) + .collect::>(), + ); + let byte_ranges = Self::extract_byte_ranges_from_pairs( + rep_buffer, + rep_index.bytes_per_value, + data_buf_position, + ); + let source = FullZipReadSource::Remote(io_clone); + let data = source.fetch(&byte_ranges, priority).await?; Self::create_decoder(details, data, num_rows, bits_per_offset) } .boxed(); @@ -2295,7 +2397,7 @@ impl FullZipScheduler { fn schedule_ranges_simple( &self, ranges: &[Range], - io: &dyn EncodingsIo, + io: &Arc, ) -> Result> { // Convert row ranges to item ranges (i.e. multiply by items per row) let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); @@ -2310,38 +2412,24 @@ impl FullZipScheduler { let bytes_per_value = bits_per_value / 8; let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word(); let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64; - let byte_ranges = ranges.iter().map(|r| { - debug_assert!(r.end <= self.rows_in_page); - let start = self.data_buf_position + r.start * total_bytes_per_value; - let end = self.data_buf_position + r.end * total_bytes_per_value; - start..end - }); - - // Request byte ranges - let data = io.submit_request(byte_ranges.collect(), self.priority); - - let details = self.details.clone(); + let byte_ranges = ranges + .iter() + .map(|r| { + debug_assert!(r.end <= self.rows_in_page); + let start = self.data_buf_position + r.start * total_bytes_per_value; + let end = self.data_buf_position + r.end * total_bytes_per_value; + start..end + }) + .collect::>(); - let load_task = async move { - let data = data.await?; - let data = data - .into_iter() - .map(|d| LanceBuffer::from_bytes(d, 1)) - .collect(); - Ok(Box::new(FixedFullZipDecoder { - details, - data, - num_rows, - offset_in_current: 0, - bytes_per_value: bytes_per_value as usize, - total_bytes_per_value: total_bytes_per_value as usize, - }) as Box) - } - .boxed(); - let page_load_task = PageLoadTask { - decoder_fut: load_task, + let page_load_task = Self::create_page_load_task( + FullZipReadSource::Remote(io.clone()), + byte_ranges, + self.priority, num_rows, - }; + self.details.clone(), + self.bits_per_offset, + ); Ok(vec![page_load_task]) } } @@ -2366,34 +2454,26 @@ impl CachedPageData for FullZipCacheableState { } impl StructuralPageScheduler for FullZipScheduler { - /// Initializes the scheduler. If there's a repetition index, loads and caches it. - /// Otherwise returns NoCachedPageData. fn initialize<'a>( &'a mut self, io: &Arc, ) -> BoxFuture<'a, Result>> { - // Check if caching is enabled and we have a repetition index if self.enable_cache { - if let Some(rep_index) = self.rep_index.as_ref() { - // Calculate the total size of the repetition index + if let Some(rep_index) = self.rep_index { let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value; let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size); - - // Load the repetition index buffer let io_clone = io.clone(); - let future = async move { + return async move { let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?; - let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1); - - // Create and return the cacheable state - Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) - as Arc) - }; - - return future.boxed(); + let state = Arc::new(FullZipCacheableState { + rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1), + }); + self.cached_state = Some(state.clone()); + Ok(state as Arc) + } + .boxed(); } } - // Caching disabled or no repetition index, skip caching std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc)).boxed() } @@ -2420,7 +2500,7 @@ impl StructuralPageScheduler for FullZipScheduler { if let Some(rep_index) = self.rep_index { self.schedule_ranges_rep(ranges, io, rep_index) } else { - self.schedule_ranges_simple(ranges, io.as_ref()) + self.schedule_ranges_simple(ranges, io) } } } @@ -5213,9 +5293,9 @@ impl FieldEncoder for PrimitiveStructuralEncoder { mod tests { use super::{ ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor, - FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails, - FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction, - StructuralPageScheduler, VariableFullZipDecoder, + FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource, + FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, + PreambleAction, StructuralPageScheduler, VariableFullZipDecoder, }; use crate::buffer::LanceBuffer; use crate::compression::DefaultDecompressionStrategy; @@ -5963,10 +6043,45 @@ mod tests { } #[tokio::test] - async fn test_fullzip_repetition_index_caching() { - use crate::testing::SimulatedScheduler; + async fn test_fullzip_initialize_is_lazy() { + use futures::{future::BoxFuture, FutureExt}; + use std::ops::Range; + use std::sync::Mutex; + + #[derive(Debug, Clone)] + struct RecordingScheduler { + data: bytes::Bytes, + requests: Arc>>>>, + } + + impl RecordingScheduler { + fn new(data: bytes::Bytes) -> Self { + Self { + data, + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + fn requests(&self) -> Vec>> { + self.requests.lock().unwrap().clone() + } + } + + impl crate::EncodingsIo for RecordingScheduler { + fn submit_request( + &self, + ranges: Vec>, + _priority: u64, + ) -> BoxFuture<'static, crate::Result>> { + self.requests.lock().unwrap().push(ranges.clone()); + let data = ranges + .into_iter() + .map(|range| self.data.slice(range.start as usize..range.end as usize)) + .collect::>(); + std::future::ready(Ok(data)).boxed() + } + } - // Simplified FixedPerValueDecompressor for testing #[derive(Debug)] struct TestFixedDecompressor; @@ -5984,36 +6099,19 @@ mod tests { } } - // Create test repetition index data - let rows_in_page = 100u64; - let bytes_per_value = 4u64; - let _rep_index_size = (rows_in_page + 1) * bytes_per_value; - - // Create mock repetition index data - let mut rep_index_data = Vec::new(); - for i in 0..=rows_in_page { - let offset = (i * 100) as u32; // Each row starts at i * 100 bytes - rep_index_data.extend_from_slice(&offset.to_le_bytes()); - } - - // Simulate storage with the repetition index at position 1000 - let mut full_data = vec![0u8; 1000]; - full_data.extend_from_slice(&rep_index_data); - full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after - - let data = bytes::Bytes::from(full_data); - let io = Arc::new(SimulatedScheduler::new(data)); - let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024)); - - // Create FullZipScheduler with repetition index + let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![ + 0; + 16 * 1024 + ]))); let mut scheduler = FullZipScheduler { data_buf_position: 0, + data_buf_size: 4096, rep_index: Some(FullZipRepIndexDetails { buf_position: 1000, - bytes_per_value, + bytes_per_value: 4, }), priority: 0, - rows_in_page, + rows_in_page: 100, bits_per_offset: 32, details: Arc::new(FullZipDecodeDetails { value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)), @@ -6023,88 +6121,83 @@ mod tests { max_visible_def: 0, }), cached_state: None, - enable_cache: true, // Enable caching for test + enable_cache: false, }; - // First initialization should load and cache the repetition index let io_dyn: Arc = io.clone(); - let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap(); + let cached_data = scheduler.initialize(&io_dyn).await.unwrap(); - // Verify that we got a FullZipCacheableState (not NoCachedPageData) - let is_cached = cached_data1 - .clone() - .as_arc_any() - .downcast::() - .is_ok(); assert!( - is_cached, - "Expected FullZipCacheableState, got NoCachedPageData" - ); - - // Load the cached data into the scheduler - scheduler.load(&cached_data1); - - // Verify that cached_state is now populated - assert!( - scheduler.cached_state.is_some(), - "cached_state should be populated after load" - ); - - // Verify the cached data contains the repetition index - let cached_state = scheduler.cached_state.as_ref().unwrap(); - - // Test that schedule_ranges_rep uses the cached data - let ranges = vec![0..10, 20..30]; - let result = scheduler.schedule_ranges_rep( - &ranges, - &io_dyn, - FullZipRepIndexDetails { - buf_position: 1000, - bytes_per_value, - }, + cached_data + .as_arc_any() + .downcast_ref::() + .is_some(), + "FullZip initialize should not eagerly load repetition index data" ); - - // The result should be OK (not an error) + assert!(scheduler.cached_state.is_none()); assert!( - result.is_ok(), - "schedule_ranges_rep should succeed with cached data" + io.requests().is_empty(), + "FullZip initialize should not issue any I/O" ); + } - // Second scheduler instance should be able to use the cached data - let mut scheduler2 = FullZipScheduler { - data_buf_position: 0, - rep_index: Some(FullZipRepIndexDetails { - buf_position: 1000, - bytes_per_value, - }), - priority: 0, - rows_in_page, - bits_per_offset: 32, - details: scheduler.details.clone(), - cached_state: None, - enable_cache: true, // Enable caching for test + #[tokio::test] + async fn test_fullzip_read_source_slices_prefetched_page() { + let page_start = 200_u64; + let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]); + let source = FullZipReadSource::PrefetchedPage { + base_offset: page_start, + data: page_data, }; - - // Load cached data from the first scheduler - scheduler2.load(&cached_data1); - assert!( - scheduler2.cached_state.is_some(), - "Second scheduler should have cached_state after load" - ); - - // Verify that both schedulers have the same cached data - let cached_state2 = scheduler2.cached_state.as_ref().unwrap(); - assert!( - Arc::ptr_eq(cached_state, cached_state2), - "Both schedulers should share the same cached data" - ); + let ranges = vec![ + page_start..(page_start + 3), + (page_start + 4)..(page_start + 8), + ]; + let mut data = source.fetch(&ranges, 0).await.unwrap(); + assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]); + assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]); } #[tokio::test] - async fn test_fullzip_cache_config_controls_caching() { - use crate::testing::SimulatedScheduler; + async fn test_fullzip_initialize_caches_rep_index_when_enabled() { + use futures::{future::BoxFuture, FutureExt}; + use std::ops::Range; + use std::sync::Mutex; + + #[derive(Debug, Clone)] + struct RecordingScheduler { + data: bytes::Bytes, + requests: Arc>>>>, + } + + impl RecordingScheduler { + fn new(data: bytes::Bytes) -> Self { + Self { + data, + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + fn requests(&self) -> Vec>> { + self.requests.lock().unwrap().clone() + } + } + + impl crate::EncodingsIo for RecordingScheduler { + fn submit_request( + &self, + ranges: Vec>, + _priority: u64, + ) -> BoxFuture<'static, crate::Result>> { + self.requests.lock().unwrap().push(ranges.clone()); + let data = ranges + .into_iter() + .map(|range| self.data.slice(range.start as usize..range.end as usize)) + .collect::>(); + std::future::ready(Ok(data)).boxed() + } + } - // Simplified FixedPerValueDecompressor for testing #[derive(Debug)] struct TestFixedDecompressor; @@ -6122,25 +6215,19 @@ mod tests { } } - // Test that enable_cache flag actually controls caching behavior - let rows_in_page = 1000_u64; + let rows_in_page = 100_u64; let bytes_per_value = 4_u64; + let rep_start = 1000_u64; + let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize; + let mut data = vec![0_u8; 16 * 1024]; + data[rep_start as usize..rep_start as usize + rep_size].fill(7); + let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data))); - // Create simulated data - let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize]; - let value_data = vec![0u8; 4000]; // Dummy value data - let mut full_data = vec![0u8; 1000]; // Padding before rep index - full_data.extend_from_slice(&rep_index_data); - full_data.extend_from_slice(&value_data); - - let data = bytes::Bytes::from(full_data); - let io = Arc::new(SimulatedScheduler::new(data)); - - // Test 1: With caching disabled - let mut scheduler_no_cache = FullZipScheduler { + let mut scheduler = FullZipScheduler { data_buf_position: 0, + data_buf_size: 4096, rep_index: Some(FullZipRepIndexDetails { - buf_position: 1000, + buf_position: rep_start, bytes_per_value, }), priority: 0, @@ -6154,26 +6241,100 @@ mod tests { max_visible_def: 0, }), cached_state: None, - enable_cache: false, // Caching disabled + enable_cache: true, }; let io_dyn: Arc = io.clone(); - let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap(); - - // Should return NoCachedPageData when caching is disabled - assert!( - cached_data - .as_arc_any() - .downcast_ref::() - .is_some(), - "With enable_cache=false, should return NoCachedPageData" + let cached_data = scheduler.initialize(&io_dyn).await.unwrap(); + assert!(cached_data + .as_arc_any() + .downcast_ref::() + .is_some()); + assert!(scheduler.cached_state.is_some()); + assert_eq!( + io.requests(), + vec![vec![ + rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value) + ]] ); + } - // Test 2: With caching enabled - let mut scheduler_with_cache = FullZipScheduler { - data_buf_position: 0, + #[tokio::test] + async fn test_fullzip_full_page_bypasses_rep_index_io() { + use futures::{future::BoxFuture, FutureExt}; + use std::ops::Range; + use std::sync::Mutex; + + #[derive(Debug, Clone)] + struct RecordingScheduler { + data: bytes::Bytes, + requests: Arc>>>>, + } + + impl RecordingScheduler { + fn new(data: bytes::Bytes) -> Self { + Self { + data, + requests: Arc::new(Mutex::new(Vec::new())), + } + } + + fn requests(&self) -> Vec>> { + self.requests.lock().unwrap().clone() + } + } + + impl crate::EncodingsIo for RecordingScheduler { + fn submit_request( + &self, + ranges: Vec>, + _priority: u64, + ) -> BoxFuture<'static, crate::Result>> { + self.requests.lock().unwrap().push(ranges.clone()); + let data = ranges + .into_iter() + .map(|range| self.data.slice(range.start as usize..range.end as usize)) + .collect::>(); + std::future::ready(Ok(data)).boxed() + } + } + + #[derive(Debug)] + struct TestFixedDecompressor; + + impl FixedPerValueDecompressor for TestFixedDecompressor { + fn decompress( + &self, + _data: FixedWidthDataBlock, + _num_rows: u64, + ) -> crate::Result { + unimplemented!("Test decompressor") + } + + fn bits_per_value(&self) -> u64 { + 32 + } + } + + let rows_in_page = 100_u64; + let data_start = 256_u64; + let data_size = 500_u64; + let rep_start = 4096_u64; + let bytes_per_value = 4_u64; + + let mut bytes = vec![0_u8; 16 * 1024]; + for i in 0..=rows_in_page { + let offset = (i * 5) as u32; + let pos = rep_start as usize + (i * bytes_per_value) as usize; + bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes()); + } + let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes))); + + let scheduler = FullZipScheduler { + data_buf_position: data_start, + data_buf_size: data_size, rep_index: Some(FullZipRepIndexDetails { - buf_position: 1000, + buf_position: rep_start, bytes_per_value, }), priority: 0, @@ -6187,18 +6348,31 @@ mod tests { max_visible_def: 0, }), cached_state: None, - enable_cache: true, // Caching enabled + enable_cache: false, }; - let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap(); + let io_dyn: Arc = io.clone(); + let tasks = scheduler + .schedule_ranges_rep( + &[0..rows_in_page], + &io_dyn, + FullZipRepIndexDetails { + buf_position: rep_start, + bytes_per_value, + }, + ) + .unwrap(); + + let requests = io.requests(); + assert_eq!(requests.len(), 1); + assert_eq!(requests[0], vec![data_start..(data_start + data_size)]); - // Should return FullZipCacheableState when caching is enabled - assert!( - cached_data2 - .as_arc_any() - .downcast_ref::() - .is_some(), - "With enable_cache=true, should return FullZipCacheableState" + let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap(); + let requests_after_await = io.requests(); + assert_eq!( + requests_after_await.len(), + 1, + "full page path should not issue rep-index I/O" ); }