diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cb67d62d13f..50a7c9a9fb2 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4808,7 +4808,6 @@ def compact_files( to reduce this if you are running out of memory during compaction. The default will use the same default from ``scanner``. - Returns ------- CompactionMetrics diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py index 8bf12db91ae..5b58891d28a 100644 --- a/python/python/tests/test_optimize.py +++ b/python/python/tests/test_optimize.py @@ -36,6 +36,38 @@ def test_dataset_optimize(tmp_path: Path): assert dataset.version == 3 +def test_blob_compaction(tmp_path: Path): + base_dir = tmp_path / "blob_dataset" + blob_field = pa.field( + "blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"} + ) + schema = pa.schema([pa.field("id", pa.int32()), blob_field]) + blobs = [b"\x01\x02", b"\x03\x04\x05"] + table = pa.table( + { + "id": pa.array([0, 1], type=pa.int32()), + "blob": pa.array(blobs, type=pa.large_binary()), + }, + schema=schema, + ) + + dataset = lance.write_dataset( + table, + base_dir, + schema=schema, + max_rows_per_file=1, + data_storage_version="stable", + ) + assert len(dataset.get_fragments()) == 2 + + dataset.optimize.compact_files(num_threads=1) + assert len(dataset.get_fragments()) == 1 + + blob_files = dataset.take_blobs("blob", indices=[0, 1]) + contents = [blob_files[0].readall(), blob_files[1].readall()] + assert contents == blobs + + def test_optimize_max_bytes(tmp_path: Path): base_dir = tmp_path / "dataset" arr = pa.array(range(4 * 1024 * 1024)) diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index e7f576d4a18..4a6eede868d 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -22,8 +22,8 @@ pub use field::{ BlobVersion, Encoding, Field, NullabilityComparison, OnTypeMismatch, SchemaCompareOptions, }; pub use 
schema::{ - escape_field_path_for_project, format_field_path, parse_field_path, FieldRef, OnMissing, - Projectable, Projection, Schema, + escape_field_path_for_project, format_field_path, parse_field_path, BlobHandling, FieldRef, + OnMissing, Projectable, Projection, Schema, }; pub static BLOB_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| { diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index a6e00614213..f45431ab312 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -97,6 +97,7 @@ use crate::Result; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; +use lance_core::datatypes::BlobHandling; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS}; use lance_core::Error; @@ -672,6 +673,13 @@ async fn rewrite_files( // If we aren't using stable row ids, then we need to remap indices. 
let needs_remapping = !dataset.manifest.uses_stable_row_ids(); let mut scanner = dataset.scan(); + let has_blob_columns = dataset + .schema() + .fields_pre_order() + .any(|field| field.is_blob()); + if has_blob_columns { + scanner.blob_handling(BlobHandling::AllBinary); + } if let Some(batch_size) = options.batch_size { scanner.batch_size(batch_size); } @@ -1081,12 +1089,13 @@ mod tests { use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use arrow_array::types::{Float32Type, Int32Type, Int64Type}; use arrow_array::{ - Float32Array, Int64Array, LargeStringArray, PrimitiveArray, RecordBatch, - RecordBatchIterator, + ArrayRef, Float32Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, + PrimitiveArray, RecordBatch, RecordBatchIterator, }; use arrow_schema::{DataType, Field, Schema}; use arrow_select::concat::concat_batches; use async_trait::async_trait; + use lance_arrow::BLOB_META_KEY; use lance_core::utils::address::RowAddress; use lance_core::utils::tempfile::TempStrDir; use lance_core::Error; @@ -1103,6 +1112,7 @@ mod tests { use rstest::rstest; use std::collections::HashSet; use std::io::Cursor; + use std::sync::Arc; use uuid::Uuid; #[test] @@ -1332,6 +1342,57 @@ mod tests { assert_eq!(plan.tasks().len(), 0); } + #[tokio::test] + async fn test_compact_blob_columns() { + let test_dir = TempStrDir::default(); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("blob", DataType::LargeBinary, false) + .with_metadata([(BLOB_META_KEY.to_string(), "true".to_string())].into()), + ])); + let expected_payload: Vec<Vec<u8>> = + vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9, 10], vec![11]]; + let id_column: ArrayRef = Arc::new(Int32Array::from_iter_values( + 0..expected_payload.len() as i32, + )); + let blob_array: ArrayRef = Arc::new(LargeBinaryArray::from_iter( + expected_payload.iter().map(|value| Some(value.as_slice())), + )); + let batch = RecordBatch::try_new(schema.clone(), vec![id_column, 
blob_array]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + + let mut dataset = Dataset::write( + reader, + &test_dir, + Some(WriteParams { + max_rows_per_file: 1, + ..Default::default() + }), + ) + .await + .unwrap(); + dataset.validate().await.unwrap(); + assert!(dataset.get_fragments().len() > 1); + + compact_files(&mut dataset, CompactionOptions::default(), None) + .await + .unwrap(); + dataset.validate().await.unwrap(); + assert_eq!(dataset.get_fragments().len(), 1); + + let dataset = Arc::new(dataset); + let row_indices: Vec<u64> = (0..expected_payload.len() as u64).collect(); + let blobs = dataset + .take_blobs_by_indices(&row_indices, "blob") + .await + .unwrap(); + assert_eq!(blobs.len(), expected_payload.len()); + for (blob, expected) in blobs.iter().zip(expected_payload.iter()) { + let bytes = blob.read().await.unwrap(); + assert_eq!(bytes.as_ref(), expected.as_slice()); + } + } + fn row_addrs(frag_idx: u32, offsets: Range<u32>) -> Range<u64> { let start = RowAddress::new_from_parts(frag_idx, offsets.start); let end = RowAddress::new_from_parts(frag_idx, offsets.end); diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 1108131625c..ea193dfba84 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -44,7 +44,7 @@ use futures::{FutureExt, TryStreamExt}; use lance_arrow::floats::{coerce_float_vector, FloatType}; use lance_arrow::DataTypeExt; use lance_core::datatypes::{ - escape_field_path_for_project, format_field_path, Field, OnMissing, Projection, + escape_field_path_for_project, format_field_path, BlobHandling, Field, OnMissing, Projection, }; use lance_core::error::LanceOptionExt; use lance_core::utils::address::RowAddress; @@ -324,6 +324,7 @@ pub struct Scanner { /// - Dynamic expressions that are evaluated after the physical projection /// - The names of the output columns projection_plan: ProjectionPlan, + blob_handling: BlobHandling, /// If true then the 
filter will be applied before an index scan prefilter: bool, @@ -599,9 +600,10 @@ impl Scanner { pub fn new(dataset: Arc<Dataset>) -> Self { let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap(); let file_reader_options = dataset.file_reader_options.clone(); - Self { + let mut scanner = Self { dataset, projection_plan, + blob_handling: BlobHandling::default(), prefilter: false, materialization_style: MaterializationStyle::Heuristic, filter: None, @@ -627,7 +629,24 @@ impl Scanner { legacy_with_row_id: false, explicit_projection: false, autoproject_scoring_columns: true, - } + }; + scanner.apply_blob_handling(); + scanner + } + + fn apply_blob_handling(&mut self) { + let projection = self + .projection_plan + .physical_projection + .clone() + .with_blob_handling(self.blob_handling.clone()); + self.projection_plan.physical_projection = projection; + } + + pub fn blob_handling(&mut self, blob_handling: BlobHandling) -> &mut Self { + self.blob_handling = blob_handling; + self.apply_blob_handling(); + self } pub fn from_fragment(dataset: Arc<Dataset>, fragment: Fragment) -> Self { @@ -710,6 +729,7 @@ impl Scanner { if self.legacy_with_row_addr { self.projection_plan.include_row_addr(); } + self.apply_blob_handling(); Ok(self) }