diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md index abf0f5e9b0f..9c75c5ceb88 100644 --- a/docs/src/guide/distributed_indexing.md +++ b/docs/src/guide/distributed_indexing.md @@ -111,6 +111,8 @@ At that point the caller has two execution choices: After the physical segments are built, publish them with `commit_existing_index_segments(...)`. +Within a single commit, built segments must have disjoint fragment coverage. + ## Internal Segmented Finalize Model Internally, Lance models distributed vector segment build as: @@ -155,5 +157,10 @@ Lance is responsible for: - merging segment storage into physical segment artifacts - committing physical segments into the manifest +If a staging root or built segment directory is never committed, it remains an +unreferenced index directory under `_indices/`. These artifacts are cleaned up +by `cleanup_old_versions(...)` using the same age-based rules as other +unreferenced index files. + This split keeps distributed scheduling outside the storage engine while still letting Lance own the on-disk index format. 
diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index f5162bba191..bb7e6fe2ff2 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -1182,6 +1182,7 @@ mod tests { use crate::blob::{BlobArrayBuilder, blob_field}; use crate::index::DatasetIndexExt; use crate::{ + dataset::transaction::{Operation, Transaction}, dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder}, index::vector::VectorIndexParams, }; @@ -1202,6 +1203,7 @@ mod tests { use lance_table::io::commit::RenameCommitHandler; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector, some_batch}; use mock_instant::thread_local::MockClock; + use uuid::Uuid; #[derive(Debug)] struct MockObjectStore { @@ -1530,6 +1532,59 @@ mod tests { } } + async fn write_dummy_index_artifact(dataset: &Dataset, uuid: Uuid) -> Result<()> { + let index_dir = dataset.indices_dir().child(uuid.to_string()); + dataset + .object_store() + .put(&index_dir.child("index.idx"), b"idx") + .await?; + dataset + .object_store() + .put(&index_dir.child("auxiliary.idx"), b"aux") + .await?; + Ok(()) + } + + async fn write_dummy_staging_partial( + dataset: &Dataset, + staging_uuid: Uuid, + shard_uuid: Uuid, + ) -> Result<()> { + let shard_dir = dataset + .indices_dir() + .child(staging_uuid.to_string()) + .child(format!("partial_{}", shard_uuid)); + dataset + .object_store() + .put(&shard_dir.child("index.idx"), b"idx") + .await?; + dataset + .object_store() + .put(&shard_dir.child("auxiliary.idx"), b"aux") + .await?; + Ok(()) + } + + fn dummy_index_metadata( + dataset: &Dataset, + field_id: i32, + uuid: Uuid, + fragment_bitmap: impl IntoIterator, + ) -> IndexMetadata { + IndexMetadata { + uuid, + name: "some_index".to_string(), + fields: vec![field_id], + dataset_version: dataset.version().version, + fragment_bitmap: Some(fragment_bitmap.into_iter().collect()), + index_details: None, + index_version: IndexType::Vector.version(), + 
created_at: None, + base_id: None, + files: None, + } + } + fn blob_v2_batch(blob_len: usize) -> Box { let mut blobs = BlobArrayBuilder::new(1); blobs.push_bytes(vec![0u8; blob_len]).unwrap(); @@ -2182,6 +2237,149 @@ mod tests { assert_eq!(before_count, after_count); } + #[tokio::test] + async fn cleanup_old_replaced_segment_keeps_still_referenced_segments() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + + let mut dataset = fixture.open().await.unwrap(); + let field_id = dataset.schema().field("indexable").unwrap().id; + + let seg_a = Uuid::new_v4(); + let seg_b = Uuid::new_v4(); + write_dummy_index_artifact(&dataset, seg_a).await.unwrap(); + write_dummy_index_artifact(&dataset, seg_b).await.unwrap(); + + let index_a = dummy_index_metadata(&dataset, field_id, seg_a, [0_u32]); + let index_b = dummy_index_metadata(&dataset, field_id, seg_b, [1_u32]); + let initial_tx = Transaction::new( + dataset.manifest.version, + Operation::CreateIndex { + new_indices: vec![index_a.clone(), index_b.clone()], + removed_indices: vec![], + }, + None, + ); + dataset + .apply_commit(initial_tx, &Default::default(), &Default::default()) + .await + .unwrap(); + + MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap()); + + let seg_c = Uuid::new_v4(); + write_dummy_index_artifact(&dataset, seg_c).await.unwrap(); + let index_c = dummy_index_metadata(&dataset, field_id, seg_c, [2_u32]); + let replace_tx = Transaction::new( + dataset.manifest.version, + Operation::CreateIndex { + new_indices: vec![index_c.clone()], + removed_indices: vec![index_a.clone()], + }, + None, + ); + dataset + .apply_commit(replace_tx, &Default::default(), &Default::default()) + .await + .unwrap(); + + let removed = fixture + .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap()) + .await + .unwrap(); + + assert_eq!(removed.index_files_removed, 2); + assert!( + !dataset + .object_store() + .exists( + &dataset + .indices_dir() + 
.child(seg_a.to_string()) + .child("index.idx") + ) + .await + .unwrap() + ); + assert!( + dataset + .object_store() + .exists( + &dataset + .indices_dir() + .child(seg_b.to_string()) + .child("index.idx") + ) + .await + .unwrap() + ); + assert!( + dataset + .object_store() + .exists( + &dataset + .indices_dir() + .child(seg_c.to_string()) + .child("index.idx") + ) + .await + .unwrap() + ); + } + + #[tokio::test] + async fn cleanup_old_uncommitted_index_artifacts() { + let fixture = MockDatasetFixture::try_new().unwrap(); + fixture.create_some_data().await.unwrap(); + + let dataset = fixture.open().await.unwrap(); + let staging_uuid = Uuid::new_v4(); + let shard_uuid = Uuid::new_v4(); + let built_segment_uuid = Uuid::new_v4(); + + write_dummy_staging_partial(&dataset, staging_uuid, shard_uuid) + .await + .unwrap(); + write_dummy_index_artifact(&dataset, built_segment_uuid) + .await + .unwrap(); + + MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap()); + + let removed = fixture + .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap()) + .await + .unwrap(); + + assert_eq!(removed.old_versions, 0); + assert_eq!(removed.index_files_removed, 4); + assert!( + !dataset + .object_store() + .exists( + &dataset + .indices_dir() + .child(staging_uuid.to_string()) + .child(format!("partial_{}", shard_uuid)) + .child("index.idx"), + ) + .await + .unwrap() + ); + assert!( + !dataset + .object_store() + .exists( + &dataset + .indices_dir() + .child(built_segment_uuid.to_string()) + .child("index.idx"), + ) + .await + .unwrap() + ); + } + #[tokio::test] async fn cleanup_failed_commit_data_file() { // We should clean up data files that are written but the commit failed diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index 91c535468e5..fded774b151 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -172,3 +172,121 @@ impl LanceIndexStoreExt for LanceIndexStore { 
Ok(store.with_file_sizes(index.file_size_map())) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::dataset::WriteParams; + use crate::index::vector::VectorIndexParams; + use crate::index::{DatasetIndexExt, IndexSegment}; + use lance_datagen::{BatchCount, RowCount, array}; + use lance_index::IndexType; + use lance_linalg::distance::MetricType; + use uuid::Uuid; + + #[tokio::test] + async fn test_remapper_only_touches_segments_with_affected_fragments() { + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(16.into()), + ) + .into_reader_rows(RowCount::from(40), BatchCount::from(2)); + + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 20, + max_rows_per_group: 20, + ..Default::default() + }), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!( + fragments.len() >= 2, + "expected at least two fragments for this test" + ); + let target_fragments = fragments.iter().take(2).collect::>(); + + let params = VectorIndexParams::ivf_flat(2, MetricType::L2); + let first_segment_uuid = Uuid::new_v4(); + let second_segment_uuid = Uuid::new_v4(); + let built_index = dataset + .create_index_builder(&["vector"], IndexType::Vector, &params) + .name("vector_idx".to_string()) + .index_uuid(first_segment_uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + let first_segment_dir = dataset.indices_dir().child(first_segment_uuid.to_string()); + let second_segment_dir = dataset.indices_dir().child(second_segment_uuid.to_string()); + for file_name in ["index.idx", "auxiliary.idx"] { + dataset + .object_store() + .copy( + &first_segment_dir.child(file_name), + &second_segment_dir.child(file_name), + ) + .await + .unwrap(); + } + + let segments = vec![ + IndexSegment::new( + first_segment_uuid, 
[target_fragments[0].id() as u32], + built_index.index_details.clone().unwrap(), + built_index.index_version, + ), + IndexSegment::new( + second_segment_uuid, + [target_fragments[1].id() as u32], + built_index.index_details.clone().unwrap(), + built_index.index_version, + ), + ]; + + dataset + .commit_existing_index_segments("vector_idx", "vector", segments) + .await + .unwrap(); + let committed = dataset.load_indices_by_name("vector_idx").await.unwrap(); + let committed_ids = committed + .iter() + .map(|segment| segment.uuid) + .collect::>(); + let unaffected_segment_id = committed + .iter() + .find(|segment| { + segment + .fragment_bitmap + .as_ref() + .is_some_and(|bitmap| bitmap.contains(target_fragments[1].id() as u32)) + }) + .map(|segment| segment.uuid) + .expect("expected one committed segment to cover the unaffected fragment"); + + let remapper = DatasetIndexRemapperOptions::default() + .create_remapper(&dataset) + .unwrap(); + let remapped = remapper + .remap_indices(HashMap::new(), &[target_fragments[0].id() as u64]) + .await + .unwrap(); + + assert_eq!(remapped.len(), 1); + assert!(committed_ids.contains(&remapped[0].old_id)); + assert_ne!(remapped[0].old_id, unaffected_segment_id); + assert_ne!(remapped[0].new_id, unaffected_segment_id); + } +} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index c18cce66c6e..6a88441029e 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -98,6 +98,7 @@ fn validate_index_segments(index_name: &str, segments: &[IndexSegment]) -> Resul } let mut seen_segment_ids = HashSet::with_capacity(segments.len()); + let mut covered_fragments = RoaringBitmap::new(); for segment in segments { if !seen_segment_ids.insert(segment.uuid()) { return Err(Error::invalid_input(format!( @@ -106,6 +107,13 @@ fn validate_index_segments(index_name: &str, segments: &[IndexSegment]) -> Resul index_name ))); } + if !covered_fragments.is_disjoint(segment.fragment_bitmap()) { + return Err(Error::invalid_input(format!( 
+ "CreateIndex: overlapping fragment coverage in segment set for index '{}'", + index_name + ))); + } + covered_fragments |= segment.fragment_bitmap().clone(); } Ok(()) @@ -5388,6 +5396,47 @@ mod tests { assert!(err.to_string().contains("at least one index segment")); } + #[tokio::test] + async fn test_commit_existing_index_segments_rejects_overlapping_fragment_coverage() { + use lance_datagen::{BatchCount, RowCount, array}; + + let test_dir = tempfile::tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let reader = lance_datagen::gen_batch() + .col("id", array::step::()) + .col( + "vector", + array::rand_vec::(8.into()), + ) + .into_reader_rows(RowCount::from(20), BatchCount::from(2)); + + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + + let err = dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![ + IndexSegment::new( + Uuid::new_v4(), + [0_u32, 1_u32], + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ), + IndexSegment::new( + Uuid::new_v4(), + [1_u32], + Arc::new(vector_index_details()), + IndexType::Vector.version(), + ), + ], + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("overlapping fragment coverage")); + } + #[tokio::test] async fn test_resolve_index_column_error_cases() { use lance_datagen::{BatchCount, RowCount, array}; diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 8d3b50d58c6..01577b11936 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -1967,6 +1967,23 @@ mod tests { .unwrap(); assert!(grouped_plan.len() < shard_count); assert!(grouped_plan.iter().any(|plan| plan.segments().len() > 1)); + let mut expected_fragment_coverage = grouped_plan + .iter() + .map(|plan| { + plan.segments() + .iter() + .flat_map(|partial| { + partial + .fragment_bitmap + .as_ref() + .expect("partial shard should have fragment coverage") + .iter() + }) + .sorted() + 
.collect::>() + }) + .collect::>(); + expected_fragment_coverage.sort(); let grouped_segments = build_distributed_segments( &mut ds_split, @@ -1976,6 +1993,15 @@ mod tests { ) .await; assert_eq!(grouped_segments.len(), grouped_plan.len()); + let mut actual_fragment_coverage = grouped_segments + .iter() + .map(|segment| segment.fragment_bitmap().iter().collect::>()) + .collect::>(); + actual_fragment_coverage.sort(); + assert_eq!( + actual_fragment_coverage, expected_fragment_coverage, + "built segment coverage should equal the union of its source partial shards", + ); async fn collect_row_ids(ds: &Dataset, queries: &[Arc]) -> Vec> { let mut ids_per_query = Vec::with_capacity(queries.len()); @@ -2020,7 +2046,21 @@ mod tests { let ids_single = collect_row_ids(&ds_single, &queries).await; let ids_split = collect_row_ids(&ds_split, &queries).await; - assert_eq!(ids_single, ids_split); + if matches!(index_type, IndexType::IvfSq) { + for (single, split) in ids_single.iter().zip(ids_split.iter()) { + assert_eq!(single.len(), split.len()); + let overlap = single + .iter() + .filter(|row_id| split.contains(row_id)) + .count(); + assert!( + overlap >= K / 3, + "single vs segmented distributed SQ index returned too little top-k overlap", + ); + } + } else { + assert_eq!(ids_single, ids_split); + } } #[tokio::test]