diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs
index 97738938ef2..ce335ce826d 100644
--- a/rust/lance-arrow/src/lib.rs
+++ b/rust/lance-arrow/src/lib.rs
@@ -47,6 +47,9 @@ pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
 pub const BLOB_META_KEY: &str = "lance-encoding:blob";
 /// Arrow extension type name for Lance blob v2 columns
 pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
+/// Metadata key for overriding the dedicated blob size threshold (in bytes)
+pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
+    "lance-encoding:blob-dedicated-size-threshold";
 
 type Result<T> = std::result::Result<T, ArrowError>;
 
diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs
index 7238cad866f..f349304c3cf 100644
--- a/rust/lance/src/dataset/blob.rs
+++ b/rust/lance/src/dataset/blob.rs
@@ -9,7 +9,7 @@ use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder};
 use arrow_array::Array;
 use arrow_array::RecordBatch;
 use arrow_schema::DataType as ArrowDataType;
-use lance_arrow::FieldExt;
+use lance_arrow::{FieldExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
 use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
 use object_store::path::Path;
 use snafu::location;
@@ -187,6 +187,13 @@ impl BlobPreprocessor {
                 continue;
             }
 
+            let dedicated_threshold = field
+                .metadata()
+                .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY)
+                .and_then(|value| value.parse::<usize>().ok())
+                .filter(|&value| value > 0)
+                .unwrap_or(DEDICATED_THRESHOLD);
+
             let struct_arr = array
                 .as_any()
                 .downcast_ref::<StructArray>()
@@ -234,7 +241,7 @@
                 let has_uri = !uri_col.is_null(i);
                 let data_len = if has_data { data_col.value(i).len() } else { 0 };
 
-                if has_data && data_len > DEDICATED_THRESHOLD {
+                if has_data && data_len > dedicated_threshold {
                     let blob_id = self.next_blob_id();
                     self.write_dedicated(blob_id, data_col.value(i)).await?;
 
@@ -788,9 +795,10 @@ mod tests {
     use arrow_array::{RecordBatchIterator, UInt32Array};
     use arrow_schema::{DataType, Field, Schema};
     use futures::TryStreamExt;
-    use lance_arrow::DataTypeExt;
+    use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
     use lance_io::stream::RecordBatchStream;
 
+    use lance_core::datatypes::BlobKind;
     use lance_core::{utils::tempfile::TempStrDir, Error, Result};
     use lance_datagen::{array, BatchCount, RowCount};
     use lance_file::version::LanceFileVersion;
@@ -1099,4 +1107,109 @@
         assert_eq!(first.as_ref(), b"hello");
         assert_eq!(second.as_ref(), b"world");
     }
+
+    fn build_schema_with_meta(threshold_opt: Option<usize>) -> Arc<Schema> {
+        let mut blob_field_with_meta = blob_field("blob", true);
+        if let Some(threshold) = threshold_opt {
+            let mut metadata = blob_field_with_meta.metadata().clone();
+            metadata.insert(
+                BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(),
+                threshold.to_string(),
+            );
+            blob_field_with_meta = blob_field_with_meta.with_metadata(metadata);
+        }
+
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::UInt32, false),
+            blob_field_with_meta,
+        ]))
+    }
+
+    async fn write_then_get_blob_kinds(
+        blob_sizes: Vec<usize>,
+        threshold_opt: Option<usize>,
+    ) -> Vec<BlobKind> {
+        let test_dir = TempStrDir::default();
+
+        let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len());
+        for size in &blob_sizes {
+            blob_builder.push_bytes(vec![0u8; *size]).unwrap();
+        }
+        let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();
+
+        let id_values: Vec<u32> = (0..blob_sizes.len() as u32).collect();
+        let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values));
+
+        let schema = build_schema_with_meta(threshold_opt);
+
+        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
+        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
+
+        let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
+        let dataset = Arc::new(
+            Dataset::write(reader, &test_dir, Some(params))
+                .await
+                .unwrap(),
+        );
+
+        let indices: Vec<u64> = (0..blob_sizes.len() as u64).collect();
+        let blobs = dataset
+            .take_blobs_by_indices(&indices, "blob")
+            .await
+            .unwrap();
+
+        assert_eq!(blobs.len(), blob_sizes.len());
+
+        blobs.into_iter().map(|b| b.kind()).collect()
+    }
+
+    #[tokio::test]
+    async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() {
+        let small_blob_len = super::DEDICATED_THRESHOLD / 2;
+        let large_blob_len = super::DEDICATED_THRESHOLD + 1;
+
+        // Sanity check assumptions for this test
+        assert!(small_blob_len > super::INLINE_MAX);
+
+        let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")];
+
+        for (threshold_opt, label) in cases {
+            let kinds =
+                write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt)
+                    .await;
+
+            assert_eq!(kinds.len(), 2, "case: {label}");
+            assert_eq!(kinds[0], BlobKind::Packed, "case: {label}");
+            assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() {
+        let blob_len = super::DEDICATED_THRESHOLD / 2;
+        let overridden_threshold = super::DEDICATED_THRESHOLD / 4;
+
+        assert!(blob_len > super::INLINE_MAX);
+        assert!(overridden_threshold > 0);
+        assert!(blob_len > overridden_threshold);
+
+        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;
+
+        assert_eq!(kinds.len(), 1);
+        assert_eq!(kinds[0], BlobKind::Dedicated);
+    }
+
+    #[tokio::test]
+    async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() {
+        let blob_len = super::DEDICATED_THRESHOLD + 1;
+        let overridden_threshold = super::DEDICATED_THRESHOLD * 2;
+
+        assert!(blob_len > super::INLINE_MAX);
+        assert!(blob_len < overridden_threshold);
+
+        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;
+
+        assert_eq!(kinds.len(), 1);
+        assert_eq!(kinds[0], BlobKind::Packed);
+    }
 }