Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 139 additions & 6 deletions rust/lance-encoding/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl CompressionStrategy for DefaultCompressionStrategy {
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn PerValueCompressor>> {
let field_params = Self::parse_field_metadata(field);
let field_params = self.get_merged_field_params(field);

match data {
DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
Expand Down Expand Up @@ -506,6 +506,11 @@ impl CompressionStrategy for DefaultCompressionStrategy {
}
}
DataBlock::VariableWidth(variable_width) => {
// Check for explicit "none" compression
if field_params.compression.as_deref() == Some("none") {
return Ok(Box::new(VariableEncoder::default()));
}

let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);

Expand All @@ -515,7 +520,7 @@ impl CompressionStrategy for DefaultCompressionStrategy {

let per_value_requested =
if let Some(compression) = field_params.compression.as_deref() {
compression != "none" && compression != "fsst"
compression != "fsst"
} else {
false
};
Expand Down Expand Up @@ -947,6 +952,7 @@ mod tests {
use super::*;
use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
use crate::testing::extract_array_encoding_chain;
use arrow_schema::{DataType, Field as ArrowField};
use std::collections::HashMap;

Expand Down Expand Up @@ -1026,6 +1032,67 @@ mod tests {
DataBlock::FixedWidth(block)
}

/// Builds a variable-width [`DataBlock`] filled with deterministic synthetic data.
///
/// The block holds `num_values` values. Value `i` has a size of
/// `avg_value_size + (i % 8) - 4` bytes, clamped to at least 1 and at most
/// `2 * avg_value_size`; when `avg_value_size` is 0 every value is 1 byte long.
/// Byte `j` of value `i` is `(i % 256) + j`, wrapping within `u8`.
///
/// `bits_per_offset` selects the width of the offsets buffer (32 or 64). The
/// offsets buffer always has `num_values + 1` entries and starts at 0.
/// `compute_stat()` is called on the block before it is returned.
///
/// # Panics
///
/// Panics if `bits_per_offset` is neither 32 nor 64, or if 32-bit offsets are
/// requested and the total data size does not fit in an `i32`.
fn create_variable_width_block(
    bits_per_offset: u8,
    num_values: u64,
    avg_value_size: usize,
) -> DataBlock {
    use crate::statistics::ComputeStat;

    // One leading zero offset plus one end offset per value.
    let mut offsets = Vec::with_capacity((num_values + 1) as usize);
    let mut current_offset = 0i64;
    offsets.push(current_offset);

    // Generate offsets with varying value sizes
    for i in 0..num_values {
        let value_size = if avg_value_size == 0 {
            1
        } else {
            ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
                .min(avg_value_size * 2)
        };
        current_offset += value_size as i64;
        offsets.push(current_offset);
    }

    // The final offset is the total number of data bytes.
    let mut data = vec![0u8; current_offset as usize];

    // Fill each value with a pattern that depends on its index and byte position.
    for (i, bounds) in offsets.windows(2).enumerate() {
        let seed = (i % 256) as u8;
        let value = &mut data[bounds[0] as usize..bounds[1] as usize];
        for (j, byte) in value.iter_mut().enumerate() {
            // `j as u8` intentionally wraps so long values cycle through all bytes.
            *byte = seed.wrapping_add(j as u8);
        }
    }

    // Convert offsets to appropriate lance buffer
    let offsets_buffer = match bits_per_offset {
        32 => {
            // Offsets are non-decreasing, so checking the last one covers all of
            // them. Without this guard the `as i32` cast below would silently wrap.
            assert!(
                current_offset <= i64::from(i32::MAX),
                "Total data size {} does not fit in 32-bit offsets",
                current_offset
            );
            let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
            LanceBuffer::reinterpret_vec(offsets_32)
        }
        64 => LanceBuffer::reinterpret_vec(offsets),
        _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
    };

    let mut block = VariableWidthBlock {
        data: LanceBuffer::from(data),
        offsets: offsets_buffer,
        bits_per_offset,
        num_values,
        block_info: BlockInfo::default(),
    };

    block.compute_stat();
    DataBlock::VariableWidth(block)
}

#[test]
fn test_parameter_based_compression() {
let mut params = CompressionParams::new();
Expand Down Expand Up @@ -1082,6 +1149,18 @@ mod tests {
assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
}

/// Asserts that `encoding` describes an uncompressed layout.
///
/// The encoding chain extracted from `encoding` must be exactly
/// `["variable", "flat"]` when `variable` is true and `["flat"]` otherwise.
fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
    let expected: &[&str] = if variable {
        &["variable", "flat"]
    } else {
        &["flat"]
    };

    let chain = extract_array_encoding_chain(encoding);
    // Check the length first so a wrong-sized chain is reported as such.
    assert_eq!(chain.len(), expected.len());
    for (position, name) in expected.iter().enumerate() {
        assert_eq!(chain.get(position).unwrap().as_str(), *name);
    }
}

#[test]
fn test_none_compression() {
let mut params = CompressionParams::new();
Expand All @@ -1097,11 +1176,65 @@ mod tests {

let strategy = DefaultCompressionStrategy::with_params(params);
let field = create_test_field("embeddings", DataType::Float32);
let data = create_fixed_width_block(32, 1000);
let fixed_data = create_fixed_width_block(32, 1000);
let variable_data = create_variable_width_block(32, 10, 32 * 1024);

// Test miniblock
let compressor = strategy
.create_miniblock_compressor(&field, &fixed_data)
.unwrap();
let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
check_uncompressed_encoding(&encoding, false);
let compressor = strategy
.create_miniblock_compressor(&field, &variable_data)
.unwrap();
let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
check_uncompressed_encoding(&encoding, true);

// Test pervalue
let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
let (_block, encoding) = compressor.compress(fixed_data).unwrap();
check_uncompressed_encoding(&encoding, false);
let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
let (_block, encoding) = compressor.compress(variable_data).unwrap();
check_uncompressed_encoding(&encoding, true);
}

let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
// Should use ValueEncoder (no compression)
assert!(format!("{:?}", compressor).contains("ValueEncoder"));
#[test]
fn test_field_metadata_none_compression() {
    // A field whose metadata explicitly requests "none" compression.
    let metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);
    let arrow_field =
        ArrowField::new("simple_col", DataType::Binary, true).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    // No strategy-level parameters: only the field metadata is in play.
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Miniblock path: the blocks are borrowed and cloned so they can be reused below.
    for (block, is_variable) in [(&fixed_data, false), (&variable_data, true)] {
        let compressor = strategy
            .create_miniblock_compressor(&field, block)
            .unwrap();
        let (_block, encoding) = compressor.compress(block.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path: this is the last use, so the blocks are consumed.
    for (block, is_variable) in [(fixed_data, false), (variable_data, true)] {
        let compressor = strategy.create_per_value(&field, &block).unwrap();
        let (_block, encoding) = compressor.compress(block).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}

#[test]
Expand Down