diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index f65b7bb2025..1e7ca8a442d 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -474,7 +474,7 @@ impl CompressionStrategy for DefaultCompressionStrategy { field: &Field, data: &DataBlock, ) -> Result<Box<dyn PerValueCompressor>> { - let field_params = Self::parse_field_metadata(field); + let field_params = self.get_merged_field_params(field); match data { DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())), @@ -506,6 +506,11 @@ impl CompressionStrategy for DefaultCompressionStrategy { } } DataBlock::VariableWidth(variable_width) => { + // Check for explicit "none" compression + if field_params.compression.as_deref() == Some("none") { + return Ok(Box::new(VariableEncoder::default())); + } + + let max_len = variable_width.expect_single_stat::<u64>(Stat::MaxLength); let data_size = variable_width.expect_single_stat::<u64>(Stat::DataSize); @@ -515,7 +520,7 @@ impl CompressionStrategy for DefaultCompressionStrategy { let per_value_requested = if let Some(compression) = field_params.compression.as_deref() { - compression != "none" && compression != "fsst" + compression != "fsst" } else { false }; @@ -947,6 +952,7 @@ mod tests { use super::*; use crate::buffer::LanceBuffer; use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock}; + use crate::testing::extract_array_encoding_chain; use arrow_schema::{DataType, Field as ArrowField}; use std::collections::HashMap; @@ -1026,6 +1032,67 @@ mod tests { DataBlock::FixedWidth(block) } + fn create_variable_width_block( + bits_per_offset: u8, + num_values: u64, + avg_value_size: usize, + ) -> DataBlock { + use crate::statistics::ComputeStat; + + // Create offsets buffer (num_values + 1 offsets) + let mut offsets = Vec::with_capacity((num_values + 1) as usize); + let mut current_offset = 0i64; + offsets.push(current_offset); + + // Generate offsets with varying value sizes + for i in 0..num_values { + let 
value_size = if avg_value_size == 0 { + 1 + } else { + ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize) + .min(avg_value_size * 2) + }; + current_offset += value_size as i64; + offsets.push(current_offset); + } + + // Create data buffer with realistic content + let total_data_size = current_offset as usize; + let mut data = vec![0u8; total_data_size]; + + // Fill data with varied content + for i in 0..num_values { + let start_offset = offsets[i as usize] as usize; + let end_offset = offsets[(i + 1) as usize] as usize; + + let content = (i % 256) as u8; + for j in 0..end_offset - start_offset { + data[start_offset + j] = content.wrapping_add(j as u8); + } + } + + // Convert offsets to appropriate lance buffer + let offsets_buffer = match bits_per_offset { + 32 => { + let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect(); + LanceBuffer::reinterpret_vec(offsets_32) + } + 64 => LanceBuffer::reinterpret_vec(offsets), + _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset), + }; + + let mut block = VariableWidthBlock { + data: LanceBuffer::from(data), + offsets: offsets_buffer, + bits_per_offset, + num_values, + block_info: BlockInfo::default(), + }; + + block.compute_stat(); + DataBlock::VariableWidth(block) + } + #[test] fn test_parameter_based_compression() { let mut params = CompressionParams::new(); @@ -1082,6 +1149,18 @@ mod tests { assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder")); } + fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) { + let chain = extract_array_encoding_chain(encoding); + if variable { + assert_eq!(chain.len(), 2); + assert_eq!(chain.first().unwrap().as_str(), "variable"); + assert_eq!(chain.get(1).unwrap().as_str(), "flat"); + } else { + assert_eq!(chain.len(), 1); + assert_eq!(chain.first().unwrap().as_str(), "flat"); + } + } + #[test] fn test_none_compression() { let mut params = CompressionParams::new(); @@ -1097,11 +1176,65 @@ mod tests { let strategy = 
DefaultCompressionStrategy::with_params(params); let field = create_test_field("embeddings", DataType::Float32); - let data = create_fixed_width_block(32, 1000); + let fixed_data = create_fixed_width_block(32, 1000); + let variable_data = create_variable_width_block(32, 10, 32 * 1024); + + // Test miniblock + let compressor = strategy + .create_miniblock_compressor(&field, &fixed_data) + .unwrap(); + let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap(); + check_uncompressed_encoding(&encoding, false); + let compressor = strategy + .create_miniblock_compressor(&field, &variable_data) + .unwrap(); + let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap(); + check_uncompressed_encoding(&encoding, true); + + // Test pervalue + let compressor = strategy.create_per_value(&field, &fixed_data).unwrap(); + let (_block, encoding) = compressor.compress(fixed_data).unwrap(); + check_uncompressed_encoding(&encoding, false); + let compressor = strategy.create_per_value(&field, &variable_data).unwrap(); + let (_block, encoding) = compressor.compress(variable_data).unwrap(); + check_uncompressed_encoding(&encoding, true); + } - let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap(); - // Should use ValueEncoder (no compression) - assert!(format!("{:?}", compressor).contains("ValueEncoder")); + #[test] + fn test_field_metadata_none_compression() { + // Prepare field with metadata for none compression + let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true); + let mut metadata = HashMap::new(); + metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string()); + arrow_field = arrow_field.with_metadata(metadata); + let field = Field::try_from(&arrow_field).unwrap(); + + let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new()); + + // Test miniblock + let fixed_data = create_fixed_width_block(32, 1000); + let variable_data = create_variable_width_block(32, 10, 32 * 
1024); + + let compressor = strategy + .create_miniblock_compressor(&field, &fixed_data) + .unwrap(); + let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap(); + check_uncompressed_encoding(&encoding, false); + + let compressor = strategy + .create_miniblock_compressor(&field, &variable_data) + .unwrap(); + let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap(); + check_uncompressed_encoding(&encoding, true); + + // Test pervalue + let compressor = strategy.create_per_value(&field, &fixed_data).unwrap(); + let (_block, encoding) = compressor.compress(fixed_data).unwrap(); + check_uncompressed_encoding(&encoding, false); + + let compressor = strategy.create_per_value(&field, &variable_data).unwrap(); + let (_block, encoding) = compressor.compress(variable_data).unwrap(); + check_uncompressed_encoding(&encoding, true); } #[test]