Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,6 @@ impl CoreFieldDecoderStrategy {
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);

Expand All @@ -721,7 +720,6 @@ impl CoreFieldDecoderStrategy {
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);

Expand All @@ -745,7 +743,6 @@ impl CoreFieldDecoderStrategy {
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);
column_infos.next_top_level();
Expand Down
186 changes: 67 additions & 119 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1920,8 +1920,6 @@ pub struct FullZipScheduler {
details: Arc<FullZipDecodeDetails>,
/// Cached state containing the decoded repetition index
cached_state: Option<Arc<FullZipCacheableState>>,
/// Whether to enable caching of repetition indices
enable_cache: bool,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why get rid of this? It might still be useful for users that want 1 IOP random access on relatively small amounts of data?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to always enable the cache before, and now that we have a shortcut, we can bring the flag back.

}

impl FullZipScheduler {
Expand Down Expand Up @@ -2009,7 +2007,6 @@ impl FullZipScheduler {
rows_in_page,
bits_per_offset,
cached_state: None,
enable_cache: false, // Default to false, will be set later
})
}

Expand Down Expand Up @@ -2126,81 +2123,63 @@ impl FullZipScheduler {
.collect()
}

/// Resolves byte ranges from repetition index (either from cache or disk)
async fn resolve_byte_ranges(
data_buf_position: u64,
/// Schedules ranges in the presence of a repetition index
fn schedule_ranges_rep(
&self,
ranges: &[Range<u64>],
io: &Arc<dyn EncodingsIo>,
rep_index: &FullZipRepIndexDetails,
cached_state: Option<&Arc<FullZipCacheableState>>,
priority: u64,
) -> Result<Vec<Range<u64>>> {
if let Some(cached_state) = cached_state {
// Use cached repetition index
Ok(Self::extract_byte_ranges_from_cached(
rep_index: FullZipRepIndexDetails,
) -> Result<Vec<PageLoadTask>> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
let data_buf_position = self.data_buf_position;
let priority = self.priority;
let details = self.details.clone();
let bits_per_offset = self.bits_per_offset;
if let Some(cached_state) = &self.cached_state {
let byte_ranges = Self::extract_byte_ranges_from_cached(
&cached_state.rep_index_buffer,
ranges,
rep_index.bytes_per_value,
data_buf_position,
))
} else {
// Load from disk
let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
let rep_data = io.submit_request(rep_ranges, priority).await?;
);
let data = io.submit_request(byte_ranges, priority);
let load_task = async move {
let data = data.await?;
let data = data
.into_iter()
.map(|d| LanceBuffer::from_bytes(d, 1))
.collect::<VecDeque<_>>();
Self::create_decoder(details, data, num_rows, bits_per_offset)
}
.boxed();
let page_load_task = PageLoadTask {
decoder_fut: load_task,
num_rows,
};
return Ok(vec![page_load_task]);
}

let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
let rep_data = io.submit_request(rep_ranges, priority);
let io_clone = io.clone();
let load_task = async move {
let rep_data = rep_data.await?;
let rep_buffer = LanceBuffer::concat(
&rep_data
.into_iter()
.map(|d| LanceBuffer::from_bytes(d, 1))
.collect::<Vec<_>>(),
);
Ok(Self::extract_byte_ranges_from_pairs(
let byte_ranges = Self::extract_byte_ranges_from_pairs(
rep_buffer,
rep_index.bytes_per_value,
data_buf_position,
))
}
}

/// Schedules ranges in the presence of a repetition index
fn schedule_ranges_rep(
&self,
ranges: &[Range<u64>],
io: &Arc<dyn EncodingsIo>,
rep_index: FullZipRepIndexDetails,
) -> Result<Vec<PageLoadTask>> {
// Copy necessary fields to avoid lifetime issues
let data_buf_position = self.data_buf_position;
let cached_state = self.cached_state.clone();
let priority = self.priority;
let details = self.details.clone();
let bits_per_offset = self.bits_per_offset;
let ranges = ranges.to_vec();
let io_clone = io.clone();
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

let load_task = async move {
// Step 1: Resolve byte ranges from repetition index
let byte_ranges = Self::resolve_byte_ranges(
data_buf_position,
&ranges,
&io_clone,
&rep_index,
cached_state.as_ref(),
priority,
)
.await?;

// Step 2: Load data
);
let data = io_clone.submit_request(byte_ranges, priority).await?;
let data = data
.into_iter()
.map(|d| LanceBuffer::from_bytes(d, 1))
.collect::<VecDeque<_>>();

// Step 3: Calculate total rows
let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();

// Step 4: Create decoder
Self::create_decoder(details, data, num_rows, bits_per_offset)
}
.boxed();
Expand Down Expand Up @@ -2288,32 +2267,26 @@ impl CachedPageData for FullZipCacheableState {
}

impl StructuralPageScheduler for FullZipScheduler {
/// Initializes the scheduler. If there's a repetition index, loads and caches it.
/// Initializes the scheduler. If there's a repetition index, load and cache it.
/// Otherwise returns NoCachedPageData.
fn initialize<'a>(
&'a mut self,
io: &Arc<dyn EncodingsIo>,
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
// Check if caching is enabled and we have a repetition index
if self.enable_cache && self.rep_index.is_some() {
let rep_index = self.rep_index.as_ref().unwrap();
// Calculate the total size of the repetition index
if let Some(rep_index) = self.rep_index {
let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);

// Load the repetition index buffer
let io_clone = io.clone();
let future = async move {
async move {
let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);

// Create and return the cacheable state
Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
};

future.boxed()
let state = Arc::new(FullZipCacheableState {
rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
});
self.cached_state = Some(state.clone());
Ok(state as Arc<dyn CachedPageData>)
}
.boxed()
} else {
// Caching disabled or no repetition index, skip caching
std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
}
}
Expand Down Expand Up @@ -3022,21 +2995,14 @@ impl StructuralPrimitiveFieldScheduler {
pub fn try_new(
column_info: &ColumnInfo,
decompressors: &dyn DecompressionStrategy,
cache_repetition_index: bool,
target_field: &Field,
) -> Result<Self> {
let page_schedulers = column_info
.page_infos
.iter()
.enumerate()
.map(|(page_index, page_info)| {
Self::page_info_to_scheduler(
page_info,
page_index,
decompressors,
cache_repetition_index,
target_field,
)
Self::page_info_to_scheduler(page_info, page_index, decompressors, target_field)
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
Expand All @@ -3049,7 +3015,6 @@ impl StructuralPrimitiveFieldScheduler {
page_info: &PageInfo,
page_layout: &PageLayout,
decompressors: &dyn DecompressionStrategy,
cache_repetition_index: bool,
target_field: &Field,
) -> Result<Box<dyn StructuralPageScheduler>> {
use pb21::page_layout::Layout;
Expand All @@ -3061,17 +3026,13 @@ impl StructuralPrimitiveFieldScheduler {
mini_block,
decompressors,
)?),
Layout::FullZipLayout(full_zip) => {
let mut scheduler = FullZipScheduler::try_new(
&page_info.buffer_offsets_and_sizes,
page_info.priority,
page_info.num_rows,
full_zip,
decompressors,
)?;
scheduler.enable_cache = cache_repetition_index;
Box::new(scheduler)
}
Layout::FullZipLayout(full_zip) => Box::new(FullZipScheduler::try_new(
&page_info.buffer_offsets_and_sizes,
page_info.priority,
page_info.num_rows,
full_zip,
decompressors,
)?),
Layout::ConstantLayout(constant_layout) => {
let def_meaning = constant_layout
.layers
Expand Down Expand Up @@ -3104,7 +3065,6 @@ impl StructuralPrimitiveFieldScheduler {
page_info,
blob.inner_layout.as_ref().expect_ok()?.as_ref(),
decompressors,
cache_repetition_index,
target_field,
)?;
let def_meaning = blob
Expand Down Expand Up @@ -3135,17 +3095,11 @@ impl StructuralPrimitiveFieldScheduler {
page_info: &PageInfo,
page_index: usize,
decompressors: &dyn DecompressionStrategy,
cache_repetition_index: bool,
target_field: &Field,
) -> Result<PageInfoAndScheduler> {
let page_layout = page_info.encoding.as_structural();
let scheduler = Self::page_layout_to_scheduler(
page_info,
page_layout,
decompressors,
cache_repetition_index,
target_field,
)?;
let scheduler =
Self::page_layout_to_scheduler(page_info, page_layout, decompressors, target_field)?;
Ok(PageInfoAndScheduler {
page_index,
num_rows: page_info.num_rows,
Expand Down Expand Up @@ -5669,7 +5623,6 @@ mod tests {
max_visible_def: 0,
}),
cached_state: None,
enable_cache: true, // Enable caching for test
};

// First initialization should load and cache the repetition index
Expand Down Expand Up @@ -5728,7 +5681,6 @@ mod tests {
bits_per_offset: 32,
details: scheduler.details.clone(),
cached_state: None,
enable_cache: true, // Enable caching for test
};

// Load cached data from the first scheduler
Expand Down Expand Up @@ -5768,7 +5720,7 @@ mod tests {
}
}

// Test that enable_cache flag actually controls caching behavior
// Repetition index caching is unconditional when the index exists.
let rows_in_page = 1000_u64;
let bytes_per_value = 4_u64;

Expand All @@ -5782,8 +5734,7 @@ mod tests {
let data = bytes::Bytes::from(full_data);
let io = Arc::new(SimulatedScheduler::new(data));

// Test 1: With caching disabled
let mut scheduler_no_cache = FullZipScheduler {
let mut scheduler_a = FullZipScheduler {
data_buf_position: 0,
rep_index: Some(FullZipRepIndexDetails {
buf_position: 1000,
Expand All @@ -5800,23 +5751,21 @@ mod tests {
max_visible_def: 0,
}),
cached_state: None,
enable_cache: false, // Caching disabled
};

let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();
let cached_data = scheduler_a.initialize(&io_dyn).await.unwrap();

// Should return NoCachedPageData when caching is disabled
assert!(
cached_data
.as_arc_any()
.downcast_ref::<super::NoCachedPageData>()
.downcast_ref::<super::FullZipCacheableState>()
.is_some(),
"With enable_cache=false, should return NoCachedPageData"
"With repetition index present, initialize should return FullZipCacheableState"
);
assert!(scheduler_a.cached_state.is_some());

// Test 2: With caching enabled
let mut scheduler_with_cache = FullZipScheduler {
let mut scheduler_b = FullZipScheduler {
data_buf_position: 0,
rep_index: Some(FullZipRepIndexDetails {
buf_position: 1000,
Expand All @@ -5833,19 +5782,18 @@ mod tests {
max_visible_def: 0,
}),
cached_state: None,
enable_cache: true, // Caching enabled
};

let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();
let cached_data2 = scheduler_b.initialize(&io_dyn).await.unwrap();

// Should return FullZipCacheableState when caching is enabled
assert!(
cached_data2
.as_arc_any()
.downcast_ref::<super::FullZipCacheableState>()
.is_some(),
"With enable_cache=true, should return FullZipCacheableState"
"With repetition index present, initialize should always return FullZipCacheableState"
);
assert!(scheduler_b.cached_state.is_some());
}

/// This test is used to reproduce fuzz test https://github.com/lancedb/lance/issues/4492
Expand Down
Loading