diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 0e651dc27ed..24c2c71d654 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -51,7 +51,7 @@ use crate::{ PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder, VariablePackedStructFieldKind, }, - rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder}, + rle::{RleDecompressor, RleEncoder}, value::{ValueDecompressor, ValueEncoder}, }, }, @@ -194,7 +194,7 @@ fn try_rle_for_mini_block( // multiple entries of up to 255 values. We don't know the run length distribution here, // so we conservatively account for splitting with an upper bound. let num_values = data.num_values; - let estimated_pairs = (run_count + (num_values / 255)).min(num_values); + let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values); let raw_bytes = (num_values as u128) * (type_size as u128); let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128); @@ -208,7 +208,37 @@ fn try_rle_for_mini_block( } } } - return Some(Box::new(RleMiniBlockEncoder::new())); + return Some(Box::new(RleEncoder::new())); + } + None +} + +fn try_rle_for_block( + data: &FixedWidthDataBlock, + version: LanceFileVersion, + params: &CompressionFieldParams, +) -> Option<(Box, CompressiveEncoding)> { + if version < LanceFileVersion::V2_2 { + return None; + } + + let bits = data.bits_per_value; + if !matches!(bits, 8 | 16 | 32 | 64) { + return None; + } + + let run_count = data.expect_single_stat::(Stat::RunCount); + let threshold = params + .rle_threshold + .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); + + if (run_count as f64) < (data.num_values as f64) * threshold { + let compressor = Box::new(RleEncoder::new()); + let encoding = ProtobufUtils21::rle( + ProtobufUtils21::flat(bits, None), + ProtobufUtils21::flat(/*bits_per_value=*/ 8, None), + ); + return Some((compressor, encoding)); } None } @@ -304,10 +334,7 @@ fn maybe_wrap_general_for_mini_block( None | Some("none") | Some("fsst") => Ok(inner), Some(raw) => { let scheme = CompressionScheme::from_str(raw).map_err(|_| { - lance_core::Error::invalid_input( - format!("Unknown compression scheme: {raw}"), - location!(), - ) + Error::invalid_input(format!("Unknown compression scheme: {raw}"), location!()) })?; let cfg = CompressionConfig::new(scheme, params.compression_level); Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg))) @@ -644,6 +671,11 @@ impl CompressionStrategy for DefaultCompressionStrategy { match data { DataBlock::FixedWidth(fixed_width) => { + if let Some((compressor, encoding)) = + try_rle_for_block(fixed_width, self.version, &field_params) + { + return Ok((compressor, encoding)); + } if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) { return Ok((compressor, encoding)); } @@ -793,28 +825,8 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(ValueDecompressor::from_fsl(fsl))) } Compression::Rle(rle) => { - let Compression::Flat(values) = - rle.values.as_ref().unwrap().compression.as_ref().unwrap() - else { - panic!("RLE compression only supports flat values") - }; - let Compression::Flat(run_lengths) = rle - .run_lengths - .as_ref() - .unwrap() - .compression - .as_ref() - .unwrap() - else { - panic!("RLE compression only supports flat run lengths") - }; - assert_eq!( - run_lengths.bits_per_value, 8, - "RLE compression only supports 8-bit run lengths" - ); - Ok(Box::new(RleMiniBlockDecompressor::new( - values.bits_per_value, - ))) + let bits_per_value = validate_rle_compression(rle)?; + Ok(Box::new(RleDecompressor::new(bits_per_value))) } Compression::ByteStreamSplit(bss) => { let Compression::Flat(values) = @@ -842,10 +854,7 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { let scheme = compression.scheme().try_into()?; - let compression_config = crate::encodings::physical::block::CompressionConfig::new( - scheme, - compression.level, - ); + let compression_config = CompressionConfig::new(scheme, compression.level); Ok(Box::new(GeneralMiniBlockDecompressor::new( inner_decompressor, @@ -1020,16 +1029,65 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(general_decompressor)) } + Compression::Rle(rle) => { + let bits_per_value = validate_rle_compression(rle)?; + Ok(Box::new(RleDecompressor::new(bits_per_value))) + } _ => todo!(), } } } +/// Validates RLE compression format and extracts bits_per_value +fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result { + let values = rle.values.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing values encoding", location!()) + })?; + let run_lengths = rle.run_lengths.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing run lengths encoding", location!()) + })?; + + let values = values.compression.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing values compression", location!()) + })?; + let Compression::Flat(values) = values else { + return Err(Error::invalid_input( + "RLE compression only supports flat values", + location!(), + )); + }; + + let run_lengths = run_lengths.compression.as_ref().ok_or_else(|| { + Error::invalid_input( + "RLE compression missing run lengths compression", + location!(), + ) + })?; + let Compression::Flat(run_lengths) = run_lengths else { + return Err(Error::invalid_input( + "RLE compression only supports flat run lengths", + location!(), + )); + }; + + if run_lengths.bits_per_value != 8 { + return Err(Error::invalid_input( + format!( + "RLE compression only supports 8-bit run lengths, got {}", + run_lengths.bits_per_value + ), + location!(), + )); + } + + Ok(values.bits_per_value) +} #[cfg(test)] mod tests { use super::*; use crate::buffer::LanceBuffer; use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock}; + use crate::statistics::ComputeStat; use crate::testing::extract_array_encoding_chain; use arrow_schema::{DataType, Field as ArrowField}; use std::collections::HashMap; @@ -1200,7 +1258,7 @@ mod tests { // The compressor should be RLE wrapped in general compression assert!(debug_str.contains("GeneralMiniBlockCompressor")); - assert!(debug_str.contains("RleMiniBlockEncoder")); + assert!(debug_str.contains("RleEncoder")); } #[test] @@ -1226,7 +1284,7 @@ mod tests { let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap(); // Should use RLE due to very low threshold - assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder")); + assert!(format!("{:?}", compressor).contains("RleEncoder")); } #[test] @@ -1265,7 +1323,7 @@ mod tests { "expected InlineBitpacking, got: {debug_str}" ); assert!( - !debug_str.contains("RleMiniBlockEncoder"), + !debug_str.contains("RleEncoder"), "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}" ); } @@ -1509,7 +1567,7 @@ mod tests { // Should use RLE because run_count (100) < num_values * threshold (800) let debug_str = format!("{:?}", compressor); - assert!(debug_str.contains("RleMiniBlockEncoder")); + assert!(debug_str.contains("RleEncoder")); } #[test] @@ -1669,4 +1727,77 @@ mod tests { _ => panic!("expected fixed width block"), } } + + #[test] + fn test_rle_block_used_for_version_v2_2() { + let field = create_test_field("test_repdef", DataType::UInt16); + + // Create highly repetitive data + let num_values = 1000u64; + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..10 { + for _ in 0..100 { + data.push(i as u16); + } + } + + let mut block = FixedWidthDataBlock { + bits_per_value: 16, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }; + + block.compute_stat(); + + let data_block = DataBlock::FixedWidth(block); + + let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new()) + .with_version(LanceFileVersion::V2_2); + + let (compressor, _) = strategy + .create_block_compressor(&field, &data_block) + .unwrap(); + + let debug_str = format!("{:?}", compressor); + assert!(debug_str.contains("RleEncoder")); + } + + #[test] + fn test_rle_block_not_used_for_version_v2_1() { + let field = create_test_field("test_repdef", DataType::UInt16); + + // Create highly repetitive data + let num_values = 1000u64; + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..10 { + for _ in 0..100 { + data.push(i as u16); + } + } + + let mut block = FixedWidthDataBlock { + bits_per_value: 16, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }; + + block.compute_stat(); + + let data_block = DataBlock::FixedWidth(block); + + let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new()) + .with_version(LanceFileVersion::V2_1); + + let (compressor, _) = strategy + .create_block_compressor(&field, &data_block) + .unwrap(); + + let debug_str = format!("{:?}", compressor); + assert!( + !debug_str.contains("RleEncoder"), + "RLE should not be used for V2.1" + ); + } } diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index e824c8aacfd..faa00fdb541 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -140,7 +140,7 @@ mod tests { use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy}; use crate::data::{BlockInfo, FixedWidthDataBlock}; use crate::encodings::physical::block::CompressionScheme; - use crate::encodings::physical::rle::RleMiniBlockEncoder; + use crate::encodings::physical::rle::RleEncoder; use crate::encodings::physical::value::ValueEncoder; use crate::format::pb21; use crate::format::pb21::compressive_encoding::Compression; @@ -161,7 +161,7 @@ mod tests { // Small data with RLE - should not compress due to size threshold TestCase { name: "small_rle_data", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -173,7 +173,7 @@ mod tests { // Large repeated data with RLE + LZ4 TestCase { name: "large_rle_lz4", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -185,7 +185,7 @@ mod tests { // Large repeated data with RLE + Zstd TestCase { name: "large_rle_zstd", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Zstd, level: Some(3), @@ -403,7 +403,7 @@ mod tests { // Test that small buffers don't get compressed let small_test = TestCase { name: "small_buffer_no_compression", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -496,7 +496,7 @@ mod tests { // RLE produces 2 buffers (values and lengths), test that both are handled correctly let data = create_repeated_i32_block(vec![1; 100]); let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -519,7 +519,7 @@ mod tests { // Test case 1: 32-bit RLE data let test_32 = TestCase { name: "rle_32bit_with_general_wrapper", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -532,7 +532,7 @@ mod tests { // For 32-bit RLE, the compression strategy should automatically wrap it // Let's directly test the compressor let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -589,7 +589,7 @@ mod tests { let block_64 = DataBlock::from_array(array_64); let compressor_64 = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index ae17ccea71d..06580c93ece 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! # RLE (Run-Length Encoding) Miniblock Format +//! # RLE (Run-Length Encoding) //! -//! RLE compression for Lance miniblock format, optimized for data with repeated values. +//! RLE compression for Lance, optimized for data with repeated values. //! //! ## Encoding Format //! @@ -40,20 +40,26 @@ //! - The run count (number of value transitions) < 50% of total values //! - This indicates sufficient repetition for RLE to be effective //! -//! ## Chunk Handling +//! ## MiniBlock Chunk Handling //! -//! - Maximum chunk size: 4096 values (miniblock constraint) -//! - All chunks share two global buffers (values and lengths) -//! - Each chunk's buffer_sizes indicate its portion of the global buffers -//! - Non-last chunks always contain power-of-2 values -//! - Byte limits are enforced dynamically during encoding +//! When used in the miniblock path, all chunks share two global buffers (values and lengths). +//! Each chunk's `buffer_sizes` identifies its slice within those global buffers. Non-last chunks +//! contain a power-of-2 number of values. +//! +//! NOTE: The current encoder uses a 2048-value cap per chunk as a workaround for +//! . +//! +//! ## Block Format +//! +//! When used in the block compression path, the encoded output is a single buffer: +//! `[8-byte header: values buffer size][values buffer][run_lengths buffer]`. use arrow_buffer::ArrowNativeType; use log::trace; use snafu::location; use crate::buffer::LanceBuffer; -use crate::compression::MiniBlockDecompressor; +use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor}; use crate::data::DataBlock; use crate::data::{BlockInfo, FixedWidthDataBlock}; use crate::encodings::logical::primitive::miniblock::{ @@ -67,9 +73,9 @@ use lance_core::{Error, Result}; /// RLE encoder for miniblock format #[derive(Debug, Default)] -pub struct RleMiniBlockEncoder; +pub struct RleEncoder; -impl RleMiniBlockEncoder { +impl RleEncoder { pub fn new() -> Self { Self } @@ -353,7 +359,7 @@ impl RleMiniBlockEncoder { } } -impl MiniBlockCompressor for RleMiniBlockEncoder { +impl MiniBlockCompressor for RleEncoder { fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> { match data { DataBlock::FixedWidth(fixed_width) => { @@ -384,13 +390,40 @@ impl MiniBlockCompressor for RleMiniBlockEncoder { } } +impl BlockCompressor for RleEncoder { + // Block format: [8-byte header: values buffer size][values buffer][run_lengths buffer] + fn compress(&self, data: DataBlock) -> Result { + match data { + DataBlock::FixedWidth(fixed_width) => { + let num_values = fixed_width.num_values; + let bits_per_value = fixed_width.bits_per_value; + + let (all_buffers, _) = + self.encode_data(&fixed_width.data, num_values, bits_per_value)?; + + let values_size = all_buffers[0].len() as u64; + + let mut combined = Vec::new(); + combined.extend_from_slice(&values_size.to_le_bytes()); + combined.extend_from_slice(&all_buffers[0]); + combined.extend_from_slice(&all_buffers[1]); + Ok(LanceBuffer::from(combined)) + } + _ => Err(Error::InvalidInput { + location: location!(), + source: "RLE encoding only supports FixedWidth data blocks".into(), + }), + } + } +} + /// RLE decompressor for miniblock format #[derive(Debug)] -pub struct RleMiniBlockDecompressor { +pub struct RleDecompressor { bits_per_value: u64, } -impl RleMiniBlockDecompressor { +impl RleDecompressor { pub fn new(bits_per_value: u64) -> Self { Self { bits_per_value } } @@ -405,12 +438,16 @@ impl RleMiniBlockDecompressor { })); } - assert_eq!( - data.len(), - 2, - "RLE decompressor expects exactly 2 buffers, got {}", - data.len() - ); + if data.len() != 2 { + return Err(Error::InvalidInput { + location: location!(), + source: format!( + "RLE decompressor expects exactly 2 buffers, got {}", + data.len() + ) + .into(), + }); + } let values_buffer = &data[0]; let lengths_buffer = &data[1]; @@ -469,11 +506,16 @@ impl RleMiniBlockDecompressor { let num_runs = values_buffer.len() / type_size; let num_length_entries = lengths_buffer.len(); - assert_eq!( - num_runs, num_length_entries, - "Inconsistent RLE buffers: {} runs but {} length entries", - num_runs, num_length_entries - ); + if num_runs != num_length_entries { + return Err(Error::InvalidInput { + location: location!(), + source: format!( + "Inconsistent RLE buffers: {} runs but {} length entries", + num_runs, num_length_entries + ) + .into(), + }); + } let values_ref = values_buffer.borrow_to_typed_slice::(); let values: &[T] = values_ref.as_ref(); @@ -521,30 +563,72 @@ impl RleMiniBlockDecompressor { } } -impl MiniBlockDecompressor for RleMiniBlockDecompressor { +impl MiniBlockDecompressor for RleDecompressor { fn decompress(&self, data: Vec, num_values: u64) -> Result { self.decode_data(data, num_values) } } +impl BlockDecompressor for RleDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + // fetch the values_size + if data.len() < 8 { + return Err(Error::InvalidInput { + location: location!(), + source: format!("Insufficient data size: {}", data.len()).into(), + }); + } + + let values_size_bytes: [u8; 8] = + data[..8].try_into().expect("slice length already checked"); + let values_size: u64 = u64::from_le_bytes(values_size_bytes); + + // parse values + let values_start: usize = 8; + let values_size: usize = values_size.try_into().map_err(|_| Error::InvalidInput { + location: location!(), + source: format!("Invalid values buffer size: {}", values_size).into(), + })?; + let lengths_start = + values_start + .checked_add(values_size) + .ok_or_else(|| Error::InvalidInput { + location: location!(), + source: "Invalid RLE values buffer size".into(), + })?; + + if data.len() < lengths_start { + return Err(Error::InvalidInput { + location: location!(), + source: format!("Insufficient data size: {}", data.len()).into(), + }); + } + + let values_buffer = data.slice_with_length(values_start, values_size); + let lengths_buffer = data.slice_with_length(lengths_start, data.len() - lengths_start); + + self.decode_data(vec![values_buffer, lengths_buffer], num_values) + } +} + #[cfg(test)] mod tests { use super::*; use crate::data::DataBlock; use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES; + use crate::{buffer::LanceBuffer, compression::BlockDecompressor}; use arrow_array::Int32Array; - // ========== Core Functionality Tests ========== #[test] - fn test_basic_rle_encoding() { - let encoder = RleMiniBlockEncoder::new(); + fn test_basic_miniblock_rle_encoding() { + let encoder = RleEncoder::new(); // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3] let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]); let data_block = DataBlock::from_array(array); - let (compressed, _) = encoder.compress(data_block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, data_block).unwrap(); assert_eq!(compressed.num_values, 9); assert_eq!(compressed.chunks.len(), 1); @@ -558,14 +642,15 @@ mod tests { #[test] fn test_long_run_splitting() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create a run longer than 255 to test splitting let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235 data.extend(&[100i32; 300]); // Will be split into 255+45 let array = Int32Array::from(data); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Should have 6 runs total (4 for first value, 2 for second) let lengths_buffer = &compressed.data[1]; @@ -593,7 +678,7 @@ mod tests { where T: bytemuck::Pod + PartialEq + std::fmt::Debug, { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); let bytes: Vec = data .iter() .flat_map(|v| bytemuck::bytes_of(v)) @@ -607,11 +692,14 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(block).unwrap(); - let decompressor = RleMiniBlockDecompressor::new(bits_per_value); - let decompressed = decompressor - .decompress(compressed.data, compressed.num_values) - .unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap(); + let decompressor = RleDecompressor::new(bits_per_value); + let decompressed = MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) + .unwrap(); match decompressed { DataBlock::FixedWidth(ref block) => { @@ -626,7 +714,7 @@ mod tests { #[test] fn test_power_of_two_chunking() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create data that will require multiple chunks let test_sizes = vec![1000, 2500, 5000, 10000]; @@ -637,7 +725,8 @@ mod tests { .collect(); let array = Int32Array::from(data); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Verify all non-last chunks have power-of-2 values for (i, chunk) in compressed.chunks.iter().enumerate() { @@ -656,24 +745,36 @@ mod tests { // ========== Error Handling Tests ========== #[test] - #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")] fn test_invalid_buffer_count() { - let decompressor = RleMiniBlockDecompressor::new(32); - let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10); + let decompressor = RleDecompressor::new(32); + let result = MiniBlockDecompressor::decompress( + &decompressor, + vec![LanceBuffer::from(vec![1, 2, 3, 4])], + 10, + ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("expects exactly 2 buffers")); } #[test] - #[should_panic(expected = "Inconsistent RLE buffers")] fn test_buffer_consistency() { - let decompressor = RleMiniBlockDecompressor::new(32); + let decompressor = RleDecompressor::new(32); let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch! - let _ = decompressor.decompress(vec![values, lengths], 15); + let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Inconsistent RLE buffers")); } #[test] fn test_empty_data_handling() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Test empty block let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock { @@ -683,13 +784,13 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(empty_block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, empty_block).unwrap(); assert_eq!(compressed.num_values, 0); assert!(compressed.data.is_empty()); // Test decompression of empty data - let decompressor = RleMiniBlockDecompressor::new(32); - let decompressed = decompressor.decompress(vec![], 0).unwrap(); + let decompressor = RleDecompressor::new(32); + let decompressed = MiniBlockDecompressor::decompress(&decompressor, vec![], 0).unwrap(); match decompressed { DataBlock::FixedWidth(ref block) => { @@ -704,7 +805,7 @@ mod tests { #[test] fn test_multi_chunk_round_trip() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create data that spans multiple chunks with mixed patterns let mut data = Vec::new(); @@ -717,7 +818,8 @@ mod tests { data.extend(vec![777i32; 2000]); let array = Int32Array::from(data.clone()); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Manually decompress all chunks let mut reconstructed = Vec::new(); @@ -745,13 +847,13 @@ mod tests { let chunk_lengths_buffer = global_lengths.slice_with_length(lengths_offset, lengths_size); - let decompressor = RleMiniBlockDecompressor::new(32); - let chunk_data = decompressor - .decompress( - vec![chunk_values_buffer, chunk_lengths_buffer], - chunk_values, - ) - .unwrap(); + let decompressor = RleDecompressor::new(32); + let chunk_data = MiniBlockDecompressor::decompress( + &decompressor, + vec![chunk_values_buffer, chunk_lengths_buffer], + chunk_values, + ) + .unwrap(); values_offset += values_size; lengths_offset += lengths_size; @@ -773,8 +875,8 @@ mod tests { fn test_1024_boundary_conditions() { // Comprehensive test for various boundary conditions at 1024 values // This consolidates multiple bug tests that were previously separate - let encoder = RleMiniBlockEncoder::new(); - let decompressor = RleMiniBlockDecompressor::new(32); + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); let test_cases = [ ("runs_of_2", { @@ -829,10 +931,15 @@ mod tests { // Compress the data let array = Int32Array::from(data.clone()); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Decompress and verify - match decompressor.decompress(compressed.data, compressed.num_values) { + match MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) { Ok(decompressed) => match decompressed { DataBlock::FixedWidth(ref block) => { let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); @@ -868,7 +975,7 @@ mod tests { fn test_low_repetition_50pct_bug() { // Test case that reproduces the 4092 bytes bug with low repetition (50%) // This simulates the 1M benchmark case - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create 1M values with low repetition (50% chance of change) let num_values = 1_048_576; // 1M values @@ -895,7 +1002,7 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap(); // Debug first few chunks for (i, chunk) in compressed.chunks.iter().take(5).enumerate() { @@ -912,8 +1019,12 @@ mod tests { } // Try to decompress - let decompressor = RleMiniBlockDecompressor::new(32); - match decompressor.decompress(compressed.data, compressed.num_values) { + let decompressor = RleDecompressor::new(32); + match MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) { Ok(decompressed) => match decompressed { DataBlock::FixedWidth(ref block) => { assert_eq!( @@ -1041,4 +1152,104 @@ mod tests { Some(lance_datagen::ByteCount::from(4)) } } + + // ========== Block Related tests ========== + #[test] + fn test_block_decompressor_rejects_overflowing_values_size() { + let decompressor = RleDecompressor::new(32); + + let mut data = Vec::new(); + data.extend_from_slice(&u64::MAX.to_le_bytes()); + let result = BlockDecompressor::decompress(&decompressor, LanceBuffer::from(data), 1); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid RLE values buffer size")); + } + + #[test] + fn test_block_decompressor_too_small() { + let decompressor = RleDecompressor::new(32); + let result = + BlockDecompressor::decompress(&decompressor, LanceBuffer::from(vec![1, 2, 3]), 10); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Insufficient data size: 3")); + } + + #[test] + fn test_block_compressor_header_format() { + let encoder = RleEncoder::new(); + + let data = vec![1i32, 1, 1]; + let array = Int32Array::from(data); + let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); + + // Verify header format: first 8 bytes should be values_size as u64 + assert!(compressed.len() >= 8); + let values_size_bytes: [u8; 8] = compressed.as_ref()[..8].try_into().unwrap(); + let values_size = u64::from_le_bytes(values_size_bytes); + + // Values buffer should contain 1 i32 value (4 bytes) + assert_eq!(values_size, 4); + + // Total size should be: 8 (header) + 4 (values) + 1 (lengths) + assert_eq!(compressed.len(), 13); + } + + #[test] + fn test_block_compressor_round_trip() { + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); + + // Test basic pattern + let data = vec![1i32, 1, 1, 2, 2, 3, 3, 3, 3]; + let array = Int32Array::from(data.clone()); + let data_block = DataBlock::from_array(array); + + let compressed = BlockCompressor::compress(&encoder, data_block).unwrap(); + let decompressed = + BlockDecompressor::decompress(&decompressor, compressed, data.len() as u64).unwrap(); + + match decompressed { + DataBlock::FixedWidth(block) => { + let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); + assert_eq!(values, &data[..]); + } + _ => panic!("Expected FixedWidth block"), + } + } + + #[test] + fn test_block_compressor_large_data() { + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); + + // Create data that will span multiple chunks + // Each chunks can handle ~2048 values, so use 10K values + let mut data = Vec::new(); + data.extend(vec![999i32; 3000]); // First ~2 chunks + data.extend(vec![777i32; 3000]); // Next ~2 chunks + data.extend(vec![555i32; 4000]); // Final ~2 chunks + + let total_values = data.len(); + assert_eq!(total_values, 10000); + + let array = Int32Array::from(data.clone()); + let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); + let decompressed = + BlockDecompressor::decompress(&decompressor, compressed, total_values as u64).unwrap(); + + match decompressed { + DataBlock::FixedWidth(block) => { + let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); + assert_eq!(values.len(), total_values); + assert_eq!(values, &data[..]); + } + _ => panic!("Expected FixedWidth block"), + } + } }