diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 50f3453484c..3eb424230e8 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -4477,7 +4477,6 @@ impl PrimitiveStructuralEncoder { /// /// For VariableWidth (strings/binary): /// - Dictionary values: cardinality × avg_value_size (actual data) - /// - Dictionary offsets: cardinality × offset_size (32 or 64 bits) /// - Indices: num_values × 4 bytes (32-bit i32) fn estimate_dict_size(data_block: &DataBlock, version: LanceFileVersion) -> Option { let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) { @@ -4490,7 +4489,6 @@ impl PrimitiveStructuralEncoder { if num_values == 0 { return None; } - match data_block { DataBlock::FixedWidth(fixed) => { if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 { @@ -4518,23 +4516,20 @@ impl PrimitiveStructuralEncoder { if var.bits_per_offset != 32 && var.bits_per_offset != 64 { return None; } - let bits_per_offset = var.bits_per_offset as u64; - // Dictionary indices are always i32. if cardinality > i32::MAX as u64 { return None; } - let data_size = data_block.data_size(); - let avg_value_size = data_size / num_values; + let bytes_per_offset = var.bits_per_offset as u64 / 8; + let avg_value_size = (var.data.len() as u64) / num_values; - // Dictionary values: actual bytes of unique strings/binary - let dict_values_size = cardinality * avg_value_size; - // Dictionary offsets: pointers into dictionary values - let dict_offsets_size = cardinality * (bits_per_offset / 8); - // Indices: map each row to dictionary entry (always i32) - let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8); + let dict_values_size = cardinality.checked_mul(avg_value_size)?; + let dict_offsets_size = cardinality.checked_mul(bytes_per_offset)?; + let indices_size = num_values.checked_mul(DICT_INDICES_BITS_PER_VALUE / 8)?; - Some(dict_values_size + dict_offsets_size + indices_size) + dict_values_size + .checked_add(dict_offsets_size)? + .checked_add(indices_size) } _ => None, } @@ -4741,60 +4736,67 @@ impl PrimitiveStructuralEncoder { num_rows, support_large_chunk, ) - } else if Self::should_dictionary_encode(&data_block, &field, version) { - log::debug!( - "Encoding column {} with {} items using dictionary encoding (mini-block layout)", - column_idx, - num_values - ); - let (indices_data_block, dictionary_data_block) = - dict::dictionary_encode(data_block); - - Self::encode_miniblock( - column_idx, - &field, - compression_strategy.as_ref(), - indices_data_block, - repdef, - row_number, - Some(dictionary_data_block), - num_rows, - support_large_chunk, - ) - } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) { - log::debug!( - "Encoding column {} with {} items using mini-block layout", - column_idx, - num_values - ); - Self::encode_miniblock( - column_idx, - &field, - compression_strategy.as_ref(), - data_block, - repdef, - row_number, - None, - num_rows, - support_large_chunk, - ) - } else if Self::prefers_fullzip(encoding_metadata.as_ref()) { - log::debug!( - "Encoding column {} with {} items using full-zip layout", - column_idx, - num_values - ); - Self::encode_full_zip( - column_idx, - &field, - compression_strategy.as_ref(), - data_block, - repdef, - row_number, - num_rows, - ) } else { - Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() }) + // Try dictionary encoding first if applicable. If encoding aborts, fall back to the + // preferred structural encoding. + let dict_result = if Self::should_dictionary_encode(&data_block, &field, version) { + log::debug!( + "Encoding column {} with {} items using dictionary encoding (mini-block layout)", + column_idx, + num_values + ); + dict::dictionary_encode(data_block.clone()) + } else { + None + }; + + if let Some((indices_data_block, dictionary_data_block)) = dict_result { + Self::encode_miniblock( + column_idx, + &field, + compression_strategy.as_ref(), + indices_data_block, + repdef, + row_number, + Some(dictionary_data_block), + num_rows, + support_large_chunk, + ) + } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) { + log::debug!( + "Encoding column {} with {} items using mini-block layout", + column_idx, + num_values + ); + Self::encode_miniblock( + column_idx, + &field, + compression_strategy.as_ref(), + data_block, + repdef, + row_number, + None, + num_rows, + support_large_chunk, + ) + } else if Self::prefers_fullzip(encoding_metadata.as_ref()) { + log::debug!( + "Encoding column {} with {} items using full-zip layout", + column_idx, + num_values + ); + Self::encode_full_zip( + column_idx, + &field, + compression_strategy.as_ref(), + data_block, + repdef, + row_number, + num_rows, + ) + } else { + Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() }) + } } }) .boxed(); @@ -6157,7 +6159,7 @@ mod tests { let data_size = block.data_size(); let avg_value_size = data_size / 1000; - let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4; + let expected = 400 * avg_value_size + 1000 * 4; assert_eq!(estimated_size, expected); } diff --git a/rust/lance-encoding/src/encodings/logical/primitive/dict.rs b/rust/lance-encoding/src/encodings/logical/primitive/dict.rs index 0b643a69d55..22bb0b9a009 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive/dict.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive/dict.rs @@ -116,11 +116,12 @@ fn dict_encode_variable_width( variable_width_data_block: &VariableWidthBlock, bits_per_offset: u8, cardinality: u64, -) -> (DataBlock, DataBlock) +) -> Option<(DataBlock, DataBlock)> where T: ArrowNativeType, usize: TryFrom, { + use std::collections::hash_map::Entry; let mut map = HashMap::new(); let offsets = variable_width_data_block .offsets @@ -132,34 +133,74 @@ where .expect("VariableWidth DataBlock should have valid `Stat::MaxLength` statistics"); let max_len = max_len.as_primitive::().value(0); - let mut dictionary_buffer: Vec = Vec::with_capacity((max_len * cardinality) as usize); + let max_dict_data_len = variable_width_data_block.data.len(); + let expected_dict_data_len = max_len + .checked_mul(cardinality) + .and_then(|v| >::try_from(v).ok()); + let dict_data_capacity = expected_dict_data_len + .map(|len| len.min(max_dict_data_len)) + .unwrap_or(max_dict_data_len); + + let mut dictionary_buffer: Vec = Vec::with_capacity(dict_data_capacity); let mut dictionary_offsets_buffer = vec![T::default()]; let mut curr_idx = 0; let mut indices_buffer = Vec::with_capacity(variable_width_data_block.num_values as usize); + let original_size = variable_width_data_block + .data_size() + .try_into() + .unwrap_or(usize::MAX); + let bytes_per_offset = (bits_per_offset / 8) as usize; + + for window in offsets.windows(2) { + let start = usize::try_from(window[0]).ok()?; + let end = usize::try_from(window[1]).ok()?; + if start > end || end > variable_width_data_block.data.len() { + return None; + } - offsets - .iter() - .zip(offsets.iter().skip(1)) - .for_each(|(&start, &end)| { - let start_usize = usize::try_from(start).ok().unwrap(); - let end_usize = usize::try_from(end).ok().unwrap(); - let key = &variable_width_data_block.data[start_usize..end_usize]; - let idx: i32 = *map.entry(U8SliceKey(key)).or_insert_with(|| { + let key = &variable_width_data_block.data[start..end]; + + let idx = match map.entry(U8SliceKey(key)) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + if curr_idx == i32::MAX { + return None; + } dictionary_buffer.extend_from_slice(key); - dictionary_offsets_buffer.push(T::from_usize(dictionary_buffer.len()).unwrap()); + let dict_offset = T::from_usize(dictionary_buffer.len())?; + dictionary_offsets_buffer.push(dict_offset); + let idx = curr_idx; + entry.insert(idx); curr_idx += 1; - curr_idx - 1 - }); - indices_buffer.push(idx); - }); + idx + } + }; - let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock { + indices_buffer.push(idx); + + let indices_bytes = indices_buffer + .len() + .saturating_mul(DICT_INDICES_BITS_PER_VALUE as usize / 8); + let offsets_bytes = dictionary_offsets_buffer + .len() + .saturating_mul(bytes_per_offset); + let encoded_size = dictionary_buffer + .len() + .saturating_add(indices_bytes) + .saturating_add(offsets_bytes); + if encoded_size > original_size { + return None; + } + } + + let mut dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock { data: LanceBuffer::reinterpret_vec(dictionary_buffer), offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer), bits_per_offset, num_values: curr_idx as u64, block_info: BlockInfo::default(), }); + dictionary_data_block.compute_stat(); let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock { data: LanceBuffer::reinterpret_vec(indices_buffer), @@ -169,7 +210,7 @@ where }); indices_data_block.compute_stat(); - (indices_data_block, dictionary_data_block) + Some((indices_data_block, dictionary_data_block)) } /// Dictionary encodes a data block @@ -177,14 +218,23 @@ where /// Currently only supported for some common cases (string / binary / 64-bit / 128-bit) /// /// Returns a block of indices (will always be a fixed width data block) and a block of dictionary -pub fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) { +pub fn dictionary_encode(mut data_block: DataBlock) -> Option<(DataBlock, DataBlock)> { let cardinality = data_block .get_stat(Stat::Cardinality) .unwrap() .as_primitive::() .value(0); + let data_block_size = usize::try_from(data_block.data_size()).unwrap_or(usize::MAX); match data_block { DataBlock::FixedWidth(ref mut fixed_width_data_block) => { + use std::collections::hash_map::Entry; + + let bytes_per_value = match fixed_width_data_block.bits_per_value { + 64 => 8usize, + 128 => 16usize, + _ => return None, + }; + match fixed_width_data_block.bits_per_value { 64 => { let mut map = HashMap::new(); @@ -194,14 +244,32 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) { let mut indices_buffer = Vec::with_capacity(fixed_width_data_block.num_values as usize); let mut curr_idx: i32 = 0; - u64_slice.iter().for_each(|&value| { - let idx = *map.entry(value).or_insert_with(|| { - dictionary_buffer.push(value); - curr_idx += 1; - curr_idx - 1 - }); + + for &value in u64_slice.iter() { + let idx = match map.entry(value) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + if curr_idx == i32::MAX { + return None; + } + dictionary_buffer.push(value); + let idx = curr_idx; + entry.insert(idx); + curr_idx += 1; + idx + } + }; indices_buffer.push(idx); - }); + let dict_bytes = dictionary_buffer.len().saturating_mul(bytes_per_value); + let indices_bytes = indices_buffer + .len() + .saturating_mul(DICT_INDICES_BITS_PER_VALUE as usize / 8); + let encoded_size = dict_bytes.saturating_add(indices_bytes); + if encoded_size > data_block_size { + return None; + } + } + let mut dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock { data: LanceBuffer::reinterpret_vec(dictionary_buffer), bits_per_value: 64, @@ -216,7 +284,8 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) { block_info: BlockInfo::default(), }); indices_data_block.compute_stat(); - (indices_data_block, dictionary_data_block) + + Some((indices_data_block, dictionary_data_block)) } 128 => { // TODO: a follow up PR to support `FixedWidth DataBlock with bits_per_value == 256`. @@ -227,14 +296,32 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) { let mut indices_buffer = Vec::with_capacity(fixed_width_data_block.num_values as usize); let mut curr_idx: i32 = 0; - u128_slice.iter().for_each(|&value| { - let idx = *map.entry(value).or_insert_with(|| { - dictionary_buffer.push(value); - curr_idx += 1; - curr_idx - 1 - }); + + for &value in u128_slice.iter() { + let idx = match map.entry(value) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + if curr_idx == i32::MAX { + return None; + } + dictionary_buffer.push(value); + let idx = curr_idx; + entry.insert(idx); + curr_idx += 1; + idx + } + }; indices_buffer.push(idx); - }); + let dict_bytes = dictionary_buffer.len().saturating_mul(bytes_per_value); + let indices_bytes = indices_buffer + .len() + .saturating_mul(DICT_INDICES_BITS_PER_VALUE as usize / 8); + let encoded_size = dict_bytes.saturating_add(indices_bytes); + if encoded_size > data_block_size { + return None; + } + } + let mut dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock { data: LanceBuffer::reinterpret_vec(dictionary_buffer), bits_per_value: DICT_FIXED_WIDTH_BITS_PER_VALUE, @@ -249,23 +336,182 @@ pub fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) { block_info: BlockInfo::default(), }); indices_data_block.compute_stat(); - (indices_data_block, dictionary_data_block) + + Some((indices_data_block, dictionary_data_block)) } - other => unreachable!( - "dictionary encode called with FixedWidth DataBlock bits_per_value={}", - other - ), + _ => None, } } DataBlock::VariableWidth(ref variable_width_data_block) => { match variable_width_data_block.bits_per_offset { 32 => dict_encode_variable_width::(variable_width_data_block, 32, cardinality), 64 => dict_encode_variable_width::(variable_width_data_block, 64, cardinality), - _ => unreachable!("Variable width offsets can only be 32 or 64 bits"), + _ => None, } } - _ => { - unreachable!("dictionary encode called with data block {:?}", data_block) + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + buffer::LanceBuffer, + data::{BlockInfo, FixedWidthDataBlock}, + }; + use arrow_array::{Array, StringArray}; + use std::sync::Arc; + + #[test] + fn test_dictionary_encode_abort_fixed_width() { + // Create a u128 block with very high cardinality where dict encoding + // would result in larger data (dictionary overhead + indices > original) + let num_values = 120u64; + + // Create actual data: each value is unique u128 so dictionary encode will not be helpful + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + data.push(i as u128); } + + let mut data_block = DataBlock::FixedWidth(FixedWidthDataBlock { + bits_per_value: DICT_FIXED_WIDTH_BITS_PER_VALUE, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }); + + // Compute stats naturally + data_block.compute_stat(); + + // Dictionary encoding should abort and return None + let result = dictionary_encode(data_block); + assert!( + result.is_none(), + "Dictionary encoding should abort for high cardinality u128 data" + ); + } + + #[test] + fn test_dictionary_encode_success_fixed_width() { + // Create a u128 block with low cardinality where dict encoding helps + let num_values = 120u64; + let cardinality = 3u64; + + // Create data with few unique u128 values + let mut data = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + data.push((i % cardinality) as u128); + } + + let mut data_block = DataBlock::FixedWidth(FixedWidthDataBlock { + bits_per_value: DICT_FIXED_WIDTH_BITS_PER_VALUE, + data: LanceBuffer::reinterpret_vec(data), + num_values, + block_info: BlockInfo::default(), + }); + + // Compute stats naturally + data_block.compute_stat(); + + // Dictionary encoding should succeed and return Some + let result = dictionary_encode(data_block); + assert!( + result.is_some(), + "Dictionary encoding should succeed for low cardinality u128 data" + ); + + if let Some((indices, dictionary)) = result { + // Verify indices block + if let DataBlock::FixedWidth(indices_block) = indices { + assert_eq!(indices_block.num_values, num_values); + assert_eq!(indices_block.bits_per_value, DICT_INDICES_BITS_PER_VALUE); + } else { + panic!("Expected FixedWidth indices block"); + } + + // Verify dictionary block + if let DataBlock::FixedWidth(dict_block) = dictionary { + assert_eq!(dict_block.num_values, cardinality); + assert_eq!(dict_block.bits_per_value, DICT_FIXED_WIDTH_BITS_PER_VALUE); + } else { + panic!("Expected FixedWidth dictionary block"); + } + } + } + + #[test] + fn test_dictionary_encode_abort_variable_width() { + // Create a variable-width block with high cardinality where dict encoding + // won't provide sufficient benefit + let num_values = 120u64; + let mut values = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + values.push(format!("unique_value_{:04}", i)); + } + let array = StringArray::from(values); + // from_array already computes stats + let data_block = DataBlock::from_array(Arc::new(array) as Arc); + + // Dictionary encoding should abort and return None + let result = dictionary_encode(data_block); + assert!( + result.is_none(), + "Dictionary encoding should abort for high cardinality string data" + ); + } + + #[test] + fn test_dictionary_encode_success_low_cardinality() { + // Create a variable-width block with low cardinality where dict encoding helps + let num_values = 120u64; + let cardinality = 3u64; + + let mut values = Vec::with_capacity(num_values as usize); + for i in 0..num_values { + values.push(format!("value_{}", i % cardinality)); + } + + let array = StringArray::from(values); + let data_block = DataBlock::from_array(Arc::new(array) as Arc); + + // Dictionary encoding should succeed and return Some + let result = dictionary_encode(data_block); + assert!( + result.is_some(), + "Dictionary encoding should succeed for low cardinality data" + ); + + if let Some((indices, dictionary)) = result { + // Verify indices block + if let DataBlock::FixedWidth(indices_block) = indices { + assert_eq!(indices_block.num_values, num_values); + assert_eq!(indices_block.bits_per_value, DICT_INDICES_BITS_PER_VALUE); + } else { + panic!("Expected FixedWidth indices block"); + } + + // Verify dictionary block + if let DataBlock::VariableWidth(dict_block) = dictionary { + assert_eq!(dict_block.num_values, cardinality); + } else { + panic!("Expected VariableWidth dictionary block"); + } + } + } + + #[test] + fn test_dictionary_encode_invalid_offset_width_returns_none() { + let array = StringArray::from(vec!["a", "b", "c", "a"]); + let data_block = DataBlock::from_array(Arc::new(array) as Arc); + let invalid_block = match data_block { + DataBlock::VariableWidth(mut var) => { + var.bits_per_offset = 16; + DataBlock::VariableWidth(var) + } + other => panic!("Expected VariableWidth data block, got {:?}", other), + }; + assert!(dictionary_encode(invalid_block).is_none()); } } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index 7d3a2b34693..8872a1502ef 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -159,7 +159,15 @@ fn search_next_offset_idx( last_offset_idx: usize, minichunk_size: i64, ) -> usize { - let mut num_values = 1; + // MiniBlockChunk uses `log_num_values == 0` as a sentinel for the final chunk. This means we + // must avoid creating 1-value chunks except for the final chunk, even if the configured + // `minichunk_size` is too small to fit more than one value. + let remaining_values = offsets.len().saturating_sub(last_offset_idx + 1); + if remaining_values <= 1 { + return offsets.len() - 1; + } + + let mut num_values = 2; let mut new_num_values = num_values * 2; loop { if last_offset_idx + new_num_values >= offsets.len() {