diff --git a/rust/lance-index/benches/inverted.rs b/rust/lance-index/benches/inverted.rs index 02746339bf2..bce3fac4414 100644 --- a/rust/lance-index/benches/inverted.rs +++ b/rust/lance-index/benches/inverted.rs @@ -98,7 +98,7 @@ fn bench_inverted(c: &mut Criterion) { InvertedIndexBuilder::new(InvertedIndexParams::default().with_position(false)); black_box({ builder - .update(stream, indexing_store.as_ref()) + .update(stream, indexing_store.as_ref(), None) .await .unwrap(); builder @@ -119,7 +119,7 @@ fn bench_inverted(c: &mut Criterion) { InvertedIndexBuilder::new(InvertedIndexParams::default().with_position(true)); black_box({ builder - .update(stream, indexing_with_positions_store.as_ref()) + .update(stream, indexing_with_positions_store.as_ref(), None) .await .unwrap(); builder @@ -135,7 +135,7 @@ fn bench_inverted(c: &mut Criterion) { let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default().with_position(true)); builder - .update(stream, phrase_search_store.as_ref()) + .update(stream, phrase_search_store.as_ref(), None) .await .unwrap(); }); diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 050ea590330..18754544cca 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -836,10 +836,13 @@ pub struct UpdateCriteria { /// - stable row IDs: use exact row-id membership instead #[derive(Debug, Clone)] pub enum OldIndexDataFilter { - /// Keep old rows whose row-address fragment is in this bitmap. + /// Keeps track of which fragments are still valid and which are no longer valid. /// /// This is valid for address-style row IDs. - Fragments(RoaringBitmap), + Fragments { + to_keep: RoaringBitmap, + to_remove: RoaringBitmap, + }, /// Keep old rows whose row IDs are in this exact allow-list. /// /// This is required for stable row IDs, where row IDs are opaque and @@ -851,9 +854,9 @@ impl OldIndexDataFilter { /// Build a boolean mask that keeps only row IDs selected by this filter. 
pub fn filter_row_ids(&self, row_ids: &UInt64Array) -> BooleanArray { match self { - Self::Fragments(valid_fragments) => row_ids + Self::Fragments { to_keep, .. } => row_ids .iter() - .map(|id| id.map(|id| valid_fragments.contains((id >> 32) as u32))) + .map(|id| id.map(|id| to_keep.contains((id >> 32) as u32))) .collect(), Self::RowIds(valid_row_ids) => row_ids .iter() diff --git a/rust/lance-index/src/scalar/inverted.rs b/rust/lance-index/src/scalar/inverted.rs index 8435e1154e1..725b06ebde3 100644 --- a/rust/lance-index/src/scalar/inverted.rs +++ b/rust/lance-index/src/scalar/inverted.rs @@ -65,7 +65,7 @@ impl InvertedIndexPlugin { let mut inverted_index = InvertedIndexBuilder::new_with_fragment_mask(params, fragment_mask) .with_progress(progress); - inverted_index.update(data, index_store).await?; + inverted_index.update(data, index_store, None).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&details).unwrap(), index_version: current_fts_format_version().index_version(), diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 6eb8575e5ab..f752ef0e68b 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -2,17 +2,17 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use super::{InvertedIndexParams, index::*}; -use crate::scalar::IndexStore; use crate::scalar::inverted::json::JsonTextStream; use crate::scalar::inverted::lance_tokenizer::DocType; use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer; #[cfg(test)] use crate::scalar::lance_format::LanceIndexStore; +use crate::scalar::{IndexStore, OldIndexDataFilter}; use crate::vector::graph::OrderedFloat; use crate::{progress::IndexBuildProgress, progress::noop_progress}; use arrow::array::AsArray; use arrow::datatypes; -use arrow_array::{Array, RecordBatch, UInt64Array}; +use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array}; use 
arrow_schema::{DataType, Field, Schema, SchemaRef}; use bitpacking::{BitPacker, BitPacker4x}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -27,6 +27,7 @@ use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cp use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result}; use lance_io::object_store::ObjectStore; use object_store::path::Path; +use roaring::RoaringBitmap; use smallvec::SmallVec; use std::collections::HashMap; use std::pin::Pin; @@ -125,6 +126,7 @@ pub struct InvertedIndexBuilder { posting_tail_codec: PostingTailCodec, src_store: Option>, progress: Arc, + deleted_fragments: RoaringBitmap, } impl InvertedIndexBuilder { @@ -139,6 +141,7 @@ impl InvertedIndexBuilder { Vec::new(), TokenSetFormat::default(), fragment_mask, + RoaringBitmap::new(), ) } @@ -154,6 +157,7 @@ impl InvertedIndexBuilder { partitions: Vec, token_set_format: TokenSetFormat, fragment_mask: Option, + deleted_fragments: RoaringBitmap, ) -> Self { Self { params, @@ -165,6 +169,7 @@ impl InvertedIndexBuilder { format_version: current_fts_format_version(), posting_tail_codec: current_fts_format_version().posting_tail_codec(), progress: noop_progress(), + deleted_fragments, } } @@ -190,6 +195,7 @@ impl InvertedIndexBuilder { &mut self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, + old_data_filter: Option, ) -> Result<()> { let schema = new_data.schema(); let doc_col = schema.field(0).name(); @@ -208,6 +214,11 @@ impl InvertedIndexBuilder { .stage_start("tokenize_docs", None, "rows") .await?; self.update_index(new_data, dest_store).await?; + + if let Some(OldIndexDataFilter::Fragments { to_remove, .. 
}) = old_data_filter { + self.deleted_fragments.extend(to_remove); + } + self.progress.stage_complete("tokenize_docs").await?; self.write(dest_store).await?; Ok(()) @@ -353,6 +364,11 @@ impl InvertedIndexBuilder { } async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> { + let mut serialized_deleted_fragments = + Vec::with_capacity(self.deleted_fragments.serialized_size()); + self.deleted_fragments + .serialize_into(&mut serialized_deleted_fragments)?; + let mut metadata = HashMap::from_iter(vec![ ("partitions".to_owned(), serde_json::to_string(&partitions)?), ("params".to_owned(), serde_json::to_string(&self.params)?), @@ -365,6 +381,7 @@ impl InvertedIndexBuilder { self.posting_tail_codec.as_str().to_owned(), ), ]); + if self.params.with_position && self.format_version.uses_shared_position_stream() { metadata.insert( POSITIONS_LAYOUT_KEY.to_owned(), @@ -379,9 +396,22 @@ impl InvertedIndexBuilder { .to_owned(), ); } + + let metadata_file_schema = Arc::new(Schema::new(vec![Field::new( + DELETED_FRAGMENTS_COL, + DataType::Binary, + false, + )])); + let deleted_fragments_col = Arc::new(BinaryArray::from(vec![ + serialized_deleted_fragments.as_slice(), + ])) as Arc; + let record_batch = + RecordBatch::try_new(metadata_file_schema.clone(), vec![deleted_fragments_col])?; + let mut writer = dest_store - .new_index_file(METADATA_FILE, Arc::new(Schema::empty())) + .new_index_file(METADATA_FILE, metadata_file_schema) .await?; + writer.write_record_batch(record_batch).await?; writer.finish_with_metadata(metadata).await?; Ok(()) } @@ -1542,6 +1572,8 @@ async fn merge_metadata_files( let mut format_version = None; let mut posting_tail_codec = None; + let mut deleted_fragments = RoaringBitmap::new(); + for file_name in part_metadata_files { let reader = store.open_index_file(file_name).await?; let metadata = &reader.schema().metadata; @@ -1577,6 +1609,20 @@ async fn merge_metadata_files( if posting_tail_codec.is_none() { 
posting_tail_codec = Some(parse_posting_tail_codec(metadata)?); } + + if reader.num_rows() > 0 { + let metadata_batch = reader.read_range(0..1, None).await?; + let deleted_fragments_col = metadata_batch + .column_by_name(DELETED_FRAGMENTS_COL) + .expect_ok()?; + let deleted_fragments_arr = deleted_fragments_col + .as_any() + .downcast_ref::() + .expect_ok()?; + let part_deleted_fragments = + RoaringBitmap::deserialize_from(deleted_fragments_arr.value(0))?; + deleted_fragments.extend(part_deleted_fragments); + } } // Create ID mapping: sorted original IDs -> 0,1,2... @@ -1655,6 +1701,7 @@ async fn merge_metadata_files( remapped_partitions.clone(), token_set_format, None, + deleted_fragments, ) .with_format_version(format_version.unwrap_or(InvertedListFormatVersion::V1)) .with_posting_tail_codec(posting_tail_codec.unwrap_or(PostingTailCodec::Fixed32)); @@ -1931,6 +1978,7 @@ mod tests { partitions.clone(), token_set_format, None, + RoaringBitmap::new(), ); builder.write(dest_store.as_ref()).await?; @@ -1987,6 +2035,7 @@ mod tests { vec![0], TokenSetFormat::default(), None, + RoaringBitmap::new(), ) .with_posting_tail_codec(posting_tail_codec); metadata_writer @@ -2033,6 +2082,7 @@ mod tests { Vec::new(), TokenSetFormat::default(), None, + RoaringBitmap::new(), ) .with_format_version(InvertedListFormatVersion::V2) .with_posting_tail_codec(PostingTailCodec::Fixed32); @@ -2073,7 +2123,7 @@ mod tests { .max_token_length(None); let mut builder = InvertedIndexBuilder::new(params); - builder.update(stream, store.as_ref()).await?; + builder.update(stream, store.as_ref(), None).await?; let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?; assert_eq!(index.partitions.len(), 1); @@ -2166,7 +2216,7 @@ mod tests { let progress = Arc::new(RecordingProgress::default()); let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()) .with_progress(progress.clone()); - builder.update(stream, store.as_ref()).await?; + builder.update(stream, 
store.as_ref(), None).await?; let events = progress.events.lock().await.clone(); let tags = events @@ -2260,7 +2310,7 @@ mod tests { let progress = Arc::new(RecordingProgress::default()); let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()) .with_progress(progress.clone()); - builder.update(stream, store.as_ref()).await?; + builder.update(stream, store.as_ref(), None).await?; let tags = progress .events @@ -2297,7 +2347,7 @@ mod tests { let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default().memory_limit_mb(0)); let err = builder - .update(stream, store.as_ref()) + .update(stream, store.as_ref(), None) .await .expect_err("single doc should exceed zero worker memory limit"); assert!( @@ -2532,4 +2582,287 @@ mod tests { "unexpected error: {result}" ); } + + #[tokio::test] + async fn test_new_index_has_empty_deleted_fragments() { + let index_dir = TempDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + index_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let batch = make_doc_batch("hello world", 0); + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()); + builder.update(stream, store.as_ref(), None).await.unwrap(); + + let index = InvertedIndex::load(store, None, &LanceCache::no_cache()) + .await + .unwrap(); + assert!( + index.deleted_fragments().is_empty(), + "new index should have empty deleted fragments, got {:?}", + index.deleted_fragments() + ); + } + + #[tokio::test] + async fn test_remap_preserves_deleted_fragments() { + let src_dir = TempDir::default(); + let dest_dir = TempDir::default(); + let src_store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + src_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + let dest_store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + 
dest_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + // Build an initial index with some deleted fragments + let batch = make_doc_batch("hello world", 0); + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let initial_deleted = RoaringBitmap::from_iter([5, 10, 42]); + let mut builder = InvertedIndexBuilder::from_existing_index( + InvertedIndexParams::default(), + None, + Vec::new(), + TokenSetFormat::default(), + None, + initial_deleted.clone(), + ); + builder + .update(stream, src_store.as_ref(), None) + .await + .unwrap(); + + // Load it back and confirm the invalidated fragments are set + let index = InvertedIndex::load(src_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.deleted_fragments(), &initial_deleted); + + // Remap the index via the ScalarIndex trait method + use crate::scalar::ScalarIndex; + let mapping = HashMap::from([(0u64, Some(50 << 32))]); + index.remap(&mapping, dest_store.as_ref()).await.unwrap(); + + // Reload from dest and verify deleted fragments are preserved + let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!( + remapped_index.deleted_fragments(), + &initial_deleted, + "remap should preserve deleted fragments" + ); + } + + #[tokio::test] + async fn test_update_grows_deleted_fragments_from_old_data_filter() { + let index_dir = TempDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + index_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + // Build an initial index with no deleted fragments + let batch = make_doc_batch("hello world", 0); + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()); + builder.update(stream, store.as_ref(), 
None).await.unwrap(); + + // Load the index and update it with an old_data_filter that invalidates fragments + let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert!(index.deleted_fragments().is_empty()); + + let update_dir = TempDir::default(); + let update_store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + update_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let batch2 = make_doc_batch("new document", 1 << 32 | 1); + let stream2 = + RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)])); + let stream2 = Box::pin(stream2); + + let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments { + to_keep: RoaringBitmap::from_iter([0]), + to_remove: RoaringBitmap::from_iter([3, 7]), + }); + + // Use ScalarIndex::update trait method + use crate::scalar::ScalarIndex; + index + .update(stream2, update_store.as_ref(), old_data_filter) + .await + .unwrap(); + + let updated_index = + InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!( + updated_index.deleted_fragments(), + &RoaringBitmap::from_iter([3, 7]), + "update should add deleted fragments from old_data_filter" + ); + } + + #[tokio::test] + async fn test_update_accumulates_deleted_fragments() { + let dir1 = TempDir::default(); + let store1 = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + dir1.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + // Build initial index + let batch = make_doc_batch("hello world", 0); + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()); + builder.update(stream, store1.as_ref(), None).await.unwrap(); + + // First update: delete fragments 3 and 7 + let index = InvertedIndex::load(store1.clone(), None, &LanceCache::no_cache()) + .await + 
.unwrap(); + + let dir2 = TempDir::default(); + let store2 = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + dir2.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let batch2 = make_doc_batch("second doc", 1 << 32 | 1); + let stream2 = + RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)])); + let stream2 = Box::pin(stream2); + + use crate::scalar::ScalarIndex; + index + .update( + stream2, + store2.as_ref(), + Some(crate::scalar::OldIndexDataFilter::Fragments { + to_keep: RoaringBitmap::from_iter([0]), + to_remove: RoaringBitmap::from_iter([3, 7]), + }), + ) + .await + .unwrap(); + + // Second update: invalidate additional fragments 12 and 15 + let index2 = InvertedIndex::load(store2.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!( + index2.deleted_fragments(), + &RoaringBitmap::from_iter([3, 7]) + ); + + let dir3 = TempDir::default(); + let store3 = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + dir3.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let batch3 = make_doc_batch("third doc", 2 << 32 | 2); + let stream3 = + RecordBatchStreamAdapter::new(batch3.schema(), stream::iter(vec![Ok(batch3)])); + let stream3 = Box::pin(stream3); + + index2 + .update( + stream3, + store3.as_ref(), + Some(crate::scalar::OldIndexDataFilter::Fragments { + to_keep: RoaringBitmap::from_iter([0, 1]), + to_remove: RoaringBitmap::from_iter([12, 15]), + }), + ) + .await + .unwrap(); + + let index3 = InvertedIndex::load(store3.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!( + index3.deleted_fragments(), + &RoaringBitmap::from_iter([3, 7, 12, 15]), + "deleted fragments should accumulate across updates" + ); + } + + #[tokio::test] + async fn test_update_with_rowid_filter_does_not_grow_deleted_fragments() { + let index_dir = TempDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + index_dir.obj_path(), + 
Arc::new(LanceCache::no_cache()), + )); + + let batch = make_doc_batch("hello world", 0); + let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)])); + let stream = Box::pin(stream); + + let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default()); + builder.update(stream, store.as_ref(), None).await.unwrap(); + + let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + + let update_dir = TempDir::default(); + let update_store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + update_dir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let batch2 = make_doc_batch("new doc", 1); + let stream2 = + RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)])); + let stream2 = Box::pin(stream2); + + // Use RowIds filter instead of Fragments — should not affect deleted_fragments + let mut valid_ids = lance_core::utils::mask::RowAddrTreeMap::new(); + valid_ids.insert(0); + let old_data_filter = Some(crate::scalar::OldIndexDataFilter::RowIds(valid_ids)); + + use crate::scalar::ScalarIndex; + index + .update(stream2, update_store.as_ref(), old_data_filter) + .await + .unwrap(); + + let updated_index = + InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert!( + updated_index.deleted_fragments().is_empty(), + "RowIds filter should not add to deleted fragments" + ); + } } diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 422d154b453..59d258975aa 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -109,6 +109,7 @@ pub const POSTING_TAIL_CODEC_VARINT_DELTA_V1: &str = "varint_delta_v1"; pub const POSITIONS_LAYOUT_SHARED_STREAM_V2: &str = "shared_stream_v2"; pub const POSITIONS_CODEC_VARINT_DOC_DELTA_V2: &str = "varint_doc_delta_v2"; pub const POSITIONS_CODEC_PACKED_DELTA_V1: 
&str = "packed_delta_v1"; +pub const DELETED_FRAGMENTS_COL: &str = "deleted_fragments"; // Just a heuristic when we need to pre-allocate memory for tokens pub const ESTIMATED_MAX_TOKENS_PER_ROW: usize = 4 * 1024; @@ -342,6 +343,9 @@ pub struct InvertedIndex { tokenizer: Box, token_set_format: TokenSetFormat, pub(crate) partitions: Vec>, + // Fragments which are contained in the index, but no longer in the dataset. + // These should be pruned at search time since we don't prune them at update time. + deleted_fragments: RoaringBitmap, } impl Debug for InvertedIndex { @@ -350,6 +354,7 @@ impl Debug for InvertedIndex { .field("params", &self.params) .field("token_set_format", &self.token_set_format) .field("partitions", &self.partitions) + .field("deleted_fragments", &self.deleted_fragments) .finish() } } @@ -399,6 +404,7 @@ impl InvertedIndex { Vec::new(), self.token_set_format, fragment_mask, + self.deleted_fragments.clone(), ) .with_posting_tail_codec(self.posting_tail_codec()) } else { @@ -421,6 +427,7 @@ impl InvertedIndex { partitions, self.token_set_format, fragment_mask, + self.deleted_fragments.clone(), ) .with_format_version(self.format_version()) } @@ -439,6 +446,15 @@ impl InvertedIndex { self.partitions.len() } + /// Returns the set of fragments which are contained in the index, but no longer in the dataset. + /// + /// Most other indices remove data from deleted fragments when the index updates (copy-on-write). + /// However, this would require an expensive copy of the FTS index. Instead, we track the deleted + /// fragments and prune them at search time (merge-on-read). 
+ pub fn deleted_fragments(&self) -> &RoaringBitmap { + &self.deleted_fragments + } + // search the documents that contain the query // return the row ids of the documents sorted by bm25 score // ref: https://en.wikipedia.org/wiki/Okapi_BM25 @@ -607,6 +623,7 @@ impl InvertedIndex { docs, token_set_format: TokenSetFormat::Arrow, })], + deleted_fragments: RoaringBitmap::new(), })) } @@ -648,6 +665,19 @@ impl InvertedIndex { .transpose()? .unwrap_or(TokenSetFormat::Arrow); + // Load deleted_fragments if present (optional for backward compatibility) + let deleted_fragments = if reader.num_rows() > 0 { + let metadata_batch = reader.read_range(0..1, None).await?; + if let Some(col) = metadata_batch.column_by_name(DELETED_FRAGMENTS_COL) { + let arr = col.as_binary_opt::().expect_ok()?; + RoaringBitmap::deserialize_from(arr.value(0))? + } else { + RoaringBitmap::new() + } + } else { + RoaringBitmap::new() + }; + let format = token_set_format; let partitions = partitions.into_iter().map(|id| { let store = store.clone(); @@ -680,6 +710,7 @@ impl InvertedIndex { tokenizer, token_set_format, partitions, + deleted_fragments, })) } Err(_) => { @@ -826,9 +857,11 @@ impl ScalarIndex for InvertedIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, - _old_data_filter: Option, + old_data_filter: Option, ) -> Result { - self.to_builder().update(new_data, dest_store).await?; + self.to_builder() + .update(new_data, dest_store, old_data_filter) + .await?; let details = pbold::InvertedIndexDetails::try_from(&self.params)?; @@ -5070,4 +5103,54 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_modern_index_without_deleted_col_has_empty_bitmap() { + // An index created before the deleted_fragments feature was added + // will have a metadata file with num_rows=0 (no record batch data). + // The load path should gracefully handle this with an empty bitmap. 
+ let tmpdir = TempObjDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default()); + builder.tokens.add("test".to_owned()); + builder.posting_lists.push(PostingListBuilder::new(false)); + builder.posting_lists[0].add(0, PositionRecorder::Count(1)); + builder.docs.append(100, 1); + builder.write(store.as_ref()).await.unwrap(); + + // Write a metadata file WITHOUT the deleted_fragments column + // (simulates an older index version) + let metadata = std::collections::HashMap::from_iter(vec![ + ( + "partitions".to_owned(), + serde_json::to_string(&vec![0u64]).unwrap(), + ), + ( + "params".to_owned(), + serde_json::to_string(&InvertedIndexParams::default()).unwrap(), + ), + ( + TOKEN_SET_FORMAT_KEY.to_owned(), + TokenSetFormat::default().to_string(), + ), + ]); + let mut writer = store + .new_index_file(METADATA_FILE, Arc::new(arrow_schema::Schema::empty())) + .await + .unwrap(); + writer.finish_with_metadata(metadata).await.unwrap(); + + let index = InvertedIndex::load(store, None, &LanceCache::no_cache()) + .await + .unwrap(); + assert!( + index.deleted_fragments().is_empty(), + "index without deleted_fragments column should have empty bitmap" + ); + } } diff --git a/rust/lance-table/src/format/index.rs b/rust/lance-table/src/format/index.rs index 09cf59ea64c..a8f4f3f0fd5 100644 --- a/rust/lance-table/src/format/index.rs +++ b/rust/lance-table/src/format/index.rs @@ -108,6 +108,16 @@ impl IndexMetadata { .as_ref() .map(|files| files.iter().map(|f| f.size_bytes).sum()) } + + /// Returns the set of fragments which are part of the fragment bitmap + /// but no longer in the dataset. 
+ pub fn deleted_fragment_bitmap( + &self, + existing_fragments: &RoaringBitmap, + ) -> Option { + let fragment_bitmap = self.fragment_bitmap.as_ref()?; + Some(fragment_bitmap - existing_fragments) + } } impl DeepSizeOf for IndexMetadata { diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index ef64773cc3c..5990318ef5c 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -1705,3 +1705,369 @@ async fn test_data_replacement_invalidates_index_bitmap() { "Fragment 0 should be removed from index bitmap after DataReplacement on indexed column" ); } +/// Regression test: inverted (FTS) index should not carry stale data after +/// merge_insert + compact + optimize_indices. +/// +/// This is the FTS equivalent of test_merge_insert_with_reordered_columns_and_index. +/// The inverted index's update() ignores the valid_old_fragments filter, so stale +/// posting list entries from pruned fragments survive the merge and cause errors +/// when queries try to resolve the old row addresses. 
+#[tokio::test] +async fn test_fts_index_stale_data_after_merge_insert_compact_optimize() { + use lance_index::scalar::{FullTextSearchQuery, inverted::InvertedIndexParams}; + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("text", DataType::Utf8, true), + ])); + + // Step 1: Create dataset with 2 rows in separate fragments + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(StringArray::from(vec![ + "the quick brown fox", + "the lazy dog", + ])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + "memory://test_fts_stale", + Some(WriteParams { + max_rows_per_file: 1, // Force 2 fragments + ..Default::default() + }), + ) + .await + .unwrap(); + + // Step 2: Create FTS inverted index on 'text' + let params = InvertedIndexParams::default(); + dataset + .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + + // Sanity check: searching "quick" should return 1 result + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // Step 3: merge_insert with reversed column order (text, id) + // This triggers the RewriteColumns/DataReplacement path, which prunes the + // index fragment bitmap for the 'text' column. 
+ let reversed_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("text", DataType::Utf8, true), + ArrowField::new("id", DataType::Int32, false), + ])); + let source_batch = RecordBatch::try_new( + reversed_schema.clone(), + vec![ + Arc::new(StringArray::from(vec![ + "updated fox text", + "new entry here", + ])), + Arc::new(Int32Array::from(vec![1, 2])), + ], + ) + .unwrap(); + + let merge_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let reader = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch)], + reversed_schema.clone(), + )); + let (dataset, _stats) = merge_job.execute(reader_to_stream(reader)).await.unwrap(); + let mut dataset = dataset.as_ref().clone(); + + // Step 4: compact_files — moves rows to new fragment(s) + compact_files(&mut dataset, CompactionOptions::default(), None) + .await + .unwrap(); + + // Step 5: optimize_indices — should rebuild the FTS index without stale data. + // With the current bug, the inverted index ignores valid_old_fragments and + // merges stale posting list entries pointing at now-deleted fragments. + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + // Step 6: FTS search should not error and should return correct results. + // "quick" appeared in the original data for id=0 (never updated), so it + // should still be found. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 1, + "Expected 1 result for 'quick' after optimize, got {}", + results.num_rows() + ); + + // "lazy" was in the original text for id=1, but id=1 was updated to + // "updated fox text". The old posting for "lazy" should have been filtered + // out during the index update. 
+ let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("lazy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 0, + "Expected 0 results for 'lazy' (stale data should be filtered), got {}", + results.num_rows() + ); + + // "updated" should be found (new text for id=1) + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("updated".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // "entry" should be found (new row id=2) + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("entry".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // Step 7: Another merge_insert should NOT error + let source_batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["final text"])), + ], + ) + .unwrap(); + + let merge_job2 = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap(); + + let reader2 = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch2)], + schema.clone(), + )); + let (final_dataset, _) = merge_job2.execute(reader_to_stream(reader2)).await.unwrap(); + final_dataset.validate().await.unwrap(); +} + +/// Regression test: when rows are updated in-place, the FTS index must +/// invalidate old entries and allow re-indexing incrementally. +/// +/// Sequence: +/// 1. Write fragments 1 and 2. +/// 2. Create FTS index covering fragments 1 and 2. +/// 3. Update fragment 1 in-place via merge_insert (DataReplacement path). +/// This removes fragment 1 from the index's fragment_bitmap. +/// 4. Call optimize_indices (append) to create a new index segment covering +/// the updated fragment 1. +/// 5. 
Call optimize_indices (merge) to merge both segments. The first segment +/// contains the old, invalidated values for fragment 1; the second segment +/// contains the new, valid values. We must keep only the new values. +#[tokio::test] +async fn test_fts_index_incremental_reindex_after_in_place_update() { + use lance_index::scalar::{FullTextSearchQuery, inverted::InvertedIndexParams}; + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("text", DataType::Utf8, true), + ])); + + // Step 1: Create dataset with 2 rows in separate fragments + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(StringArray::from(vec![ + "the quick brown fox", + "the lazy dog", + ])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + "memory://test_fts_incremental_reindex", + Some(WriteParams { + max_rows_per_file: 1, // Force 2 fragments + ..Default::default() + }), + ) + .await + .unwrap(); + + // Step 2: Create FTS inverted index on 'text' + let params = InvertedIndexParams::default(); + dataset + .create_index(&["text"], IndexType::Inverted, None, ¶ms, true) + .await + .unwrap(); + + // Sanity check: "quick" and "lazy" should each return 1 result + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("lazy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(results.num_rows(), 1); + + // Step 3: merge_insert with reversed column order to trigger + // RewriteColumns/DataReplacement path, which prunes the index + // fragment bitmap for the updated fragment. 
+ // Update id=1 ("the lazy dog" -> "a speedy cat") + let reversed_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("text", DataType::Utf8, true), + ArrowField::new("id", DataType::Int32, false), + ])); + let source_batch = RecordBatch::try_new( + reversed_schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a speedy cat"])), + Arc::new(Int32Array::from(vec![1])), + ], + ) + .unwrap(); + + let merge_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap(); + + let reader = Box::new(RecordBatchIterator::new( + vec![Ok(source_batch)], + reversed_schema.clone(), + )); + let (dataset, _stats) = merge_job.execute(reader_to_stream(reader)).await.unwrap(); + let mut dataset = dataset.as_ref().clone(); + + // Step 4: First optimize_indices (append) — creates a new index segment + // covering the updated (previously unindexed) fragment. + dataset + .optimize_indices(&OptimizeOptions::append()) + .await + .unwrap(); + + // At this point we have two index segments: + // - Segment 1: original index (has old data for fragment with id=1) + // - Segment 2: new delta index (has new data for the updated fragment) + + // Step 5: Second optimize_indices (merge all) — merges both segments. + // The merge must discard old invalidated entries from segment 1 for + // the updated fragment and keep only the new entries from segment 2. + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + // Step 6: Verify search correctness after merge. + + // "quick" was in the original data for id=0 (not updated), should still be found. 
+ let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("quick".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 1, + "Expected 1 result for 'quick' (id=0 was not updated), got {}", + results.num_rows() + ); + + // "lazy" was in the old text for id=1 which was updated to "a speedy cat". + // The old posting for "lazy" must have been filtered out during the merge. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("lazy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 0, + "Expected 0 results for 'lazy' (stale data should be filtered), got {}", + results.num_rows() + ); + + // "speedy" is in the new text for id=1, should be found. + let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("speedy".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 1, + "Expected 1 result for 'speedy' (new text for id=1), got {}", + results.num_rows() + ); + + // "cat" is in the new text for id=1, should be found. 
+ let results = dataset + .scan() + .full_text_search(FullTextSearchQuery::new("cat".to_owned())) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!( + results.num_rows(), + 1, + "Expected 1 result for 'cat' (new text for id=1), got {}", + results.num_rows() + ); +} diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index b9944b9e656..01cc6ae0321 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -166,6 +166,13 @@ pub async fn merge_indices_with_unindexed_frags<'a>( acc |= &b; acc }); + let deleted_old_frags: RoaringBitmap = old_indices + .iter() + .filter_map(|idx| idx.deleted_fragment_bitmap(&dataset.fragment_bitmap)) + .fold(RoaringBitmap::new(), |mut acc, b| { + acc |= &b; + acc + }); frag_bitmap |= &effective_old_frags; let index = dataset @@ -223,7 +230,10 @@ pub async fn merge_indices_with_unindexed_frags<'a>( } else { // Address-style row IDs encode fragment_id in high 32 bits. // Fragment bitmap filtering is valid and cheaper in this mode. - Some(OldIndexDataFilter::Fragments(effective_old_frags)) + Some(OldIndexDataFilter::Fragments { + to_keep: effective_old_frags, + to_remove: deleted_old_frags, + }) }; index .update(new_data_stream, &new_store, old_data_filter) diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index a86660acfa0..de1d97e1e31 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -49,6 +49,9 @@ pub struct DatasetPreFilter { // these tasks only when we've done as much work as we can without them. pub(super) deleted_ids: Option>>>, pub(super) filtered_ids: Option>>, + // Fragment IDs whose data is still in the index but has been removed from the dataset. + // Used by FTS merge-on-read to prune stale fragments at search time. 
+ pub(super) deleted_fragments: Option<RoaringBitmap>, // When the tasks are finished this is the combined filter pub(super) final_mask: Mutex<OnceCell<Arc<RowAddrMask>>>, } @@ -74,6 +77,7 @@ impl DatasetPreFilter { Self { deleted_ids, filtered_ids, + deleted_fragments: None, final_mask: Mutex::new(OnceCell::new()), } } @@ -174,6 +178,14 @@ impl DatasetPreFilter { .await } + /// Sets the deleted fragment IDs to block during search. + /// + /// Used by FTS indices which track fragments that have been removed from the + /// dataset but whose data is still present in the index (merge-on-read). + pub fn set_deleted_fragments(&mut self, fragments: RoaringBitmap) { + self.deleted_fragments = Some(fragments); + } + /// Creates a task to load mask to filter out deleted rows. /// /// Sometimes this will be a block list of row ids that are deleted, based @@ -244,6 +256,13 @@ impl PreFilter for DatasetPreFilter { if let Some(deleted_ids) = &self.deleted_ids { combined = combined & (*deleted_ids.get_ready()).clone(); } + if let Some(deleted) = &self.deleted_fragments { + let mut block_list = RowAddrTreeMap::new(); + for frag_id in deleted.iter() { + block_list.insert_fragment(frag_id); + } + combined = combined & RowAddrMask::from_block(block_list); + } Arc::new(combined) }); @@ -251,7 +270,9 @@ } fn is_empty(&self) -> bool { - self.deleted_ids.is_none() && self.filtered_ids.is_none() + self.deleted_ids.is_none() + && self.filtered_ids.is_none() + && self.deleted_fragments.is_none() } /// Get the row id mask for this prefilter diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index bf4f122be4b..facfd1f0cb3 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -278,6 +278,7 @@ mod test { Arc::new(DatasetPreFilter { deleted_ids: None, filtered_ids: None, + deleted_fragments: None, final_mask: Mutex::new(OnceCell::new()), }), &NoOpMetricsCollector, diff --git 
a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 6e129841c76..9de276c64ef 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -242,7 +242,7 @@ impl ExecutionPlan for MatchQueryExec { .open_generic_index(&column, &uuid, &metrics.index_metrics) .await?; - let pre_filter = build_prefilter( + let mut pre_filter = build_prefilter( context.clone(), partition, &prefilter_source, @@ -259,6 +259,11 @@ impl ExecutionPlan for MatchQueryExec { column, )) })?; + if !inverted_idx.deleted_fragments().is_empty() { + Arc::get_mut(&mut pre_filter) + .expect("prefilter just created") + .set_deleted_fragments(inverted_idx.deleted_fragments().clone()); + } metrics.record_parts_searched(inverted_idx.partition_count()); let is_fuzzy = matches!(query.fuzziness, Some(n) if n != 0); @@ -888,7 +893,7 @@ impl ExecutionPlan for PhraseQueryExec { .open_generic_index(&column, &uuid, &metrics.index_metrics) .await?; - let pre_filter = build_prefilter( + let mut pre_filter = build_prefilter( context.clone(), partition, &prefilter_source, @@ -905,6 +910,11 @@ impl ExecutionPlan for PhraseQueryExec { column, )) })?; + if !index.deleted_fragments().is_empty() { + Arc::get_mut(&mut pre_filter) + .expect("prefilter just created") + .set_deleted_fragments(index.deleted_fragments().clone()); + } metrics.record_parts_searched(index.partition_count()); let mut tokenizer = index.tokenizer();