From 34c81a5bab65436c95a8c43e8836b42e70108fba Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 19 Dec 2025 14:32:09 +0800 Subject: [PATCH] Offload IVF partition build --- rust/lance/src/index/vector/builder.rs | 67 ++++++++++++++------------ 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 164ff08b3b1..194624f718f 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -780,42 +780,47 @@ impl IvfIndexBuilder .await? }; - if let Some((assign_batch, deleted_row_ids)) = assign_batch { - if !deleted_row_ids.is_empty() { - let deleted_row_ids = HashSet::::from_iter( - deleted_row_ids.values().iter().copied(), - ); - for batch in batches.iter_mut() { - let row_ids = batch[ROW_ID].as_primitive::(); - let mask = - BooleanArray::from_iter(row_ids.iter().map(|row_id| { - row_id.map(|row_id| !deleted_row_ids.contains(&row_id)) - })); - *batch = arrow::compute::filter_record_batch(batch, &mask)?; + spawn_cpu(move || { + if let Some((assign_batch, deleted_row_ids)) = assign_batch { + if !deleted_row_ids.is_empty() { + let deleted_row_ids = HashSet::::from_iter( + deleted_row_ids.values().iter().copied(), + ); + for batch in batches.iter_mut() { + let row_ids = batch[ROW_ID].as_primitive::(); + let mask = + BooleanArray::from_iter(row_ids.iter().map(|row_id| { + row_id.map(|row_id| { + !deleted_row_ids.contains(&row_id) + }) + })); + *batch = arrow::compute::filter_record_batch(batch, &mask)?; + } } - } - if assign_batch.num_rows() > 0 { - // Drop PART_ID column from assign_batch to match schema of existing batches - let assign_batch = assign_batch.drop_column(PART_ID_COLUMN)?; - batches.push(assign_batch); + if assign_batch.num_rows() > 0 { + // Drop PART_ID column from assign_batch to match schema of existing batches + let assign_batch = assign_batch.drop_column(PART_ID_COLUMN)?; + batches.push(assign_batch); + } } - } - let num_rows = batches.iter().map(|b| b.num_rows()).sum::(); - if num_rows == 0 { - return Ok(None); - } + let num_rows = batches.iter().map(|b| b.num_rows()).sum::(); + if num_rows == 0 { + return Ok(None); + } - let (storage, sub_index) = Self::build_index( - distance_type, - quantizer, - sub_index_params, - batches, - column, - frag_reuse_index, - )?; - Ok(Some((storage, sub_index, loss))) + let (storage, sub_index) = Self::build_index( + distance_type, + quantizer, + sub_index_params, + batches, + column, + frag_reuse_index, + )?; + Ok(Some((storage, sub_index, loss))) + }) + .await } }); Ok(stream::iter(build_iter)