Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 168 additions & 37 deletions rust/lance-encoding/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{
PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
VariablePackedStructFieldKind,
},
rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
rle::{RleDecompressor, RleEncoder},
value::{ValueDecompressor, ValueEncoder},
},
},
Expand Down Expand Up @@ -194,7 +194,7 @@ fn try_rle_for_mini_block(
// multiple entries of up to 255 values. We don't know the run length distribution here,
// so we conservatively account for splitting with an upper bound.
let num_values = data.num_values;
let estimated_pairs = (run_count + (num_values / 255)).min(num_values);
let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);

let raw_bytes = (num_values as u128) * (type_size as u128);
let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
Expand All @@ -208,7 +208,37 @@ fn try_rle_for_mini_block(
}
}
}
return Some(Box::new(RleMiniBlockEncoder::new()));
return Some(Box::new(RleEncoder::new()));
}
None
}

/// Decides whether a fixed-width block should be RLE-compressed as a whole block.
///
/// Returns `None`, letting the caller fall through to other strategies, when:
/// * `version` is older than `LanceFileVersion::V2_2`,
/// * `data.bits_per_value` is not one of 8, 16, 32 or 64, or
/// * the `RunCount` statistic is not strictly below `num_values * threshold`,
///   where `threshold` is `params.rle_threshold` or, when that is unset,
///   `DEFAULT_RLE_COMPRESSION_THRESHOLD`.
///
/// Otherwise returns an `RleEncoder` together with an RLE encoding description
/// built from flat values at the block's bit width and flat 8-bit run lengths.
fn try_rle_for_block(
    data: &FixedWidthDataBlock,
    version: LanceFileVersion,
    params: &CompressionFieldParams,
) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
    let value_bits = data.bits_per_value;
    let width_supported = matches!(value_bits, 8 | 16 | 32 | 64);
    if version < LanceFileVersion::V2_2 || !width_supported {
        return None;
    }

    let runs = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
    let ratio_limit = params
        .rle_threshold
        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
    let max_runs = (data.num_values as f64) * ratio_limit;

    // Keep this as a positive `<` test: if the limit is NaN the comparison is
    // false and RLE is rejected, which a flipped `>=` guard would not do.
    let few_enough_runs = (runs as f64) < max_runs;
    if !few_enough_runs {
        return None;
    }

    let compressor: Box<dyn BlockCompressor> = Box::new(RleEncoder::new());
    let values_encoding = ProtobufUtils21::flat(value_bits, None);
    let run_lengths_encoding = ProtobufUtils21::flat(/*bits_per_value=*/ 8, None);
    Some((
        compressor,
        ProtobufUtils21::rle(values_encoding, run_lengths_encoding),
    ))
}
Expand Down Expand Up @@ -304,10 +334,7 @@ fn maybe_wrap_general_for_mini_block(
None | Some("none") | Some("fsst") => Ok(inner),
Some(raw) => {
let scheme = CompressionScheme::from_str(raw).map_err(|_| {
lance_core::Error::invalid_input(
format!("Unknown compression scheme: {raw}"),
location!(),
)
Error::invalid_input(format!("Unknown compression scheme: {raw}"), location!())
})?;
let cfg = CompressionConfig::new(scheme, params.compression_level);
Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
Expand Down Expand Up @@ -644,6 +671,11 @@ impl CompressionStrategy for DefaultCompressionStrategy {

match data {
DataBlock::FixedWidth(fixed_width) => {
if let Some((compressor, encoding)) =
try_rle_for_block(fixed_width, self.version, &field_params)
{
return Ok((compressor, encoding));
}
if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
return Ok((compressor, encoding));
}
Expand Down Expand Up @@ -793,28 +825,8 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {
Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
}
Compression::Rle(rle) => {
let Compression::Flat(values) =
rle.values.as_ref().unwrap().compression.as_ref().unwrap()
else {
panic!("RLE compression only supports flat values")
};
let Compression::Flat(run_lengths) = rle
.run_lengths
.as_ref()
.unwrap()
.compression
.as_ref()
.unwrap()
else {
panic!("RLE compression only supports flat run lengths")
};
assert_eq!(
run_lengths.bits_per_value, 8,
"RLE compression only supports 8-bit run lengths"
);
Ok(Box::new(RleMiniBlockDecompressor::new(
values.bits_per_value,
)))
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
}
Compression::ByteStreamSplit(bss) => {
let Compression::Flat(values) =
Expand Down Expand Up @@ -842,10 +854,7 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {

let scheme = compression.scheme().try_into()?;

let compression_config = crate::encodings::physical::block::CompressionConfig::new(
scheme,
compression.level,
);
let compression_config = CompressionConfig::new(scheme, compression.level);

Ok(Box::new(GeneralMiniBlockDecompressor::new(
inner_decompressor,
Expand Down Expand Up @@ -1020,16 +1029,65 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {

Ok(Box::new(general_decompressor))
}
Compression::Rle(rle) => {
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
}
_ => todo!(),
}
}
}
/// Validates RLE compression format and extracts bits_per_value
fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
let values = rle.values.as_ref().ok_or_else(|| {
Error::invalid_input("RLE compression missing values encoding", location!())
})?;
let run_lengths = rle.run_lengths.as_ref().ok_or_else(|| {
Error::invalid_input("RLE compression missing run lengths encoding", location!())
})?;

let values = values.compression.as_ref().ok_or_else(|| {
Error::invalid_input("RLE compression missing values compression", location!())
})?;
let Compression::Flat(values) = values else {
return Err(Error::invalid_input(
"RLE compression only supports flat values",
location!(),
));
};

let run_lengths = run_lengths.compression.as_ref().ok_or_else(|| {
Error::invalid_input(
"RLE compression missing run lengths compression",
location!(),
)
})?;
let Compression::Flat(run_lengths) = run_lengths else {
return Err(Error::invalid_input(
"RLE compression only supports flat run lengths",
location!(),
));
};

if run_lengths.bits_per_value != 8 {
return Err(Error::invalid_input(
format!(
"RLE compression only supports 8-bit run lengths, got {}",
run_lengths.bits_per_value
),
location!(),
));
}

Ok(values.bits_per_value)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
use crate::statistics::ComputeStat;
use crate::testing::extract_array_encoding_chain;
use arrow_schema::{DataType, Field as ArrowField};
use std::collections::HashMap;
Expand Down Expand Up @@ -1200,7 +1258,7 @@ mod tests {

// The compressor should be RLE wrapped in general compression
assert!(debug_str.contains("GeneralMiniBlockCompressor"));
assert!(debug_str.contains("RleMiniBlockEncoder"));
assert!(debug_str.contains("RleEncoder"));
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth adding an end-to-end test for RLE on block.


#[test]
Expand All @@ -1226,7 +1284,7 @@ mod tests {

let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
// Should use RLE due to very low threshold
assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
assert!(format!("{:?}", compressor).contains("RleEncoder"));
}

#[test]
Expand Down Expand Up @@ -1265,7 +1323,7 @@ mod tests {
"expected InlineBitpacking, got: {debug_str}"
);
assert!(
!debug_str.contains("RleMiniBlockEncoder"),
!debug_str.contains("RleEncoder"),
"expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
);
}
Expand Down Expand Up @@ -1509,7 +1567,7 @@ mod tests {

// Should use RLE because run_count (100) < num_values * threshold (800)
let debug_str = format!("{:?}", compressor);
assert!(debug_str.contains("RleMiniBlockEncoder"));
assert!(debug_str.contains("RleEncoder"));
}

#[test]
Expand Down Expand Up @@ -1669,4 +1727,77 @@ mod tests {
_ => panic!("expected fixed width block"),
}
}

/// With file version 2.2, highly repetitive fixed-width data should be given
/// an RLE block compressor by the default strategy.
#[test]
fn test_rle_block_used_for_version_v2_2() {
    // Ten runs of one hundred identical u16 values each: 0 x100, 1 x100, ..., 9 x100.
    let num_values = 1000u64;
    let repetitive: Vec<u16> = (0..num_values).map(|idx| (idx / 100) as u16).collect();

    let mut fixed_width = FixedWidthDataBlock {
        bits_per_value: 16,
        data: LanceBuffer::reinterpret_vec(repetitive),
        num_values,
        block_info: BlockInfo::default(),
    };
    // NOTE(review): presumably this populates the RunCount statistic that
    // block-level RLE selection reads -- confirm against `ComputeStat`.
    fixed_width.compute_stat();
    let data_block = DataBlock::FixedWidth(fixed_width);

    let field = create_test_field("test_repdef", DataType::UInt16);
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
        .with_version(LanceFileVersion::V2_2);

    let (compressor, _) = strategy
        .create_block_compressor(&field, &data_block)
        .unwrap();

    // The chosen compressor is identified through its Debug output.
    let described = format!("{:?}", compressor);
    assert!(described.contains("RleEncoder"));
}

/// With file version 2.1, the default strategy must not pick block-level RLE
/// even when the data is highly repetitive.
#[test]
fn test_rle_block_not_used_for_version_v2_1() {
    // Ten runs of one hundred identical u16 values each: 0 x100, 1 x100, ..., 9 x100.
    let num_values = 1000u64;
    let repetitive: Vec<u16> = (0..num_values).map(|idx| (idx / 100) as u16).collect();

    let mut fixed_width = FixedWidthDataBlock {
        bits_per_value: 16,
        data: LanceBuffer::reinterpret_vec(repetitive),
        num_values,
        block_info: BlockInfo::default(),
    };
    // NOTE(review): presumably this populates the statistics that compressor
    // selection reads -- confirm against `ComputeStat`.
    fixed_width.compute_stat();
    let data_block = DataBlock::FixedWidth(fixed_width);

    let field = create_test_field("test_repdef", DataType::UInt16);
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
        .with_version(LanceFileVersion::V2_1);

    let (compressor, _) = strategy
        .create_block_compressor(&field, &data_block)
        .unwrap();

    // The chosen compressor is identified through its Debug output.
    let described = format!("{:?}", compressor);
    assert!(
        !described.contains("RleEncoder"),
        "RLE should not be used for V2.1"
    );
}
}
18 changes: 9 additions & 9 deletions rust/lance-encoding/src/encodings/physical/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod tests {
use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::{BlockInfo, FixedWidthDataBlock};
use crate::encodings::physical::block::CompressionScheme;
use crate::encodings::physical::rle::RleMiniBlockEncoder;
use crate::encodings::physical::rle::RleEncoder;
use crate::encodings::physical::value::ValueEncoder;
use crate::format::pb21;
use crate::format::pb21::compressive_encoding::Compression;
Expand All @@ -161,7 +161,7 @@ mod tests {
// Small data with RLE - should not compress due to size threshold
TestCase {
name: "small_rle_data",
inner_encoder: Box::new(RleMiniBlockEncoder),
inner_encoder: Box::new(RleEncoder),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -173,7 +173,7 @@ mod tests {
// Large repeated data with RLE + LZ4
TestCase {
name: "large_rle_lz4",
inner_encoder: Box::new(RleMiniBlockEncoder),
inner_encoder: Box::new(RleEncoder),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -185,7 +185,7 @@ mod tests {
// Large repeated data with RLE + Zstd
TestCase {
name: "large_rle_zstd",
inner_encoder: Box::new(RleMiniBlockEncoder),
inner_encoder: Box::new(RleEncoder),
compression: CompressionConfig {
scheme: CompressionScheme::Zstd,
level: Some(3),
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {
// Test that small buffers don't get compressed
let small_test = TestCase {
name: "small_buffer_no_compression",
inner_encoder: Box::new(RleMiniBlockEncoder),
inner_encoder: Box::new(RleEncoder),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {
// RLE produces 2 buffers (values and lengths), test that both are handled correctly
let data = create_repeated_i32_block(vec![1; 100]);
let compressor = GeneralMiniBlockCompressor::new(
Box::new(RleMiniBlockEncoder),
Box::new(RleEncoder),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -519,7 +519,7 @@ mod tests {
// Test case 1: 32-bit RLE data
let test_32 = TestCase {
name: "rle_32bit_with_general_wrapper",
inner_encoder: Box::new(RleMiniBlockEncoder),
inner_encoder: Box::new(RleEncoder),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -532,7 +532,7 @@ mod tests {
// For 32-bit RLE, the compression strategy should automatically wrap it
// Let's directly test the compressor
let compressor = GeneralMiniBlockCompressor::new(
Box::new(RleMiniBlockEncoder),
Box::new(RleEncoder),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down Expand Up @@ -589,7 +589,7 @@ mod tests {
let block_64 = DataBlock::from_array(array_64);

let compressor_64 = GeneralMiniBlockCompressor::new(
Box::new(RleMiniBlockEncoder),
Box::new(RleEncoder),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down
Loading