From 310db90ffc6e1c8d190d4e4682ace3cdca54958b Mon Sep 17 00:00:00 2001 From: stevie9868 Date: Sun, 12 Oct 2025 18:13:34 -0700 Subject: [PATCH 1/4] support rle for block --- rust/lance-encoding/src/compression.rs | 176 +++++++++--- .../src/encodings/physical/general.rs | 18 +- .../src/encodings/physical/rle.rs | 254 ++++++++++++++---- 3 files changed, 358 insertions(+), 90 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index f65b7bb2025..5c8c1b95f94 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -51,7 +51,7 @@ use crate::{ PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder, VariablePackedStructFieldKind, }, - rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder}, + rle::{RleDecompressor, RleEncoder}, value::{ValueDecompressor, ValueEncoder}, }, }, @@ -136,6 +136,7 @@ pub struct DefaultCompressionStrategy { /// User-configured compression parameters params: CompressionParams, /// The lance file version for compatibilities. 
+ // LanceFile Version version: LanceFileVersion, } @@ -174,7 +175,35 @@ fn try_rle_for_mini_block( .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); if (run_count as f64) < (data.num_values as f64) * threshold { - return Some(Box::new(RleMiniBlockEncoder::new())); + return Some(Box::new(RleEncoder::new())); + } + None +} + +fn try_rle_for_block( + data: &FixedWidthDataBlock, + version: LanceFileVersion, +) -> Option<(Box, CompressiveEncoding)> { + if version < LanceFileVersion::V2_2 { + return None; + } + + let bits = data.bits_per_value; + if !matches!(bits, 8 | 16 | 32 | 64) { + return None; + } + + let run_count = data.expect_single_stat::(Stat::RunCount); + // TODO: Make this configurable + let threshold = DEFAULT_RLE_COMPRESSION_THRESHOLD; + + if (run_count as f64) < (data.num_values as f64) * threshold { + let compressor = Box::new(RleEncoder::new()); + let encoding = ProtobufUtils21::rle( + ProtobufUtils21::flat(bits, None), + ProtobufUtils21::flat(/*bits_per_value=*/ 8, None), + ); + return Some((compressor, encoding)); } None } @@ -247,10 +276,7 @@ fn maybe_wrap_general_for_mini_block( None | Some("none") | Some("fsst") => Ok(inner), Some(raw) => { let scheme = CompressionScheme::from_str(raw).map_err(|_| { - lance_core::Error::invalid_input( - format!("Unknown compression scheme: {raw}"), - location!(), - ) + Error::invalid_input(format!("Unknown compression scheme: {raw}"), location!()) })?; let cfg = CompressionConfig::new(scheme, params.compression_level); Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg))) @@ -561,6 +587,9 @@ impl CompressionStrategy for DefaultCompressionStrategy { match data { DataBlock::FixedWidth(fixed_width) => { + if let Some((compressor, encoding)) = try_rle_for_block(fixed_width, self.version) { + return Ok((compressor, encoding)); + } if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) { return Ok((compressor, encoding)); } @@ -710,28 +739,8 @@ impl DecompressionStrategy for 
DefaultDecompressionStrategy { Ok(Box::new(ValueDecompressor::from_fsl(fsl))) } Compression::Rle(rle) => { - let Compression::Flat(values) = - rle.values.as_ref().unwrap().compression.as_ref().unwrap() - else { - panic!("RLE compression only supports flat values") - }; - let Compression::Flat(run_lengths) = rle - .run_lengths - .as_ref() - .unwrap() - .compression - .as_ref() - .unwrap() - else { - panic!("RLE compression only supports flat run lengths") - }; - assert_eq!( - run_lengths.bits_per_value, 8, - "RLE compression only supports 8-bit run lengths" - ); - Ok(Box::new(RleMiniBlockDecompressor::new( - values.bits_per_value, - ))) + let bits_per_value = validate_rle_compression(rle); + Ok(Box::new(RleDecompressor::new(bits_per_value))) } Compression::ByteStreamSplit(bss) => { let Compression::Flat(values) = @@ -759,10 +768,7 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { let scheme = compression.scheme().try_into()?; - let compression_config = crate::encodings::physical::block::CompressionConfig::new( - scheme, - compression.level, - ); + let compression_config = CompressionConfig::new(scheme, compression.level); Ok(Box::new(GeneralMiniBlockDecompressor::new( inner_decompressor, @@ -937,16 +943,43 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(general_decompressor)) } + Compression::Rle(rle) => { + let bits_per_value = validate_rle_compression(rle); + Ok(Box::new(RleDecompressor::new(bits_per_value))) + } _ => todo!(), } } } +/// Validates RLE compression format and extracts bits_per_value +fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> u64 { + let Compression::Flat(values) = rle.values.as_ref().unwrap().compression.as_ref().unwrap() + else { + panic!("RLE compression only supports flat values") + }; + let Compression::Flat(run_lengths) = rle + .run_lengths + .as_ref() + .unwrap() + .compression + .as_ref() + .unwrap() + else { + panic!("RLE compression only supports flat run lengths") + }; + 
assert_eq!( + run_lengths.bits_per_value, 8, + "RLE compression only supports 8-bit run lengths" + ); + values.bits_per_value +} #[cfg(test)] mod tests { use super::*; use crate::buffer::LanceBuffer; use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock}; + use crate::statistics::ComputeStat; use arrow_schema::{DataType, Field as ArrowField}; use std::collections::HashMap; @@ -1054,7 +1087,7 @@ mod tests { // The compressor should be RLE wrapped in general compression assert!(debug_str.contains("GeneralMiniBlockCompressor")); - assert!(debug_str.contains("RleMiniBlockEncoder")); + assert!(debug_str.contains("RleEncoder")); } #[test] @@ -1079,7 +1112,7 @@ mod tests { let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap(); // Should use RLE due to very low threshold - assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder")); + assert!(format!("{:?}", compressor).contains("RleEncoder")); } #[test] @@ -1254,7 +1287,7 @@ mod tests { // Should use RLE because run_count (100) < num_values * threshold (800) let debug_str = format!("{:?}", compressor); - assert!(debug_str.contains("RleMiniBlockEncoder")); + assert!(debug_str.contains("RleEncoder")); } #[test] @@ -1413,4 +1446,77 @@ mod tests { _ => panic!("expected fixed width block"), } } + + #[test] + fn test_rle_block_not_used_for_version_larger_than_v2_1() { + let field = create_test_field("test_repdef", DataType::UInt16); + + // Create highly repetitive data + let num_values = 1000u64; + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..10 { + for _ in 0..100 { + data.push(i as u16); + } + } + + let mut block = FixedWidthDataBlock { + bits_per_value: 16, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }; + + block.compute_stat(); + + let data_block = DataBlock::FixedWidth(block); + + let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new()) + .with_version(LanceFileVersion::V2_2); + + let 
(compressor, _) = strategy + .create_block_compressor(&field, &data_block) + .unwrap(); + + let debug_str = format!("{:?}", compressor); + assert!(debug_str.contains("RleEncoder")); + } + + #[test] + fn test_rle_block_not_used_for_version_less_than_v2_2() { + let field = create_test_field("test_repdef", DataType::UInt16); + + // Create highly repetitive data + let num_values = 1000u64; + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..10 { + for _ in 0..100 { + data.push(i as u16); + } + } + + let mut block = FixedWidthDataBlock { + bits_per_value: 16, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }; + + block.compute_stat(); + + let data_block = DataBlock::FixedWidth(block); + + let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new()) + .with_version(LanceFileVersion::V2_1); + + let (compressor, _) = strategy + .create_block_compressor(&field, &data_block) + .unwrap(); + + let debug_str = format!("{:?}", compressor); + assert!( + !debug_str.contains("RleEncoder"), + "RLE should not be used for V2.1" + ); + } } diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index eb5ff12e62a..d8aed9c66c2 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -140,7 +140,7 @@ mod tests { use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy}; use crate::data::{BlockInfo, FixedWidthDataBlock}; use crate::encodings::physical::block::CompressionScheme; - use crate::encodings::physical::rle::RleMiniBlockEncoder; + use crate::encodings::physical::rle::RleEncoder; use crate::encodings::physical::value::ValueEncoder; use crate::format::pb21; use crate::format::pb21::compressive_encoding::Compression; @@ -161,7 +161,7 @@ mod tests { // Small data with RLE - should not compress due to size threshold TestCase { name: 
"small_rle_data", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -173,7 +173,7 @@ mod tests { // Large repeated data with RLE + LZ4 TestCase { name: "large_rle_lz4", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -185,7 +185,7 @@ mod tests { // Large repeated data with RLE + Zstd TestCase { name: "large_rle_zstd", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Zstd, level: Some(3), @@ -403,7 +403,7 @@ mod tests { // Test that small buffers don't get compressed let small_test = TestCase { name: "small_buffer_no_compression", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -496,7 +496,7 @@ mod tests { // RLE produces 2 buffers (values and lengths), test that both are handled correctly let data = create_repeated_i32_block(vec![1; 100]); let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -519,7 +519,7 @@ mod tests { // Test case 1: 32-bit RLE data let test_32 = TestCase { name: "rle_32bit_with_general_wrapper", - inner_encoder: Box::new(RleMiniBlockEncoder), + inner_encoder: Box::new(RleEncoder), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -532,7 +532,7 @@ mod tests { // For 32-bit RLE, the compression strategy should automatically wrap it // Let's directly test the compressor let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -589,7 
+589,7 @@ mod tests { let block_64 = DataBlock::from_array(array_64); let compressor_64 = GeneralMiniBlockCompressor::new( - Box::new(RleMiniBlockEncoder), + Box::new(RleEncoder), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 8f5dcc3fa0f..5eeae5a24a0 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -53,7 +53,7 @@ use log::trace; use snafu::location; use crate::buffer::LanceBuffer; -use crate::compression::MiniBlockDecompressor; +use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor}; use crate::data::DataBlock; use crate::data::{BlockInfo, FixedWidthDataBlock}; use crate::encodings::logical::primitive::miniblock::{ @@ -66,9 +66,9 @@ use lance_core::{Error, Result}; /// RLE encoder for miniblock format #[derive(Debug, Default)] -pub struct RleMiniBlockEncoder; +pub struct RleEncoder; -impl RleMiniBlockEncoder { +impl RleEncoder { pub fn new() -> Self { Self } @@ -354,7 +354,7 @@ impl RleMiniBlockEncoder { } } -impl MiniBlockCompressor for RleMiniBlockEncoder { +impl MiniBlockCompressor for RleEncoder { fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> { match data { DataBlock::FixedWidth(fixed_width) => { @@ -385,13 +385,40 @@ impl MiniBlockCompressor for RleMiniBlockEncoder { } } +impl BlockCompressor for RleEncoder { + // Block format: [8-byte header: values buffer size][values buffer][run_lengths buffer] + fn compress(&self, data: DataBlock) -> Result { + match data { + DataBlock::FixedWidth(fixed_width) => { + let num_values = fixed_width.num_values; + let bits_per_value = fixed_width.bits_per_value; + + let (all_buffers, _) = + self.encode_data(&fixed_width.data, num_values, bits_per_value)?; + + let values_size = all_buffers[0].len() as u64; + + let mut combined = Vec::new(); + 
combined.extend_from_slice(&values_size.to_le_bytes()); + combined.extend_from_slice(&all_buffers[0]); + combined.extend_from_slice(&all_buffers[1]); + Ok(LanceBuffer::from(combined)) + } + _ => Err(Error::InvalidInput { + location: location!(), + source: "RLE encoding only supports FixedWidth data blocks".into(), + }), + } + } +} + /// RLE decompressor for miniblock format #[derive(Debug)] -pub struct RleMiniBlockDecompressor { +pub struct RleDecompressor { bits_per_value: u64, } -impl RleMiniBlockDecompressor { +impl RleDecompressor { pub fn new(bits_per_value: u64) -> Self { Self { bits_per_value } } @@ -524,30 +551,60 @@ impl RleMiniBlockDecompressor { } } -impl MiniBlockDecompressor for RleMiniBlockDecompressor { +impl MiniBlockDecompressor for RleDecompressor { fn decompress(&self, data: Vec, num_values: u64) -> Result { self.decode_data(data, num_values) } } +impl BlockDecompressor for RleDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + // fetch the values_size + if data.len() < 8 { + return Err(Error::InvalidInput { + location: location!(), + source: format!("Insufficient data size: {}", data.len()).into(), + }); + } + + let values_size_bytes: [u8; 8] = data[..8].try_into().unwrap(); + let values_size: u64 = u64::from_le_bytes(values_size_bytes); + + // parse values + let values_start: usize = 8; + let lengths_start: usize = values_start + values_size as usize; + + if data.len() < lengths_start { + return Err(Error::InvalidInput { + location: location!(), + source: format!("Insufficient data size: {}", data.len()).into(), + }); + } + + let values_buffer = data.slice_with_length(values_start, values_size as usize); + let lengths_buffer = data.slice_with_length(lengths_start, data.len() - lengths_start); + + self.decode_data(vec![values_buffer, lengths_buffer], num_values) + } +} + #[cfg(test)] mod tests { use super::*; use crate::data::DataBlock; use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES; 
use arrow_array::Int32Array; - // ========== Core Functionality Tests ========== #[test] - fn test_basic_rle_encoding() { - let encoder = RleMiniBlockEncoder::new(); + fn test_basic_miniblock_rle_encoding() { + let encoder = RleEncoder::new(); // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3] let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]); let data_block = DataBlock::from_array(array); - let (compressed, _) = encoder.compress(data_block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, data_block).unwrap(); assert_eq!(compressed.num_values, 9); assert_eq!(compressed.chunks.len(), 1); @@ -561,14 +618,15 @@ mod tests { #[test] fn test_long_run_splitting() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create a run longer than 255 to test splitting let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235 data.extend(&[100i32; 300]); // Will be split into 255+45 let array = Int32Array::from(data); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Should have 6 runs total (4 for first value, 2 for second) let lengths_buffer = &compressed.data[1]; @@ -596,7 +654,7 @@ mod tests { where T: bytemuck::Pod + PartialEq + std::fmt::Debug, { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); let bytes: Vec = data .iter() .flat_map(|v| bytemuck::bytes_of(v)) @@ -610,11 +668,14 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(block).unwrap(); - let decompressor = RleMiniBlockDecompressor::new(bits_per_value); - let decompressed = decompressor - .decompress(compressed.data, compressed.num_values) - .unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap(); + let decompressor = RleDecompressor::new(bits_per_value); + let decompressed = 
MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) + .unwrap(); match decompressed { DataBlock::FixedWidth(ref block) => { @@ -629,7 +690,7 @@ mod tests { #[test] fn test_power_of_two_chunking() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create data that will require multiple chunks let test_sizes = vec![1000, 2500, 5000, 10000]; @@ -640,7 +701,8 @@ mod tests { .collect(); let array = Int32Array::from(data); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Verify all non-last chunks have power-of-2 values for (i, chunk) in compressed.chunks.iter().enumerate() { @@ -661,22 +723,26 @@ mod tests { #[test] #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")] fn test_invalid_buffer_count() { - let decompressor = RleMiniBlockDecompressor::new(32); - let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10); + let decompressor = RleDecompressor::new(32); + let _ = MiniBlockDecompressor::decompress( + &decompressor, + vec![LanceBuffer::from(vec![1, 2, 3, 4])], + 10, + ); } #[test] #[should_panic(expected = "Inconsistent RLE buffers")] fn test_buffer_consistency() { - let decompressor = RleMiniBlockDecompressor::new(32); + let decompressor = RleDecompressor::new(32); let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch! 
- let _ = decompressor.decompress(vec![values, lengths], 15); + let _ = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15); } #[test] fn test_empty_data_handling() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Test empty block let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock { @@ -686,13 +752,13 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(empty_block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, empty_block).unwrap(); assert_eq!(compressed.num_values, 0); assert!(compressed.data.is_empty()); // Test decompression of empty data - let decompressor = RleMiniBlockDecompressor::new(32); - let decompressed = decompressor.decompress(vec![], 0).unwrap(); + let decompressor = RleDecompressor::new(32); + let decompressed = MiniBlockDecompressor::decompress(&decompressor, vec![], 0).unwrap(); match decompressed { DataBlock::FixedWidth(ref block) => { @@ -707,7 +773,7 @@ mod tests { #[test] fn test_multi_chunk_round_trip() { - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create data that spans multiple chunks with mixed patterns let mut data = Vec::new(); @@ -720,7 +786,8 @@ mod tests { data.extend(vec![777i32; 2000]); let array = Int32Array::from(data.clone()); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Manually decompress all chunks let mut reconstructed = Vec::new(); @@ -748,13 +815,13 @@ mod tests { let chunk_lengths_buffer = global_lengths.slice_with_length(lengths_offset, lengths_size); - let decompressor = RleMiniBlockDecompressor::new(32); - let chunk_data = decompressor - .decompress( - vec![chunk_values_buffer, chunk_lengths_buffer], - chunk_values, - ) - .unwrap(); + let decompressor = RleDecompressor::new(32); + let chunk_data = 
MiniBlockDecompressor::decompress( + &decompressor, + vec![chunk_values_buffer, chunk_lengths_buffer], + chunk_values, + ) + .unwrap(); values_offset += values_size; lengths_offset += lengths_size; @@ -776,8 +843,8 @@ mod tests { fn test_1024_boundary_conditions() { // Comprehensive test for various boundary conditions at 1024 values // This consolidates multiple bug tests that were previously separate - let encoder = RleMiniBlockEncoder::new(); - let decompressor = RleMiniBlockDecompressor::new(32); + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); let test_cases = [ ("runs_of_2", { @@ -832,10 +899,15 @@ mod tests { // Compress the data let array = Int32Array::from(data.clone()); - let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap(); + let (compressed, _) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); // Decompress and verify - match decompressor.decompress(compressed.data, compressed.num_values) { + match MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) { Ok(decompressed) => match decompressed { DataBlock::FixedWidth(ref block) => { let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); @@ -871,7 +943,7 @@ mod tests { fn test_low_repetition_50pct_bug() { // Test case that reproduces the 4092 bytes bug with low repetition (50%) // This simulates the 1M benchmark case - let encoder = RleMiniBlockEncoder::new(); + let encoder = RleEncoder::new(); // Create 1M values with low repetition (50% chance of change) let num_values = 1_048_576; // 1M values @@ -898,7 +970,7 @@ mod tests { block_info: BlockInfo::default(), }); - let (compressed, _) = encoder.compress(block).unwrap(); + let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap(); // Debug first few chunks for (i, chunk) in compressed.chunks.iter().take(5).enumerate() { @@ -915,8 +987,12 @@ mod tests { } // Try to decompress - let 
decompressor = RleMiniBlockDecompressor::new(32); - match decompressor.decompress(compressed.data, compressed.num_values) { + let decompressor = RleDecompressor::new(32); + match MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) { Ok(decompressed) => match decompressed { DataBlock::FixedWidth(ref block) => { assert_eq!( @@ -1020,4 +1096,90 @@ mod tests { Some(lance_datagen::ByteCount::from(4)) } } + + // ========== Block Related tests ========== + #[test] + fn test_block_decompressor_too_small() { + let decompressor = RleDecompressor::new(32); + let result = + BlockDecompressor::decompress(&decompressor, LanceBuffer::from(vec![1, 2, 3]), 10); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Insufficient data size: 3")); + } + + #[test] + fn test_block_compressor_header_format() { + let encoder = RleEncoder::new(); + + let data = vec![1i32, 1, 1]; + let array = Int32Array::from(data); + let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); + + // Verify header format: first 8 bytes should be values_size as u64 + assert!(compressed.len() >= 8); + let values_size_bytes: [u8; 8] = compressed.as_ref()[..8].try_into().unwrap(); + let values_size = u64::from_le_bytes(values_size_bytes); + + // Values buffer should contain 1 i32 value (4 bytes) + assert_eq!(values_size, 4); + + // Total size should be: 8 (header) + 4 (values) + 1 (lengths) + assert_eq!(compressed.len(), 13); + } + + #[test] + fn test_block_compressor_round_trip() { + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); + + // Test basic pattern + let data = vec![1i32, 1, 1, 2, 2, 3, 3, 3, 3]; + let array = Int32Array::from(data.clone()); + let data_block = DataBlock::from_array(array); + + let compressed = BlockCompressor::compress(&encoder, data_block).unwrap(); + let decompressed = + BlockDecompressor::decompress(&decompressor, compressed, data.len() 
as u64).unwrap(); + + match decompressed { + DataBlock::FixedWidth(block) => { + let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); + assert_eq!(values, &data[..]); + } + _ => panic!("Expected FixedWidth block"), + } + } + + #[test] + fn test_block_compressor_large_data() { + let encoder = RleEncoder::new(); + let decompressor = RleDecompressor::new(32); + + // Create data that will span multiple chunks + // Each chunks can handle ~2048 values, so use 10K values + let mut data = Vec::new(); + data.extend(vec![999i32; 3000]); // First ~2 chunks + data.extend(vec![777i32; 3000]); // Next ~2 chunks + data.extend(vec![555i32; 4000]); // Final ~2 chunks + + let total_values = data.len(); + assert_eq!(total_values, 10000); + + let array = Int32Array::from(data.clone()); + let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); + let decompressed = + BlockDecompressor::decompress(&decompressor, compressed, total_values as u64).unwrap(); + + match decompressed { + DataBlock::FixedWidth(block) => { + let values: &[i32] = bytemuck::cast_slice(block.data.as_ref()); + assert_eq!(values.len(), total_values); + assert_eq!(values, &data[..]); + } + _ => panic!("Expected FixedWidth block"), + } + } } From fed390cf93da32ed6dfbb1221d2393bf857dabdd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 22 Jan 2026 19:56:21 +0800 Subject: [PATCH 2/4] lance-encoding: harden RLE decoding and unify thresholds --- rust/lance-encoding/src/compression.rs | 74 ++++++++----- .../src/encodings/physical/rle.rs | 101 +++++++++++++----- 2 files changed, 123 insertions(+), 52 deletions(-) diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index bfee2c509ed..79039d07fcc 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -139,7 +139,6 @@ pub struct DefaultCompressionStrategy { /// User-configured compression parameters params: CompressionParams, /// The lance file 
version for compatibilities. - // LanceFile Version version: LanceFileVersion, } @@ -217,6 +216,7 @@ fn try_rle_for_mini_block( fn try_rle_for_block( data: &FixedWidthDataBlock, version: LanceFileVersion, + params: &CompressionFieldParams, ) -> Option<(Box, CompressiveEncoding)> { if version < LanceFileVersion::V2_2 { return None; @@ -228,8 +228,9 @@ fn try_rle_for_block( } let run_count = data.expect_single_stat::(Stat::RunCount); - // TODO: Make this configurable - let threshold = DEFAULT_RLE_COMPRESSION_THRESHOLD; + let threshold = params + .rle_threshold + .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); if (run_count as f64) < (data.num_values as f64) * threshold { let compressor = Box::new(RleEncoder::new()); @@ -670,7 +671,9 @@ impl CompressionStrategy for DefaultCompressionStrategy { match data { DataBlock::FixedWidth(fixed_width) => { - if let Some((compressor, encoding)) = try_rle_for_block(fixed_width, self.version) { + if let Some((compressor, encoding)) = + try_rle_for_block(fixed_width, self.version, &field_params) + { return Ok((compressor, encoding)); } if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) { @@ -822,7 +825,7 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(ValueDecompressor::from_fsl(fsl))) } Compression::Rle(rle) => { - let bits_per_value = validate_rle_compression(rle); + let bits_per_value = validate_rle_compression(rle)?; Ok(Box::new(RleDecompressor::new(bits_per_value))) } Compression::ByteStreamSplit(bss) => { @@ -1027,7 +1030,7 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(general_decompressor)) } Compression::Rle(rle) => { - let bits_per_value = validate_rle_compression(rle); + let bits_per_value = validate_rle_compression(rle)?; Ok(Box::new(RleDecompressor::new(bits_per_value))) } _ => todo!(), @@ -1035,26 +1038,45 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { } } /// Validates RLE compression format and extracts bits_per_value 
-fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> u64 { - let Compression::Flat(values) = rle.values.as_ref().unwrap().compression.as_ref().unwrap() - else { - panic!("RLE compression only supports flat values") +fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result { + let values = rle.values.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing values encoding", location!()) + })?; + let run_lengths = rle.run_lengths.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing run lengths encoding", location!()) + })?; + + let values = values.compression.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing values compression", location!()) + })?; + let Compression::Flat(values) = values else { + return Err(Error::invalid_input( + "RLE compression only supports flat values", + location!(), + )); }; - let Compression::Flat(run_lengths) = rle - .run_lengths - .as_ref() - .unwrap() - .compression - .as_ref() - .unwrap() - else { - panic!("RLE compression only supports flat run lengths") + + let run_lengths = run_lengths.compression.as_ref().ok_or_else(|| { + Error::invalid_input("RLE compression missing run lengths compression", location!()) + })?; + let Compression::Flat(run_lengths) = run_lengths else { + return Err(Error::invalid_input( + "RLE compression only supports flat run lengths", + location!(), + )); }; - assert_eq!( - run_lengths.bits_per_value, 8, - "RLE compression only supports 8-bit run lengths" - ); - values.bits_per_value + + if run_lengths.bits_per_value != 8 { + return Err(Error::invalid_input( + format!( + "RLE compression only supports 8-bit run lengths, got {}", + run_lengths.bits_per_value + ), + location!(), + )); + } + + Ok(values.bits_per_value) } #[cfg(test)] @@ -1704,7 +1726,7 @@ mod tests { } #[test] - fn test_rle_block_not_used_for_version_larger_than_v2_1() { + fn test_rle_block_used_for_version_v2_2() { let field = create_test_field("test_repdef", 
DataType::UInt16); // Create highly repetitive data @@ -1739,7 +1761,7 @@ mod tests { } #[test] - fn test_rle_block_not_used_for_version_less_than_v2_2() { + fn test_rle_block_not_used_for_version_v2_1() { let field = create_test_field("test_repdef", DataType::UInt16); // Create highly repetitive data diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 354ec230552..489dcf503a9 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! # RLE (Run-Length Encoding) Miniblock Format +//! # RLE (Run-Length Encoding) //! -//! RLE compression for Lance miniblock format, optimized for data with repeated values. +//! RLE compression for Lance, optimized for data with repeated values. //! //! ## Encoding Format //! @@ -40,13 +40,19 @@ //! - The run count (number of value transitions) < 50% of total values //! - This indicates sufficient repetition for RLE to be effective //! -//! ## Chunk Handling +//! ## MiniBlock Chunk Handling //! -//! - Maximum chunk size: 4096 values (miniblock constraint) -//! - All chunks share two global buffers (values and lengths) -//! - Each chunk's buffer_sizes indicate its portion of the global buffers -//! - Non-last chunks always contain power-of-2 values -//! - Byte limits are enforced dynamically during encoding +//! When used in the miniblock path, all chunks share two global buffers (values and lengths). +//! Each chunk's `buffer_sizes` identifies its slice within those global buffers. Non-last chunks +//! contain a power-of-2 number of values. +//! +//! NOTE: The current encoder uses a 2048-value cap per chunk as a workaround for +//! https://github.com/lancedb/lance/issues/4429. +//! +//! ## Block Format +//! +//! When used in the block compression path, the encoded output is a single buffer: +//! 
`[8-byte header: values buffer size][values buffer][run_lengths buffer]`. use arrow_buffer::ArrowNativeType; use log::trace; @@ -432,12 +438,16 @@ impl RleDecompressor { })); } - assert_eq!( - data.len(), - 2, - "RLE decompressor expects exactly 2 buffers, got {}", - data.len() - ); + if data.len() != 2 { + return Err(Error::InvalidInput { + location: location!(), + source: format!( + "RLE decompressor expects exactly 2 buffers, got {}", + data.len() + ) + .into(), + }); + } let values_buffer = &data[0]; let lengths_buffer = &data[1]; @@ -496,11 +506,16 @@ impl RleDecompressor { let num_runs = values_buffer.len() / type_size; let num_length_entries = lengths_buffer.len(); - assert_eq!( - num_runs, num_length_entries, - "Inconsistent RLE buffers: {} runs but {} length entries", - num_runs, num_length_entries - ); + if num_runs != num_length_entries { + return Err(Error::InvalidInput { + location: location!(), + source: format!( + "Inconsistent RLE buffers: {} runs but {} length entries", + num_runs, num_length_entries + ) + .into(), + }); + } let values_ref = values_buffer.borrow_to_typed_slice::<T>(); let values: &[T] = values_ref.as_ref(); @@ -564,12 +579,23 @@ impl BlockDecompressor for RleDecompressor { }); } - let values_size_bytes: [u8; 8] = data[..8].try_into().unwrap(); + let values_size_bytes: [u8; 8] = data[..8] + .try_into() + .expect("slice length already checked"); let values_size: u64 = u64::from_le_bytes(values_size_bytes); // parse values let values_start: usize = 8; - let lengths_start: usize = values_start + values_size as usize; + let values_size: usize = values_size.try_into().map_err(|_| Error::InvalidInput { + location: location!(), + source: format!("Invalid values buffer size: {}", values_size).into(), + })?; + let lengths_start = values_start.checked_add(values_size).ok_or_else(|| { + Error::InvalidInput { + location: location!(), + source: "Invalid RLE values buffer size".into(), + } + })?; if data.len() < lengths_start { return
Err(Error::InvalidInput { @@ -578,7 +604,7 @@ impl BlockDecompressor for RleDecompressor { }); } - let values_buffer = data.slice_with_length(values_start, values_size as usize); + let values_buffer = data.slice_with_length(values_start, values_size); let lengths_buffer = data.slice_with_length(lengths_start, data.len() - lengths_start); self.decode_data(vec![values_buffer, lengths_buffer], num_values) @@ -590,6 +616,7 @@ mod tests { use super::*; use crate::data::DataBlock; use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES; + use crate::{buffer::LanceBuffer, compression::BlockDecompressor}; use arrow_array::Int32Array; // ========== Core Functionality Tests ========== @@ -718,23 +745,31 @@ mod tests { // ========== Error Handling Tests ========== #[test] - #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")] fn test_invalid_buffer_count() { let decompressor = RleDecompressor::new(32); - let _ = MiniBlockDecompressor::decompress( + let result = MiniBlockDecompressor::decompress( &decompressor, vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10, ); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("expects exactly 2 buffers")); } #[test] - #[should_panic(expected = "Inconsistent RLE buffers")] fn test_buffer_consistency() { let decompressor = RleDecompressor::new(32); let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch! 
- let _ = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15); + let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Inconsistent RLE buffers")); } #[test] @@ -1119,6 +1154,20 @@ mod tests { } // ========== Block Related tests ========== + #[test] + fn test_block_decompressor_rejects_overflowing_values_size() { + let decompressor = RleDecompressor::new(32); + + let mut data = Vec::new(); + data.extend_from_slice(&u64::MAX.to_le_bytes()); + let result = BlockDecompressor::decompress(&decompressor, LanceBuffer::from(data), 1); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid RLE values buffer size")); + } + #[test] fn test_block_decompressor_too_small() { let decompressor = RleDecompressor::new(32); From bea9a11255d59a1ad741ad3f61f4ade7f95faebd Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 28 Jan 2026 10:46:27 +0800 Subject: [PATCH 3/4] ci: fix rustdoc/rustfmt and deflake RSS leak test --- python/python/tests/test_memory_leaks.py | 21 +++++++++++++++++++ rust/lance-encoding/src/compression.rs | 5 ++++- .../src/encodings/physical/rle.rs | 20 +++++++++--------- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/python/python/tests/test_memory_leaks.py b/python/python/tests/test_memory_leaks.py index 9a0d8356882..f8fedfada9a 100644 --- a/python/python/tests/test_memory_leaks.py +++ b/python/python/tests/test_memory_leaks.py @@ -5,6 +5,7 @@ import gc import os +import sys from typing import Callable import lance @@ -18,6 +19,23 @@ def get_memory_usage() -> int: return psutil.Process(os.getpid()).memory_info().rss +def maybe_trim_allocator() -> None: + # The leak checks in this file use RSS, which can include allocator arena growth. 
+ # On Linux/glibc, malloc_trim helps return freed heap memory back to the OS and + # reduces test flakiness while still catching true leaks. + if sys.platform != "linux": + return + try: + import ctypes + + libc = ctypes.CDLL("libc.so.6") + trim = getattr(libc, "malloc_trim", None) + if trim is not None: + trim(0) + except Exception: + return + + def assert_noleaks( operation: Callable[[], None], *, @@ -48,6 +66,7 @@ def assert_noleaks( for _ in range(warmup_iterations): operation() gc.collect() + maybe_trim_allocator() baseline = get_memory_usage() @@ -56,6 +75,7 @@ def assert_noleaks( if i > 0 and i % check_interval == 0: gc.collect() + maybe_trim_allocator() current = get_memory_usage() growth_mb = (current - baseline) / MiB if growth_mb > threshold_mb * leeway_factor: @@ -66,6 +86,7 @@ def assert_noleaks( ) gc.collect() + maybe_trim_allocator() final = get_memory_usage() total_mb = (final - baseline) / MiB diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index 79039d07fcc..24c2c71d654 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -1057,7 +1057,10 @@ fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result { }; let run_lengths = run_lengths.compression.as_ref().ok_or_else(|| { - Error::invalid_input("RLE compression missing run lengths compression", location!()) + Error::invalid_input( + "RLE compression missing run lengths compression", + location!(), + ) })?; let Compression::Flat(run_lengths) = run_lengths else { return Err(Error::invalid_input( diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 489dcf503a9..e911695a521 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -47,7 +47,7 @@ //! contain a power-of-2 number of values. //! //! 
NOTE: The current encoder uses a 2048-value cap per chunk as a workaround for -//! https://github.com/lancedb/lance/issues/4429. +//! <https://github.com/lancedb/lance/issues/4429>. //! //! ## Block Format //! @@ -579,9 +579,8 @@ impl BlockDecompressor for RleDecompressor { }); } - let values_size_bytes: [u8; 8] = data[..8] - .try_into() - .expect("slice length already checked"); + let values_size_bytes: [u8; 8] = + data[..8].try_into().expect("slice length already checked"); let values_size: u64 = u64::from_le_bytes(values_size_bytes); // parse values @@ -590,12 +589,13 @@ impl BlockDecompressor for RleDecompressor { location: location!(), source: format!("Invalid values buffer size: {}", values_size).into(), })?; - let lengths_start = values_start.checked_add(values_size).ok_or_else(|| { - Error::InvalidInput { - location: location!(), - source: "Invalid RLE values buffer size".into(), - } - })?; + let lengths_start = + values_start + .checked_add(values_size) + .ok_or_else(|| Error::InvalidInput { + location: location!(), + source: "Invalid RLE values buffer size".into(), + })?; if data.len() < lengths_start { return Err(Error::InvalidInput { From 049f9d15e2eeea52f441cbd61b951cc78f3a736f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 28 Jan 2026 11:05:42 +0800 Subject: [PATCH 4/4] Revert deflaking memory leak test --- python/python/tests/test_memory_leaks.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/python/python/tests/test_memory_leaks.py b/python/python/tests/test_memory_leaks.py index f8fedfada9a..9a0d8356882 100644 --- a/python/python/tests/test_memory_leaks.py +++ b/python/python/tests/test_memory_leaks.py @@ -5,7 +5,6 @@ import gc import os -import sys from typing import Callable import lance @@ -19,23 +18,6 @@ def get_memory_usage() -> int: return psutil.Process(os.getpid()).memory_info().rss -def maybe_trim_allocator() -> None: - # The leak checks in this file use RSS, which can include allocator arena growth.
- # On Linux/glibc, malloc_trim helps return freed heap memory back to the OS and - # reduces test flakiness while still catching true leaks. - if sys.platform != "linux": - return - try: - import ctypes - - libc = ctypes.CDLL("libc.so.6") - trim = getattr(libc, "malloc_trim", None) - if trim is not None: - trim(0) - except Exception: - return - - def assert_noleaks( operation: Callable[[], None], *, @@ -66,7 +48,6 @@ def assert_noleaks( for _ in range(warmup_iterations): operation() gc.collect() - maybe_trim_allocator() baseline = get_memory_usage() @@ -75,7 +56,6 @@ def assert_noleaks( if i > 0 and i % check_interval == 0: gc.collect() - maybe_trim_allocator() current = get_memory_usage() growth_mb = (current - baseline) / MiB if growth_mb > threshold_mb * leeway_factor: @@ -86,7 +66,6 @@ def assert_noleaks( ) gc.collect() - maybe_trim_allocator() final = get_memory_usage() total_mb = (final - baseline) / MiB