Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for Lance blob v2 columns
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
/// Metadata key for overriding the dedicated blob size threshold (in bytes)
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
Comment thread
yanghua marked this conversation as resolved.
"lance-encoding:blob-dedicated-size-threshold";

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
119 changes: 116 additions & 3 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::Array;
use arrow_array::RecordBatch;
use arrow_schema::DataType as ArrowDataType;
use lance_arrow::FieldExt;
use lance_arrow::{FieldExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use object_store::path::Path;
use snafu::location;
Expand Down Expand Up @@ -187,6 +187,13 @@ impl BlobPreprocessor {
continue;
}

let dedicated_threshold = field
.metadata()
.get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY)
.and_then(|value| value.parse::<usize>().ok())
.filter(|&value| value > DEDICATED_THRESHOLD)
.unwrap_or(DEDICATED_THRESHOLD);

let struct_arr = array
.as_any()
.downcast_ref::<arrow_array::StructArray>()
Expand Down Expand Up @@ -234,7 +241,7 @@ impl BlobPreprocessor {
let has_uri = !uri_col.is_null(i);
let data_len = if has_data { data_col.value(i).len() } else { 0 };

if has_data && data_len > DEDICATED_THRESHOLD {
if has_data && data_len > dedicated_threshold {
let blob_id = self.next_blob_id();
self.write_dedicated(blob_id, data_col.value(i)).await?;

Expand Down Expand Up @@ -788,9 +795,10 @@ mod tests {
use arrow_array::{RecordBatchIterator, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance_arrow::DataTypeExt;
use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
use lance_io::stream::RecordBatchStream;

use lance_core::datatypes::BlobKind;
use lance_core::{utils::tempfile::TempStrDir, Error, Result};
use lance_datagen::{array, BatchCount, RowCount};
use lance_file::version::LanceFileVersion;
Expand Down Expand Up @@ -1099,4 +1107,109 @@ mod tests {
assert_eq!(first.as_ref(), b"hello");
assert_eq!(second.as_ref(), b"world");
}

fn build_schema_with_meta(threshold_opt: Option<usize>) -> Arc<Schema> {
let mut blob_field_with_meta = blob_field("blob", true);
if let Some(threshold) = threshold_opt {
let mut metadata = blob_field_with_meta.metadata().clone();
metadata.insert(
BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(),
threshold.to_string(),
);
blob_field_with_meta = blob_field_with_meta.with_metadata(metadata);
}

Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
blob_field_with_meta,
]))
}

async fn write_then_get_blob_kinds(
blob_sizes: Vec<usize>,
threshold_opt: Option<usize>,
) -> Vec<BlobKind> {
let test_dir = TempStrDir::default();

let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len());
for size in &blob_sizes {
blob_builder.push_bytes(vec![0u8; *size]).unwrap();
}
let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();

let id_values: Vec<u32> = (0..blob_sizes.len() as u32).collect();
let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values));

let schema = build_schema_with_meta(threshold_opt);

let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
let dataset = Arc::new(
Dataset::write(reader, &test_dir, Some(params))
.await
.unwrap(),
);

let indices: Vec<u64> = (0..blob_sizes.len() as u64).collect();
let blobs = dataset
.take_blobs_by_indices(&indices, "blob")
.await
.unwrap();

assert_eq!(blobs.len(), blob_sizes.len());

blobs.into_iter().map(|b| b.kind()).collect()
}

#[tokio::test]
async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() {
let small_blob_len = super::DEDICATED_THRESHOLD / 2;
let large_blob_len = super::DEDICATED_THRESHOLD + 1;

// Sanity check assumptions for this test
assert!(small_blob_len > super::INLINE_MAX);

let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")];

for (threshold_opt, label) in cases {
let kinds =
write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt)
.await;

assert_eq!(kinds.len(), 2, "case: {label}");
assert_eq!(kinds[0], BlobKind::Packed, "case: {label}");
assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}");
}
}

#[tokio::test]
async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() {
let blob_len = super::DEDICATED_THRESHOLD / 2;
let overridden_threshold = super::DEDICATED_THRESHOLD / 4;

assert!(blob_len > super::INLINE_MAX);
assert!(overridden_threshold > 0);
assert!(blob_len > overridden_threshold);

let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;

assert_eq!(kinds.len(), 1);
assert_eq!(kinds[0], BlobKind::Packed);
}

#[tokio::test]
async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() {
let blob_len = super::DEDICATED_THRESHOLD + 1;
let overridden_threshold = super::DEDICATED_THRESHOLD * 2;

assert!(blob_len > super::INLINE_MAX);
assert!(blob_len < overridden_threshold);

let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;

assert_eq!(kinds.len(), 1);
assert_eq!(kinds[0], BlobKind::Packed);
}
}