diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs
index 5f32a73675c..e706ee1ab2f 100644
--- a/rust/lance-index/src/lib.rs
+++ b/rust/lance-index/src/lib.rs
@@ -33,7 +33,7 @@ pub mod types;
 pub mod vector;
 
 pub use crate::traits::*;
-pub use crate::types::IndexSegment;
+pub use crate::types::{IndexSegment, StagingIndexShard, VectorIndexSegmentPlan};
 
 pub const INDEX_FILE_NAME: &str = "index.idx";
 /// The name of the auxiliary index file.
diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs
index 5ad13861ddc..514873eb02a 100644
--- a/rust/lance-index/src/traits.rs
+++ b/rust/lance-index/src/traits.rs
@@ -126,6 +126,9 @@ pub trait IndexDescription: Send + Sync {
 
 #[async_trait]
 pub trait DatasetIndexExt {
     type IndexBuilder<'a>
+    where
+        Self: 'a;
+    type IndexSegmentBuilder<'a>
     where
         Self: 'a;
 
@@ -145,6 +148,21 @@ pub trait DatasetIndexExt {
         params: &'a dyn IndexParams,
     ) -> Self::IndexBuilder<'a>;
 
+    /// Create a builder for materializing final index segments from staging shards.
+    ///
+    /// The staging UUID identifies a directory containing previously-built shard
+    /// outputs. The caller must provide the shard contract with
+    /// [`crate::StagingIndexShard`] so the planner knows which fragments each
+    /// shard covers.
+    ///
+    /// This is the canonical entry point for distributed vector index finalize.
+    /// After materializing the final physical segments, publish them as a
+    /// logical index with [`Self::commit_existing_index_segments`].
+    fn create_index_segment_builder<'a>(
+        &'a self,
+        staging_index_uuid: String,
+    ) -> Self::IndexSegmentBuilder<'a>;
+
     /// Create indices on columns.
     ///
     /// Upon finish, a new dataset version is generated.
@@ -275,6 +293,11 @@ pub trait DatasetIndexExt {
     async fn index_statistics(&self, index_name: &str) -> Result<String>;
 
     /// Commit one or more existing physical index segments as a logical index.
+    ///
+    /// This publishes already-materialized physical segments. It does not build
+    /// or merge index data; callers should first materialize final segments with
+    /// [`Self::create_index_segment_builder`] or another index-specific build
+    /// path and then pass the resulting segments here.
     async fn commit_existing_index_segments(
         &mut self,
         index_name: &str,
diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs
index 6a991be9932..6af2f5c7f7e 100644
--- a/rust/lance-index/src/types.rs
+++ b/rust/lance-index/src/types.rs
@@ -3,6 +3,7 @@
 
 use std::sync::Arc;
 
+use crate::IndexType;
 use roaring::RoaringBitmap;
 use uuid::Uuid;
 
@@ -73,3 +74,100 @@ impl IndexSegment {
         )
     }
 }
+
+/// Coordinator-provided metadata for one staging shard.
+///
+/// During distributed index build, each worker writes one shard under a shared
+/// staging root. The coordinator is responsible for tracking which fragments
+/// were assigned to each shard and the approximate shard size used for segment
+/// grouping.
+#[derive(Debug, Clone, PartialEq)]
+pub struct StagingIndexShard {
+    uuid: Uuid,
+    fragment_bitmap: RoaringBitmap,
+    estimated_bytes: u64,
+}
+
+impl StagingIndexShard {
+    /// Create metadata for one staging shard.
+    pub fn new<I>(uuid: Uuid, fragment_bitmap: I, estimated_bytes: u64) -> Self
+    where
+        I: IntoIterator<Item = u32>,
+    {
+        Self {
+            uuid,
+            fragment_bitmap: fragment_bitmap.into_iter().collect(),
+            estimated_bytes,
+        }
+    }
+
+    /// Return the shard UUID.
+    pub fn uuid(&self) -> Uuid {
+        self.uuid
+    }
+
+    /// Return the fragment coverage of this shard.
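+    /// Each set bit is the id of one dataset fragment that the coordinator
+    /// assigned to this shard.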
+    pub fn fragment_bitmap(&self) -> &RoaringBitmap {
+        &self.fragment_bitmap
+    }
+
+    /// Return the approximate number of bytes represented by this shard.
+    pub fn estimated_bytes(&self) -> u64 {
+        self.estimated_bytes
+    }
+}
+
+/// A plan for materializing one final physical segment from one or more
+/// vector index staging shard outputs.
+#[derive(Debug, Clone, PartialEq)]
+pub struct VectorIndexSegmentPlan {
+    staging_index_uuid: Uuid,
+    final_segment: IndexSegment,
+    partial_shards: Vec<StagingIndexShard>,
+    estimated_bytes: u64,
+    requested_index_type: Option<IndexType>,
+}
+
+impl VectorIndexSegmentPlan {
+    /// Create a plan for one final segment.
+    pub fn new(
+        staging_index_uuid: Uuid,
+        final_segment: IndexSegment,
+        partial_shards: Vec<StagingIndexShard>,
+        estimated_bytes: u64,
+        requested_index_type: Option<IndexType>,
+    ) -> Self {
+        Self {
+            staging_index_uuid,
+            final_segment,
+            partial_shards,
+            estimated_bytes,
+            requested_index_type,
+        }
+    }
+
+    /// Return the staging index UUID that owns the partial shard outputs.
+    pub fn staging_index_uuid(&self) -> Uuid {
+        self.staging_index_uuid
+    }
+
+    /// Return the final segment metadata that should be committed.
+    pub fn final_segment(&self) -> &IndexSegment {
+        &self.final_segment
+    }
+
+    /// Return the shard metadata that should be combined into the final segment.
+    pub fn partial_shards(&self) -> &[StagingIndexShard] {
+        &self.partial_shards
+    }
+
+    /// Return the estimated number of bytes covered by this plan.
+    pub fn estimated_bytes(&self) -> u64 {
+        self.estimated_bytes
+    }
+
+    /// Return the requested logical index type, if one was supplied to the planner.
+    pub fn requested_index_type(&self) -> Option<IndexType> {
+        self.requested_index_type
+    }
+}
diff --git a/rust/lance-index/src/vector/distributed/index_merger.rs b/rust/lance-index/src/vector/distributed/index_merger.rs
index 0632a693342..a0a7f4d93e7 100755
--- a/rust/lance-index/src/vector/distributed/index_merger.rs
+++ b/rust/lance-index/src/vector/distributed/index_merger.rs
@@ -608,47 +608,21 @@ async fn read_shard_window_partitions(
 /// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, IVF_HNSW_SQ storage types.
 /// For PQ and SQ, this assumes all partial indices share the same quantizer/codebook
 /// and distance type; it will reuse the first encountered metadata.
+/// Merge the selected partial shard auxiliary files into `target_dir`.
+///
+/// Callers provide the exact shard auxiliary files that should participate in
+/// the merge, which lets the segmented finalize path read directly from the
+/// original staging root without first restaging `partial_*` directories under
+/// a new target.
 pub async fn merge_partial_vector_auxiliary_files(
     object_store: &lance_io::object_store::ObjectStore,
-    index_dir: &object_store::path::Path,
+    aux_paths: &[object_store::path::Path],
+    target_dir: &object_store::path::Path,
 ) -> Result<()> {
-    let mut aux_paths: Vec<object_store::path::Path> = Vec::new();
-    let mut stream = object_store.list(Some(index_dir.clone()));
-    while let Some(item) = stream.next().await {
-        if let Ok(meta) = item
-            && let Some(fname) = meta.location.filename()
-            && fname == INDEX_AUXILIARY_FILE_NAME
-        {
-            // Check parent dir name starts with partial_
-            let parts: Vec<_> = meta.location.parts().collect();
-            if parts.len() >= 2 {
-                let pname = parts[parts.len() - 2].as_ref();
-                if pname.starts_with("partial_") {
-                    aux_paths.push(meta.location.clone());
-                }
-            }
-        }
-    }
     if aux_paths.is_empty() {
-        // If a unified auxiliary file already exists at the root, no merge is required.
- let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); - if object_store.exists(&aux_out).await.unwrap_or(false) { - log::warn!( - "No partial_* auxiliary files found under index dir: {}, but unified auxiliary file already exists; skipping merge", - index_dir - ); - return Ok(()); - } - // For certain index types (e.g., FLAT/HNSW-only) the merge may be a no-op in distributed setups - // where shards were committed directly. In such cases, proceed without error to avoid blocking - // index manifest merge. PQ/SQ variants still require merging artifacts and will be handled by - // downstream open logic if missing. - log::warn!( - "No partial_* auxiliary files found under index dir: {}; proceeding without merge for index types that do not require auxiliary shards", - index_dir - ); - return Ok(()); + return Err(Error::index( + "No partial auxiliary files were selected for merge".to_string(), + )); } // Prepare IVF model and storage metadata aggregation @@ -661,7 +635,7 @@ pub async fn merge_partial_vector_auxiliary_files( let mut format_version: Option = None; // Prepare output path; we'll create writer once when we know schema - let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); + let aux_out = target_dir.child(INDEX_AUXILIARY_FILE_NAME); // We'll delay creating the V2 writer until we know the vector schema (dim and quantizer type) let mut v2w_opt: Option = None; @@ -682,7 +656,7 @@ pub async fn merge_partial_vector_auxiliary_files( let mut shard_infos: Vec = Vec::new(); // Iterate over each shard auxiliary file and merge its metadata and collect lengths - for aux in &aux_paths { + for aux in aux_paths { let fh = sched.open_file(aux, &CachedFileSize::unknown()).await?; let reader = V2Reader::try_open( fh, @@ -1417,9 +1391,13 @@ mod tests { .await .unwrap(); - merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await + .unwrap(); let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); assert!(object_store.exists(&aux_out).await.unwrap()); @@ -1515,7 +1493,12 @@ mod tests { .await .unwrap(); - let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await; + let res = merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await; match res { Err(Error::Index { message, .. }) => { assert!( @@ -1690,9 +1673,13 @@ mod tests { .unwrap(); // Merge PQ auxiliary files. - merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await + .unwrap(); // 3) Unified auxiliary file exists. let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); @@ -1818,7 +1805,12 @@ mod tests { .await .unwrap(); - let res = merge_partial_vector_auxiliary_files(&object_store, &index_dir).await; + let res = merge_partial_vector_auxiliary_files( + &object_store, + &[aux0.clone(), aux1.clone()], + &index_dir, + ) + .await; match res { Err(Error::Index { message, .. }) => { assert!( @@ -1893,9 +1885,13 @@ mod tests { .unwrap(); // Merge must succeed and produce a unified auxiliary file. 
- merge_partial_vector_auxiliary_files(&object_store, &index_dir) - .await - .unwrap(); + merge_partial_vector_auxiliary_files( + &object_store, + &[aux_a.clone(), aux_b.clone()], + &index_dir, + ) + .await + .unwrap(); let aux_out = index_dir.child(INDEX_AUXILIARY_FILE_NAME); assert!(object_store.exists(&aux_out).await.unwrap()); diff --git a/rust/lance-index/src/vector/sq/storage.rs b/rust/lance-index/src/vector/sq/storage.rs index 8f6bcea6c1a..8311c20acaa 100644 --- a/rust/lance-index/src/vector/sq/storage.rs +++ b/rust/lance-index/src/vector/sq/storage.rs @@ -36,7 +36,7 @@ use crate::{ pub const SQ_METADATA_KEY: &str = "lance:sq"; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ScalarQuantizationMetadata { pub dim: usize, pub num_bits: u16, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6adefc7bf3d..7360e224c87 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1817,6 +1817,11 @@ impl Dataset { .collect() } + /// Iterate over manifest fragments without allocating [`FileFragment`] wrappers. + pub fn iter_fragments(&self) -> impl Iterator { + self.manifest.fragments.iter() + } + pub fn get_fragment(&self, fragment_id: usize) -> Option { let dataset = Arc::new(self.clone()); let fragment = self @@ -2658,6 +2663,16 @@ impl Dataset { self.merge_impl(stream, left_on, right_on).await } + /// Finalize a staged distributed index into a single merged root. + /// + /// For vector indices this is the legacy compatibility path that preserves + /// the historical `shared_uuid -> merge_index_metadata(...) -> commit_existing_index_segments(...)` + /// workflow. The canonical multi-segment vector workflow is: + /// + /// 1. `create_index_segment_builder(staging_uuid)` + /// 2. `with_partial_shards(...)` + /// 3. `build_all()` + /// 4. `commit_existing_index_segments(index_name, column, segments)` pub async fn merge_index_metadata( &self, index_uuid: &str, @@ -2688,14 +2703,54 @@ impl Dataset { } // Precise vector index types: IVF_FLAT, IVF_PQ, IVF_SQ IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { - // Merge distributed vector index partials and finalize root index via Lance IVF helper - crate::index::vector::ivf::finalize_distributed_merge( - self.object_store(), + let mut partial_shards = self + .object_store() + .read_dir(index_dir.clone()) + .await? 
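+                    // `read_dir` yields child names under the staging root; keep only
+                    // the `partial_<uuid>` shard directories written by workers.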
+                    .into_iter()
+                    .filter(|name| name.starts_with("partial_"))
+                    .map(|name| {
+                        name.strip_prefix("partial_")
+                            .ok_or_else(|| {
+                                Error::index(format!(
+                                    "Distributed vector shard '{}' does not start with 'partial_'",
+                                    name
+                                ))
+                            })
+                            .and_then(|shard_uuid| {
+                                uuid::Uuid::parse_str(shard_uuid).map_err(|err| {
+                                    Error::index(format!(
+                                        "Distributed vector shard '{}' does not end with a valid UUID: {}",
+                                        name, err
+                                    ))
+                                })
+                            })
+                            .map(|shard_uuid| {
+                                crate::index::vector::ivf::PartialShard::new(
+                                    shard_uuid,
+                                    std::iter::empty::<u32>(),
+                                    0,
+                                )
+                            })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                partial_shards.sort_by_key(|shard| shard.uuid());
+                let segment_plans = crate::index::vector::ivf::plan_staging_segments(
                     &index_dir,
+                    &partial_shards,
                     Some(index_type),
+                    None,
                 )
                 .await?;
-                Ok(())
+                let merged_plan =
+                    crate::index::vector::ivf::collapse_segment_plans(&segment_plans)?;
+                crate::index::vector::ivf::merge_staging_segment(
+                    self.object_store(),
+                    &self.indices_dir(),
+                    &merged_plan,
+                )
+                .await
+                .map(|_| ())
             }
             _ => Err(Error::invalid_input_source(Box::new(std::io::Error::new(
                 std::io::ErrorKind::InvalidInput,
diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs
index 5869f6d2db7..8396a5548c4 100644
--- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs
+++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs
@@ -247,11 +247,8 @@ impl MemTableFlusher {
         index_meta.fields = vec![field_idx];
         index_meta.dataset_version = dataset.version().version;
         // Calculate fragment_bitmap from dataset fragments
-        let fragment_ids: roaring::RoaringBitmap = dataset
-            .get_fragments()
-            .iter()
-            .map(|f| f.id() as u32)
-            .collect();
+        let fragment_ids: roaring::RoaringBitmap =
+            dataset.fragment_bitmap.as_ref().clone();
         index_meta.fragment_bitmap = Some(fragment_ids);
 
         // Commit the index to the dataset
@@ -467,11 +464,7 @@ impl MemTableFlusher {
         let schema = dataset.schema();
         let field_idx = schema.field(&fts_cfg.column).map(|f| f.id).unwrap_or(0);
 
-        let fragment_ids: roaring::RoaringBitmap = dataset
-            .get_fragments()
-            .iter()
-            .map(|f| f.id() as u32)
-            .collect();
+        let fragment_ids: roaring::RoaringBitmap = dataset.fragment_bitmap.as_ref().clone();
 
         let index_meta = IndexMetadata {
             uuid: index_uuid,
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index fdc91760a2e..1063fdd05af 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -582,6 +582,7 @@ impl IndexDescription for IndexDescriptionImpl {
 #[async_trait]
 impl DatasetIndexExt for Dataset {
     type IndexBuilder<'a> = CreateIndexBuilder<'a>;
+    type IndexSegmentBuilder<'a> = create::IndexSegmentBuilder<'a>;
 
     /// Create a builder for creating an index on columns.
     ///
@@ -627,6 +628,13 @@ impl DatasetIndexExt for Dataset {
         CreateIndexBuilder::new(self, columns, index_type, params)
     }
 
+    fn create_index_segment_builder<'a>(
+        &'a self,
+        staging_index_uuid: String,
+    ) -> create::IndexSegmentBuilder<'a> {
+        create::IndexSegmentBuilder::new(self, staging_index_uuid)
+    }
+
     #[instrument(skip_all)]
     async fn create_index(
         &mut self,
diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs
index 2678cb22471..179587b1ac0 100644
--- a/rust/lance/src/index/create.rs
+++ b/rust/lance/src/index/create.rs
@@ -17,10 +17,13 @@ use crate::{
         vector_index_details,
     },
 };
-use futures::future::BoxFuture;
+use futures::future::{BoxFuture, try_join_all};
 use lance_core::datatypes::format_field_path;
 use lance_index::progress::{IndexBuildProgress, NoopIndexBuildProgress};
-use lance_index::{IndexParams, IndexType, scalar::CreatedIndex};
+use lance_index::{
+    IndexParams, IndexSegment, IndexType, StagingIndexShard, VectorIndexSegmentPlan,
+    scalar::CreatedIndex,
+};
 use lance_index::{
     metrics::NoOpMetricsCollector,
     scalar::{LANCE_SCALAR_INDEX, ScalarIndexParams, inverted::tokenizer::InvertedIndexParams},
@@ -494,21 +497,112 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> {
     }
 }
 
+/// Build final physical index segments from previously-written staging shards.
+///
+/// Use [`DatasetIndexExt::create_index_segment_builder`] to open a staging root
+/// and then either:
+///
+/// - call [`Self::plan`] and orchestrate individual segment builds externally, or
+/// - call [`Self::build_all`] to materialize all final segments on the current node.
+///
+/// This builder only materializes physical segments. Publishing those segments as
+/// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`].
+/// Together these two APIs form the canonical distributed vector finalize workflow.
+#[derive(Clone)]
+pub struct IndexSegmentBuilder<'a> {
+    dataset: &'a Dataset,
+    staging_index_uuid: String,
+    partial_shards: Vec<StagingIndexShard>,
+    target_segment_bytes: Option<u64>,
+}
+
+impl<'a> IndexSegmentBuilder<'a> {
+    pub(crate) fn new(dataset: &'a Dataset, staging_index_uuid: String) -> Self {
+        Self {
+            dataset,
+            staging_index_uuid,
+            partial_shards: Vec::new(),
+            target_segment_bytes: None,
+        }
+    }
+
+    /// Provide the coordinator-known shard contract for this staging root.
+    pub fn with_partial_shards(mut self, partial_shards: Vec<StagingIndexShard>) -> Self {
+        self.partial_shards = partial_shards;
+        self
+    }
+
+    /// Set the target size, in bytes, for merged final segments.
+    ///
+    /// When set, shard outputs will be grouped into larger final segments up to
+    /// approximately this size. When unset, each shard output becomes one final
+    /// segment.
+    pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self {
+        self.target_segment_bytes = Some(bytes);
+        self
+    }
+
+    /// Plan how staging shards should be grouped into final segments.
+    pub async fn plan(&self) -> Result<Vec<VectorIndexSegmentPlan>> {
+        if self.partial_shards.is_empty() {
+            return Err(Error::invalid_input(
+                "IndexSegmentBuilder requires at least one staging shard; \
+                 call with_partial_shards(...) with coordinator-provided shard metadata"
+                    .to_string(),
+            ));
+        }
+
+        crate::index::vector::ivf::plan_staging_segments(
+            &self
+                .dataset
+                .indices_dir()
+                .child(self.staging_index_uuid.as_str()),
+            &self.partial_shards,
+            None,
+            self.target_segment_bytes,
+        )
+        .await
+    }
+
+    /// Materialize one final segment from a previously-generated plan.
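+    ///
+    /// A minimal sketch of driving per-plan builds one at a time (variable
+    /// names here are illustrative, not part of the API):
+    ///
+    /// ```ignore
+    /// let builder = dataset
+    ///     .create_index_segment_builder(staging_uuid.to_string())
+    ///     .with_partial_shards(shards);
+    /// let mut segments = Vec::new();
+    /// for plan in builder.plan().await? {
+    ///     segments.push(builder.build(&plan).await?);
+    /// }
+    /// ```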
+    pub async fn build(&self, plan: &VectorIndexSegmentPlan) -> Result<IndexSegment> {
+        crate::index::vector::ivf::merge_staging_segment(
+            self.dataset.object_store(),
+            &self.dataset.indices_dir(),
+            plan,
+        )
+        .await
+    }
+
+    /// Plan and materialize all final segments from this staging root.
+    pub async fn build_all(&self) -> Result<Vec<IndexSegment>> {
+        let plans = self.plan().await?;
+        try_join_all(plans.iter().map(|plan| self.build(plan))).await
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::dataset::{WriteMode, WriteParams};
+    use crate::index::DatasetIndexExt;
     use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
     use arrow::datatypes::{Float32Type, Int32Type};
-    use arrow_array::RecordBatchIterator;
+    use arrow_array::cast::AsArray;
+    use arrow_array::{FixedSizeListArray, RecordBatchIterator};
     use arrow_array::{Int32Array, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
+    use lance_arrow::FixedSizeListArrayExt;
     use lance_core::utils::tempfile::TempStrDir;
     use lance_datagen::{self, gen_batch};
     use lance_index::optimize::OptimizeOptions;
     use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
-    use lance_linalg::distance::MetricType;
+    use lance_index::vector::hnsw::builder::HnswBuildParams;
+    use lance_index::vector::ivf::IvfBuildParams;
+    use lance_index::vector::kmeans::{KMeansParams, train_kmeans};
+    use lance_linalg::distance::{DistanceType, MetricType};
     use std::sync::Arc;
+    use uuid::Uuid;
 
     #[test]
     fn test_inverted_training_params_include_build_only_fields() {
@@ -759,6 +853,39 @@ mod tests {
             .unwrap()
     }
 
+    async fn prepare_vector_ivf(dataset: &Dataset, vector_column: &str) -> IvfBuildParams {
+        let batch = dataset
+            .scan()
+            .project(&[vector_column.to_string()])
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        let vectors = batch
+            .column_by_name(vector_column)
+            .expect("vector column should exist")
+            .as_fixed_size_list();
+        let dim = vectors.value_length() as usize;
+        let values = vectors.values().as_primitive::<Float32Type>();
+
+        let kmeans = train_kmeans::<Float32Type>(
+            values,
+            KMeansParams::new(None, 10, 1, DistanceType::L2),
+            dim,
+            4,
+            3,
+        )
+        .unwrap();
+        let centroids = Arc::new(
+            FixedSizeListArray::try_new_from_values(
+                kmeans.centroids.as_primitive::<Float32Type>().clone(),
+                dim as i32,
+            )
+            .unwrap(),
+        );
+        IvfBuildParams::try_with_centroids(4, centroids).unwrap()
+    }
+
     #[tokio::test]
     async fn test_execute_uncommitted() {
         // Test the complete workflow that covers the user's specified code pattern:
@@ -922,6 +1049,293 @@ mod tests {
         assert_eq!(all_covered_fragments, expected_fragments);
     }
 
+    #[tokio::test]
+    async fn test_merge_index_metadata_vector_preserves_shared_uuid_commit_workflow() {
+        let tmpdir = TempStrDir::default();
+        let dataset_uri = format!("file://{}", tmpdir.as_str());
+
+        let reader = gen_batch()
+            .col("id", lance_datagen::array::step::<Int32Type>())
+            .col(
+                "vector",
+                lance_datagen::array::rand_vec::<Float32Type>(lance_datagen::Dimension::from(16)),
+            )
+            .into_reader_rows(
+                lance_datagen::RowCount::from(256),
+                lance_datagen::BatchCount::from(4),
+            );
+        let mut dataset = Dataset::write(
+            reader,
+            &dataset_uri,
+            Some(WriteParams {
+                max_rows_per_file: 64,
+                mode: WriteMode::Overwrite,
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        let fragments = dataset.get_fragments();
+        assert!(fragments.len() >= 2);
+        let shared_uuid = Uuid::new_v4();
+        let params = VectorIndexParams::with_ivf_flat_params(
+            DistanceType::L2,
+            prepare_vector_ivf(&dataset, "vector").await,
+        );
+
+        for fragment in &fragments {
+            let
mut builder = + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .index_uuid(shared_uuid.to_string()); + builder.execute_uncommitted().await.unwrap(); + } + + dataset + .merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfFlat, None) + .await + .unwrap(); + + let merged_root = dataset + .indices_dir() + .child(shared_uuid.to_string()) + .child(crate::index::INDEX_FILE_NAME); + assert!(dataset.object_store().exists(&merged_root).await.unwrap()); + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![IndexSegment::new( + shared_uuid, + fragments.iter().map(|fragment| fragment.id() as u32), + Arc::new(vector_index_details()), + IndexType::IvfFlat.version(), + )], + ) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0].uuid, shared_uuid); + let committed_fragments: Vec = indices[0] + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect(); + assert_eq!( + committed_fragments, + fragments + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>() + ); + + let query_batch = dataset + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(4), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let q = query_batch["vector"].as_fixed_size_list().value(0); + let result = dataset + .scan() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), 5) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert!(result.num_rows() > 0); + } + + #[tokio::test] + async fn test_index_segment_builder_vector_commits_multi_segment_logical_index() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(256), + lance_datagen::BatchCount::from(4), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!(fragments.len() >= 2); + let shared_uuid = Uuid::new_v4(); + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + let staging_root = dataset.indices_dir().child(shared_uuid.to_string()); + let mut seen_shards = std::collections::HashSet::new(); + let mut partial_shards = Vec::new(); + + for fragment in fragments.iter().take(2) { + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .index_uuid(shared_uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + + let shard_uuid = dataset + .object_store() + .read_dir(staging_root.clone()) + .await + .unwrap() + .into_iter() + .filter_map(|name| name.strip_prefix("partial_").map(|name| name.to_string())) + .map(|name| Uuid::parse_str(&name).unwrap()) + .find(|uuid| seen_shards.insert(*uuid)) + .expect("each staged shard build should create exactly one new partial shard"); + partial_shards.push(StagingIndexShard::new( + shard_uuid, + [fragment.id() as u32], + 0, + )); + } + + let segments = dataset + 
.create_index_segment_builder(shared_uuid.to_string()) + .with_partial_shards(partial_shards) + .build_all() + .await + .unwrap(); + assert_eq!(segments.len(), 2); + + dataset + .commit_existing_index_segments("vector_idx", "vector", segments) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 2); + let mut committed_fragment_sets = indices + .iter() + .map(|metadata| { + metadata + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>() + }) + .collect::>(); + committed_fragment_sets.sort(); + assert_eq!(committed_fragment_sets, vec![vec![0], vec![1]]); + + let query_batch = dataset + .scan() + .project(&["vector"] as &[&str]) + .unwrap() + .limit(Some(4), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + let q = query_batch["vector"].as_fixed_size_list().value(0); + let result = dataset + .scan() + .project(&["_rowid"] as &[&str]) + .unwrap() + .nearest("vector", q.as_ref(), 5) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert!(result.num_rows() > 0); + } + + #[tokio::test] + async fn test_commit_existing_index_supports_local_hnsw_segments() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(128), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let uuid = Uuid::new_v4(); + let params = VectorIndexParams::ivf_hnsw( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + HnswBuildParams::default(), + ); + + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .index_uuid(uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + + dataset + .commit_existing_index_segments( + "vector_idx", + "vector", + vec![IndexSegment::new( + uuid, + dataset.fragment_bitmap.as_ref().clone(), + Arc::new(vector_index_details()), + IndexType::IvfHnswFlat.version(), + )], + ) + .await + .unwrap(); + + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0].uuid, uuid); + assert_eq!( + indices[0].fragment_bitmap.as_ref().unwrap(), + dataset.fragment_bitmap.as_ref() + ); + } + #[tokio::test] async fn test_optimize_should_not_removes_delta_indices() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index 0d0ad64993a..763056145ac 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -35,7 +35,7 @@ use object_store::path::Path; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::quantizer::QuantizationType; -use lance_index::vector::v3::shuffler::create_ivf_shuffler; +use lance_index::vector::v3::shuffler::{Shuffler, create_ivf_shuffler}; use lance_index::vector::v3::subindex::SubIndexType; use lance_index::vector::{ VectorIndex, @@ -329,56 +329,51 @@ impl IndexParams for VectorIndexParams { } } -/// Build a Distributed Vector Index for specific fragments -#[allow(clippy::too_many_arguments)] -#[instrument(level = "debug", skip(dataset))] 
-pub(crate) async fn build_distributed_vector_index(
+/// Prepare the shared build inputs used by both direct local builds and
+/// staged shard builds.
+///
+/// These paths emit different file layouts, but they follow the same rules for
+/// validating the vector column, deriving the effective index type, sizing IVF
+/// partitions, and constructing the shuffler.
+async fn prepare_vector_segment_build(
     dataset: &Dataset,
     column: &str,
-    _name: &str,
-    uuid: &str,
     params: &VectorIndexParams,
-    frag_reuse_index: Option<Arc<FragReuseIndex>>,
-    fragment_ids: &[u32],
     progress: Arc<dyn IndexBuildProgress>,
-) -> Result<()> {
+    mode: &str,
+    require_precomputed_ivf: bool,
+) -> Result<(DataType, IndexType, IvfBuildParams, Box<dyn Shuffler>)> {
     let stages = &params.stages;
     if stages.is_empty() {
-        return Err(Error::index(
-            "Build Distributed Vector Index: must have at least 1 stage".to_string(),
-        ));
-    };
+        return Err(Error::index(format!("{mode}: must have at least 1 stage")));
+    }
 
     let StageParams::Ivf(ivf_params0) = &stages[0] else {
         return Err(Error::index(format!(
-            "Build Distributed Vector Index: invalid stages: {:?}",
+            "{mode}: invalid stages: {:?}",
             stages
         )));
     };
-    if ivf_params0.centroids.is_none() {
-        return Err(Error::index(
-            "Build Distributed Vector Index: missing precomputed IVF centroids; \
-            please provide IvfBuildParams.centroids \
-            for concurrent distributed create_index"
-                .to_string(),
-        ));
+    if require_precomputed_ivf && ivf_params0.centroids.is_none() {
+        return Err(Error::index(format!(
+            "{mode}: missing precomputed IVF centroids; please provide \
+             IvfBuildParams.centroids for distributed segment build"
+        )));
     }
 
     let (vector_type, element_type) = get_vector_type(dataset.schema(), column)?;
     if let DataType::List(_) = vector_type
         && params.metric_type != DistanceType::Cosine
     {
-        return Err(Error::index(
-            "Build Distributed Vector Index: multivector type supports only cosine distance"
-                .to_string(),
-        ));
+        return Err(Error::index(format!(
+            "{mode}: multivector type supports only cosine distance"
+        )));
     }
 
     let num_rows = dataset.count_rows(None).await?;
     let index_type = params.index_type();
-
     let num_partitions = ivf_params0.num_partitions.unwrap_or_else(|| {
         recommended_num_partitions(
             num_rows,
@@ -387,37 +382,58 @@ pub(crate) async fn build_distributed_vector_index(
             .unwrap_or(index_type.target_partition_size()),
         )
     });
-
     let mut ivf_params = ivf_params0.clone();
     ivf_params.num_partitions = Some(num_partitions);
 
-    let ivf_centroids = ivf_params
-        .centroids
-        .as_ref()
-        .expect("precomputed IVF centroids required for distributed indexing; checked above")
-        .as_ref()
-        .clone();
-
     let format_version = dataset_format_version(dataset);
-
     let temp_dir = TempStdDir::default();
     let temp_dir_path = Path::from_filesystem_path(&temp_dir)?;
     let shuffler = create_ivf_shuffler(
         temp_dir_path,
         num_partitions,
         format_version,
-        Some(progress.clone()),
+        Some(progress),
     );
 
+    Ok((element_type, index_type, ivf_params, shuffler))
+}
+
+/// Build a Distributed Vector Index for specific fragments
+#[allow(clippy::too_many_arguments)]
+#[instrument(level = "debug", skip(dataset))]
+pub(crate) async fn build_distributed_vector_index(
+    dataset: &Dataset,
+    column: &str,
+    _name: &str,
+    uuid: &str,
+    params: &VectorIndexParams,
+    frag_reuse_index: Option<Arc<FragReuseIndex>>,
+    fragment_ids: &[u32],
+    progress: Arc<dyn IndexBuildProgress>,
+) -> Result<()> {
+    let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build(
+        dataset,
+        column,
+        params,
+        progress.clone(),
+        "Build Distributed Vector Index",
+        true,
+    )
+    .await?;
+    let stages = &params.stages;
+
+    let
ivf_centroids = ivf_params + .centroids + .as_ref() + .expect("precomputed IVF centroids required for distributed indexing; checked above") + .as_ref() + .clone(); + let filtered_dataset = dataset.clone(); let out_base = dataset.indices_dir().child(uuid); - - let make_partial_index_dir = |out_base: &Path| -> Path { - let shard_uuid = Uuid::new_v4(); - out_base.child(format!("partial_{}", shard_uuid)) - }; - let new_index_dir = || make_partial_index_dir(&out_base); + let shard_uuid = Uuid::new_v4(); + let index_dir = out_base.child(format!("partial_{}", shard_uuid)); let fragment_filter = fragment_ids.to_vec(); @@ -458,13 +474,12 @@ pub(crate) async fn build_distributed_vector_index( match index_type { IndexType::IvfFlat => match element_type { DataType::Float16 | DataType::Float32 | DataType::Float64 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -479,13 +494,12 @@ pub(crate) async fn build_distributed_vector_index( .await?; } DataType::UInt8 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -523,14 +537,13 @@ pub(crate) async fn build_distributed_vector_index( )); } IndexFileVersion::V3 => { - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); let global_pq = make_global_pq(pq_params)?; IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -559,12 +572,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -586,12 +597,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -619,14 +628,13 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); let ivf_model = make_ivf_model(); let global_pq = make_global_pq(pq_params)?; IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -659,12 +667,10 @@ pub(crate) async fn build_distributed_vector_index( ))); }; - let index_dir = new_index_dir(); - IvfIndexBuilder::::new( filtered_dataset, column.to_owned(), - index_dir, + index_dir.clone(), params.metric_type, shuffler, Some(ivf_params), @@ -708,53 +714,17 @@ pub(crate) async fn build_vector_index( frag_reuse_index: Option>, progress: Arc, ) -> Result<()> { + let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build( + dataset, + column, + params, + progress.clone(), + "Build Vector Index", + false, + ) + .await?; let stages = ¶ms.stages; - if stages.is_empty() { - return Err(Error::index( - "Build Vector Index: must have at least 1 stage".to_string(), - )); - }; - - let StageParams::Ivf(ivf_params) = &stages[0] else { - return Err(Error::index(format!( - "Build Vector Index: invalid stages: {:?}", - stages - ))); - }; - - let (vector_type, element_type) = get_vector_type(dataset.schema(), column)?; - if let 
DataType::List(_) = vector_type - && params.metric_type != DistanceType::Cosine - { - return Err(Error::index( - "Build Vector Index: multivector type supports only cosine distance".to_string(), - )); - } - - let num_rows = dataset.count_rows(None).await?; - let index_type = params.index_type(); - let num_partitions = ivf_params.num_partitions.unwrap_or_else(|| { - recommended_num_partitions( - num_rows, - ivf_params - .target_partition_size - .unwrap_or(index_type.target_partition_size()), - ) - }); - let mut ivf_params = ivf_params.clone(); - ivf_params.num_partitions = Some(num_partitions); - - let format_version = dataset_format_version(dataset); - - let temp_dir = TempStdDir::default(); - let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; - let shuffler = create_ivf_shuffler( - temp_dir_path, - num_partitions, - format_version, - Some(progress.clone()), - ); match index_type { IndexType::IvfFlat => match element_type { DataType::Float16 | DataType::Float32 | DataType::Float64 => { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 621773c60c0..596a0299a10 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -56,14 +56,16 @@ use lance_index::vector::bq::builder::RabitQuantizer; use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer}; use lance_index::vector::hnsw::HnswMetadata; use lance_index::vector::hnsw::builder::HNSW_METADATA_KEY; -use lance_index::vector::ivf::storage::{IVF_METADATA_KEY, IvfModel}; +use lance_index::vector::ivf::storage::IVF_METADATA_KEY; +use lance_index::vector::ivf::storage::IvfModel; use lance_index::vector::kmeans::KMeansParams; use lance_index::vector::pq::storage::transpose; use lance_index::vector::quantizer::QuantizationType; use lance_index::vector::v3::shuffler::create_ivf_shuffler; use lance_index::vector::v3::subindex::{IvfSubIndex, SubIndexType}; use lance_index::{ - INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, Index, IndexMetadata, IndexType, + INDEX_AUXILIARY_FILE_NAME, INDEX_METADATA_SCHEMA_KEY, Index, IndexMetadata, IndexSegment, + IndexType, StagingIndexShard, VectorIndexSegmentPlan, optimize::OptimizeOptions, vector::{ Query, VectorIndex, @@ -1857,29 +1859,390 @@ async fn write_ivf_hnsw_file( Ok(()) } -/// Finalize distributed merge for IVF-based vector indices. +/// Coordinator-provided shard metadata for one staging shard. /// -/// This helper merges partial auxiliary index files produced by distributed -/// jobs into a unified `auxiliary.idx` and then creates a root `index.idx` -/// using the v2 index format so that `open_vector_index_v2` can load it. +/// The distributed coordinator already knows which fragments were assigned to +/// each shard. We pass that contract into the planner directly instead of +/// reconstructing fragment coverage by scanning row ids back out of storage. +pub(crate) type PartialShard = StagingIndexShard; + +/// Plan how one staging root should be turned into final physical segments. /// -/// The caller must pass `index_dir` pointing at the index UUID directory -/// (e.g. `/indices/`). `requested_index_type` is only used as -/// a fallback when the unified auxiliary file does not contain index -/// metadata. -pub async fn finalize_distributed_merge( - object_store: &ObjectStore, - index_dir: &object_store::path::Path, +/// This function does not touch storage. 
It only: +/// - validates that the caller-supplied shard contract is self-consistent +/// - enforces that shard fragment coverage is disjoint +/// - groups shards into final segments according to `target_segment_bytes` +/// +/// The grouping rule is intentionally simple: +/// - `target_segment_bytes = None`: keep the shard boundary, so each shard becomes one segment +/// - `target_segment_bytes = Some(limit)`: greedily pack consecutive shards until the next shard +/// would exceed `limit` +/// +/// Callers that want "merge everything into one final segment" should pass a +/// sufficiently large `target_segment_bytes`. +pub(crate) async fn plan_staging_segments( + index_dir: &Path, + partial_shards: &[PartialShard], requested_index_type: Option, + target_segment_bytes: Option, +) -> Result> { + if let Some(index_type) = requested_index_type + && !matches!( + index_type, + IndexType::IvfFlat + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq + | IndexType::Vector + ) + { + return Err(Error::invalid_input(format!( + "Unsupported distributed vector finalize type: {}", + index_type + ))); + } + + if let Some(0) = target_segment_bytes { + return Err(Error::invalid_input( + "target_segment_bytes must be greater than zero".to_string(), + )); + } + + if partial_shards.is_empty() { + return Err(Error::index(format!( + "No partial shard metadata was provided for '{}'", + index_dir + ))); + } + + let mut sorted_partial_shards = partial_shards.to_vec(); + sorted_partial_shards.sort_by_key(|shard| shard.uuid()); + let mut expected_shard_ids = HashSet::with_capacity(sorted_partial_shards.len()); + for shard in &sorted_partial_shards { + if !expected_shard_ids.insert(shard.uuid()) { + return Err(Error::index(format!( + "Distributed vector partial shard '{}' was provided more than once", + shard.uuid() + ))); + } + } + + let staging_index_uuid = index_dir + .filename() + .ok_or_else(|| Error::index(format!("Index directory '{}' has no filename", index_dir))) + .and_then(|name| { + Uuid::parse_str(name).map_err(|err| { + Error::index(format!( + "Index directory '{}' does not end with a valid UUID: {}", + index_dir, err + )) + }) + })?; + + let mut covered_fragments = RoaringBitmap::new(); + for partial_shard in &sorted_partial_shards { + if covered_fragments.intersection_len(partial_shard.fragment_bitmap()) > 0 { + return Err(Error::index( + "Distributed vector shards have overlapping fragment coverage".to_string(), + )); + } + covered_fragments |= partial_shard.fragment_bitmap().clone(); + } + + if target_segment_bytes.is_none() { + return sorted_partial_shards + .into_iter() + .map(|partial_shard| { + build_segment_plan( + staging_index_uuid, + vec![partial_shard], + requested_index_type, + ) + }) + .collect(); + } + + let target_segment_bytes = target_segment_bytes.unwrap(); + let mut plans = Vec::new(); + let mut current_group = Vec::new(); + let mut current_bytes = 0_u64; + + for partial_shard in sorted_partial_shards { + let partial_bytes = partial_shard.estimated_bytes(); + if !current_group.is_empty() + && current_bytes.saturating_add(partial_bytes) > target_segment_bytes + { + plans.push(build_segment_plan( + staging_index_uuid, + std::mem::take(&mut current_group), + requested_index_type, + )?); + current_bytes = 0; + } + current_bytes = current_bytes.saturating_add(partial_bytes); + current_group.push(partial_shard); + } + + if !current_group.is_empty() { + plans.push(build_segment_plan( + staging_index_uuid, + current_group, + 
requested_index_type, + )?); + } + + Ok(plans) +} + +/// Materialize one planned segment into its final directory. +/// +/// Most plans write directly to `indices//`. The only +/// awkward case is the legacy compat path where the target directory is also +/// the staging root. In that case we first write into a temporary directory and +/// then swap the final files back into place. +pub(crate) async fn merge_staging_segment( + object_store: &ObjectStore, + indices_dir: &Path, + segment_plan: &VectorIndexSegmentPlan, +) -> Result { + let final_segment = segment_plan.final_segment().clone(); + let final_dir = indices_dir.child(final_segment.uuid().to_string()); + let staging_dir = indices_dir.child(segment_plan.staging_index_uuid().to_string()); + if final_dir == staging_dir { + let temp_dir = indices_dir.child(Uuid::new_v4().to_string()); + merge_staging_segment_to_dir(object_store, indices_dir, &temp_dir, segment_plan, false) + .await?; + + // Re-materializing back into the staging root is not atomic: we delete + // and rewrite the root files one by one because object stores do not + // offer a directory rename primitive. We only clean up the source + // `partial_*` shards after these copies succeed, so a crash here can + // leave a partial root artifact but should not destroy the source data. + for file_name in [INDEX_FILE_NAME, INDEX_AUXILIARY_FILE_NAME] { + let target_file = final_dir.child(file_name); + // ObjectStore::copy is additive. Remove any previous root artifact first so + // the compat path cannot leave stale files behind when re-materializing. + if object_store.exists(&target_file).await? { + object_store.delete(&target_file).await?; + } + let source_file = temp_dir.child(file_name); + if object_store.exists(&source_file).await? { + object_store.copy(&source_file, &target_file).await?; + } + } + + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + reset_final_segment_dir(object_store, &temp_dir).await?; + } else { + merge_staging_segment_to_dir(object_store, indices_dir, &final_dir, segment_plan, true) + .await?; + } + + Ok(final_segment) +} + +/// Write one planned segment into `final_dir`. +/// +/// For a single-shard plan this is just a file copy. For a multi-shard plan we +/// read the selected `partial_*` shards directly from the staging root and +/// materialize the merged auxiliary/index files into `final_dir`. +async fn merge_staging_segment_to_dir( + object_store: &ObjectStore, + indices_dir: &Path, + final_dir: &Path, + segment_plan: &VectorIndexSegmentPlan, + cleanup_source_shards: bool, ) -> Result<()> { - // Merge per-shard auxiliary files into a unified auxiliary.idx. 
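+    // Start from a clean target directory so artifacts left by an earlier
+    // failed attempt cannot leak into the new segment.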
+ reset_final_segment_dir(object_store, &final_dir).await?; + + let partial_shards = segment_plan.partial_shards(); + if partial_shards.len() == 1 { + let source_dir = indices_dir + .child(segment_plan.staging_index_uuid().to_string()) + .child(format!("partial_{}", partial_shards[0].uuid())); + copy_partial_segment_contents(object_store, &source_dir, &final_dir).await?; + if cleanup_source_shards { + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + } + return Ok(()); + } + + let staging_root = indices_dir.child(segment_plan.staging_index_uuid().to_string()); + let aux_paths = partial_shards + .iter() + .map(|partial_shard| { + staging_root + .child(format!("partial_{}", partial_shard.uuid())) + .child(INDEX_AUXILIARY_FILE_NAME) + }) + .collect::>(); + let partial_index_paths = partial_shards + .iter() + .map(|partial_shard| { + staging_root + .child(format!("partial_{}", partial_shard.uuid())) + .child(INDEX_FILE_NAME) + }) + .collect::>(); + lance_index::vector::distributed::index_merger::merge_partial_vector_auxiliary_files( object_store, - index_dir, + &aux_paths, + final_dir, ) .await?; + write_root_vector_index_from_auxiliary( + object_store, + &final_dir, + segment_plan.requested_index_type(), + &partial_index_paths, + ) + .await?; + if cleanup_source_shards { + cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; + } + + Ok(()) +} + +/// Collapse one group of staging shards into a single final-segment plan. +fn build_segment_plan( + staging_index_uuid: Uuid, + group: Vec, + requested_index_type: Option, +) -> Result { + debug_assert!(!group.is_empty()); + let first = &group[0]; + let mut fragment_bitmap = RoaringBitmap::new(); + let mut estimated_bytes = 0_u64; + let mut partial_shards = Vec::with_capacity(group.len()); + + for partial in &group { + fragment_bitmap |= partial.fragment_bitmap().clone(); + estimated_bytes = estimated_bytes.saturating_add(partial.estimated_bytes()); + partial_shards.push(partial.clone()); + } + + let final_uuid = if group.len() == 1 { + first.uuid() + } else { + Uuid::new_v4() + }; + let index_type = requested_index_type.unwrap_or(IndexType::Vector); + let final_segment = IndexSegment::new( + final_uuid, + fragment_bitmap, + Arc::new(crate::index::vector_index_details()), + index_type.version(), + ); + + Ok(VectorIndexSegmentPlan::new( + staging_index_uuid, + final_segment, + partial_shards, + estimated_bytes, + requested_index_type, + )) +} + +/// Collapse an entire staging root into one final segment plan. +/// +/// This is used by the legacy compat path where `merge_index_metadata(...)` +/// still needs to materialize one shared UUID artifact. 
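+///
+/// A minimal sketch of the compat flow, mirroring `merge_index_metadata` (the
+/// `index_dir`, `shards`, and `indices_dir` bindings are assumed in scope):
+///
+/// ```ignore
+/// let plans = plan_staging_segments(&index_dir, &shards, Some(index_type), None).await?;
+/// let merged = collapse_segment_plans(&plans)?;
+/// merge_staging_segment(object_store, &indices_dir, &merged).await?;
+/// ```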
+pub(crate) fn collapse_segment_plans( + segment_plans: &[VectorIndexSegmentPlan], +) -> Result { + let Some(first_plan) = segment_plans.first() else { + return Err(Error::index( + "Distributed vector finalize plan contains no segment plans".to_string(), + )); + }; + + let mut fragment_bitmap = RoaringBitmap::new(); + let mut partial_shards = Vec::new(); + let mut estimated_bytes = 0_u64; + + for plan in segment_plans { + fragment_bitmap |= plan.final_segment().fragment_bitmap().clone(); + partial_shards.extend_from_slice(plan.partial_shards()); + estimated_bytes = estimated_bytes.saturating_add(plan.estimated_bytes()); + } + + let staging_index_uuid = first_plan.staging_index_uuid(); + let final_segment = IndexSegment::new( + staging_index_uuid, + fragment_bitmap, + first_plan.final_segment().index_details().clone(), + first_plan.final_segment().index_version(), + ); + + Ok(VectorIndexSegmentPlan::new( + staging_index_uuid, + final_segment, + partial_shards, + estimated_bytes, + first_plan.requested_index_type(), + )) +} + +/// Remove the source `partial_*` directories consumed by one segment plan. +async fn cleanup_consumed_partial_shards( + object_store: &ObjectStore, + indices_dir: &Path, + segment_plan: &VectorIndexSegmentPlan, +) -> Result<()> { + for shard_uuid in segment_plan.partial_shards() { + let source_dir = indices_dir + .child(segment_plan.staging_index_uuid().to_string()) + .child(format!("partial_{}", shard_uuid.uuid())); + reset_final_segment_dir(object_store, &source_dir).await?; + } + Ok(()) +} + +/// Copy all files that belong to one partial shard into a new directory. +async fn copy_partial_segment_contents( + object_store: &ObjectStore, + source_dir: &Path, + target_dir: &Path, +) -> Result<()> { + let mut files = object_store.list(Some(source_dir.clone())); + while let Some(item) = files.next().await { + let meta = item?; + let Some(relative_parts) = meta.location.prefix_match(source_dir) else { + continue; + }; + let relative_parts = relative_parts.collect::>(); + if relative_parts.is_empty() { + continue; + } + let mut final_path = target_dir.clone(); + for part in relative_parts { + final_path = final_path.child(part.as_ref()); + } + object_store.copy(&meta.location, &final_path).await?; + } + Ok(()) +} - // Open the unified auxiliary file. +/// Best-effort reset of one target directory before rewriting it. +async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { + match object_store.remove_dir_all(final_dir.clone()).await { + Ok(()) => {} + Err(Error::NotFound { .. }) => {} + Err(err) => return Err(err), + } + Ok(()) +} + +async fn write_root_vector_index_from_auxiliary( + object_store: &ObjectStore, + index_dir: &Path, + requested_index_type: Option, + centroid_source_index_paths: &[Path], +) -> Result<()> { let aux_path = index_dir.child(INDEX_AUXILIARY_FILE_NAME); let scheduler = ScanScheduler::new( Arc::new(object_store.clone()), @@ -1912,36 +2275,20 @@ pub async fn finalize_distributed_merge( let mut pb_ivf: lance_index::pb::Ivf = Message::decode(raw_ivf_bytes.clone())?; // If the unified IVF metadata does not contain centroids, try to source them - // from any partial_* index.idx under this index directory. + // from one of the shard index files that fed this merge. 
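+    // Candidate shard index files are tried in priority order; the first one
+    // that exists and carries a centroids tensor wins (note the `break` below).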
if pb_ivf.centroids_tensor.is_none() { - let mut stream = object_store.list(Some(index_dir.clone())); - let mut partial_index_path = None; - - while let Some(item) = stream.next().await { - let meta = item?; - if let Some(fname) = meta.location.filename() - && fname == INDEX_FILE_NAME - { - let parts: Vec<_> = meta.location.parts().collect(); - if parts.len() >= 2 { - let parent = parts[parts.len() - 2].as_ref(); - if parent.starts_with("partial_") { - partial_index_path = Some(meta.location.clone()); - break; - } - } + for partial_index_path in centroid_source_index_paths { + if !object_store.exists(partial_index_path).await? { + continue; } - } - - if let Some(partial_index_path) = partial_index_path { let fh = scheduler - .open_file(&partial_index_path, &CachedFileSize::unknown()) + .open_file(partial_index_path, &CachedFileSize::unknown()) .await?; let partial_reader = V2Reader::try_open( fh, None, Arc::default(), - &lance_core::cache::LanceCache::no_cache(), + &LanceCache::no_cache(), V2ReaderOptions::default(), ) .await?; @@ -1953,6 +2300,7 @@ pub async fn finalize_distributed_merge( let partial_pb_ivf: lance_index::pb::Ivf = Message::decode(partial_ivf_bytes)?; if partial_pb_ivf.centroids_tensor.is_some() { pb_ivf.centroids_tensor = partial_pb_ivf.centroids_tensor; + break; } } } @@ -2033,69 +2381,6 @@ pub async fn finalize_distributed_merge( let empty_batch = RecordBatch::new_empty(arrow_schema); v2_writer.write_batch(&empty_batch).await?; v2_writer.finish().await?; - - if let Err(err) = cleanup_partial_vector_dirs(object_store, index_dir).await { - warn!( - "Failed to cleanup partial_* vector index directories under '{}': {}", - index_dir.as_ref(), - err - ); - } - - Ok(()) -} - -/// Cleanup for distributed partial vector index directories after -/// a distributed merge. -/// -/// This helper scans `index_dir` for direct child directories whose names -/// start with `partial_` (e.g. `/partial_0`, `/partial_1`) -/// and attempts to recursively delete them via [`ObjectStore::remove_dir_all`]. -/// -/// Listing and deletion failures are logged with [`warn!`] and ignored so that -/// index finalization is never blocked by cleanup. The function always returns -/// `Ok(())`. 
-async fn cleanup_partial_vector_dirs( - object_store: &ObjectStore, - index_dir: &object_store::path::Path, -) -> Result<()> { - let mut partial_dirs: HashSet = HashSet::new(); - let mut list_stream = object_store.list(Some(index_dir.clone())); - - while let Some(item) = list_stream.next().await { - match item { - Ok(meta) => { - if let Some(relative_parts) = meta.location.prefix_match(index_dir) { - let rel_parts: Vec<_> = relative_parts.collect(); - // Expect paths like: /partial_*/ - if rel_parts.len() >= 2 { - let parent_name = rel_parts[0].as_ref(); - if parent_name.starts_with("partial_") { - partial_dirs.insert(index_dir.child(parent_name)); - } - } - } - } - Err(e) => { - warn!( - "Failed to list index directory '{}' while collecting partial_* dirs: {}", - index_dir.as_ref(), - e - ); - } - } - } - - for dir in partial_dirs { - if let Err(e) = object_store.remove_dir_all(dir.clone()).await { - warn!( - "Failed to remove partial_* directory '{}' after distributed merge: {}", - dir.as_ref(), - e - ); - } - } - Ok(()) } @@ -3644,50 +3929,6 @@ mod tests { assert!(correct_times >= 9, "correct: {}", correct_times); } - #[tokio::test] - async fn test_cleanup_removes_only_partial_dirs() { - let object_store = ObjectStore::memory(); - let index_dir = Path::from("index/uuid_test_cleanup"); - - // partial_* directories that should be removed - let partial0_file = index_dir.child("partial_0").child("file.bin"); - let partial_abc_file = index_dir.child("partial_abc").child("file.bin"); - - // Non-partial paths that must be preserved - let partialx_file = index_dir.child("partialX").child("file.bin"); - let shard_file = index_dir.child("shard_0").child("file.bin"); - let keep_root_file = index_dir.child("keep_root.txt"); - - object_store.put(&partial0_file, b"partial0").await.unwrap(); - object_store - .put(&partial_abc_file, b"partial_abc") - .await - .unwrap(); - object_store.put(&partialx_file, b"partialx").await.unwrap(); - object_store.put(&shard_file, b"shard").await.unwrap(); - object_store.put(&keep_root_file, b"root").await.unwrap(); - - // Sanity: all files exist before cleanup - assert!(object_store.exists(&partial0_file).await.unwrap()); - assert!(object_store.exists(&partial_abc_file).await.unwrap()); - assert!(object_store.exists(&partialx_file).await.unwrap()); - assert!(object_store.exists(&shard_file).await.unwrap()); - assert!(object_store.exists(&keep_root_file).await.unwrap()); - - cleanup_partial_vector_dirs(&object_store, &index_dir) - .await - .unwrap(); - - // partial_* directories should be removed - assert!(!object_store.exists(&partial0_file).await.unwrap()); - assert!(!object_store.exists(&partial_abc_file).await.unwrap()); - - // Non-partial directories and root files must be preserved - assert!(object_store.exists(&partialx_file).await.unwrap()); - assert!(object_store.exists(&shard_file).await.unwrap()); - assert!(object_store.exists(&keep_root_file).await.unwrap()); - } - #[tokio::test(flavor = "multi_thread")] async fn test_build_ivf_model_progress_callback() { use lance_index::progress::IndexBuildProgress; @@ -3758,27 +3999,6 @@ mod tests { } } - #[tokio::test] - async fn test_cleanup_idempotent() { - let object_store = ObjectStore::memory(); - let index_dir = Path::from("index/uuid_test_cleanup_idempotent"); - - let partial_file = index_dir.child("partial_0").child("file.bin"); - object_store.put(&partial_file, b"partial").await.unwrap(); - - assert!(object_store.exists(&partial_file).await.unwrap()); - - cleanup_partial_vector_dirs(&object_store, &index_dir) 
-            .await
-            .unwrap();
-        assert!(!object_store.exists(&partial_file).await.unwrap());
-
-        // Second call should succeed even when there are no partial_* directories left.
-        cleanup_partial_vector_dirs(&object_store, &index_dir)
-            .await
-            .unwrap();
-    }
-
     #[tokio::test]
     async fn test_prewarm_ivf_legacy() {
         use lance_io::assert_io_eq;
diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs
index 0bdc0389648..1f6a09fde78 100644
--- a/rust/lance/src/index/vector/ivf/v2.rs
+++ b/rust/lance/src/index/vector/ivf/v2.rs
@@ -618,6 +618,7 @@ mod tests {
     };
     use arrow_buffer::OffsetBuffer;
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
+    use futures::StreamExt;
     use itertools::Itertools;
     use lance_arrow::FixedSizeListArrayExt;
     use lance_index::vector::bq::{
@@ -627,8 +628,8 @@ mod tests {
 
     use crate::dataset::{InsertBuilder, UpdateBuilder, WriteMode, WriteParams};
     use crate::index::DatasetIndexInternalExt;
-    use crate::index::vector::ivf::finalize_distributed_merge;
     use crate::index::vector::ivf::v2::IvfPq;
+    use crate::index::vector::ivf::{PartialShard, merge_staging_segment, plan_staging_segments};
     use crate::utils::test::copy_test_data_to_tmp;
     use crate::{
         Dataset,
@@ -1437,6 +1438,101 @@ mod tests {
         (ivf_params, pq_params)
     }
 
+    async fn prepare_global_ivf(dataset: &Dataset, vector_column: &str) -> IvfBuildParams {
+        let batch = dataset
+            .scan()
+            .project(&[vector_column.to_string()])
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        let vectors = batch
+            .column_by_name(vector_column)
+            .expect("vector column should exist")
+            .as_fixed_size_list();
+
+        let dim = vectors.value_length() as usize;
+        assert_eq!(dim, TWO_FRAG_DIM, "unexpected vector dimension");
+
+        let values = vectors.values().as_primitive::<Float32Type>();
+        let kmeans_params = KMeansParams::new(None, TWO_FRAG_MAX_ITERS, 1, DistanceType::L2);
+        let kmeans = train_kmeans::<Float32Type>(
+            values,
+            kmeans_params,
+            dim,
+            TWO_FRAG_NUM_PARTITIONS,
+            TWO_FRAG_SAMPLE_RATE,
+        )
+        .unwrap();
+
+        let centroids_flat = kmeans.centroids.as_primitive::<Float32Type>().clone();
+        let centroids_fsl =
+            Arc::new(FixedSizeListArray::try_new_from_values(centroids_flat, dim as i32).unwrap());
+        let mut ivf_params =
+            IvfBuildParams::try_with_centroids(TWO_FRAG_NUM_PARTITIONS, centroids_fsl).unwrap();
+        ivf_params.max_iters = TWO_FRAG_MAX_ITERS as usize;
+        ivf_params.sample_rate = TWO_FRAG_SAMPLE_RATE;
+        ivf_params
+    }
+
+    async fn build_distributed_partial_index_for_fragment_groups(
+        dataset: &mut Dataset,
+        fragment_groups: Vec<Vec<u32>>, // each group is a set of fragment ids
+        params: &VectorIndexParams,
+        index_name: &str,
+    ) -> (Uuid, Vec<PartialShard>) {
+        let shared_uuid = Uuid::new_v4();
+        let mut seen_shards = HashSet::new();
+        let mut partial_shards = Vec::new();
+
+        for fragments in fragment_groups {
+            let fragment_bitmap = fragments.clone();
+            let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, params);
+            builder = builder
+                .name(index_name.to_string())
+                .fragments(fragments)
+                .index_uuid(shared_uuid.to_string());
+            // Build partial index shards without committing to manifest.
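+            // Each uncommitted build writes its shard output under the shared
+            // staging index UUID as a `partial_<shard-uuid>` directory; the
+            // directory scan below discovers the new shard and sizes it.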
+            builder.execute_uncommitted().await.unwrap();
+            let index_dir = dataset.indices_dir().child(shared_uuid.to_string());
+            let mut new_shards = dataset
+                .object_store()
+                .read_dir(index_dir)
+                .await
+                .unwrap()
+                .into_iter()
+                .filter(|name| name.starts_with("partial_"))
+                .filter_map(|name| {
+                    let shard_uuid = name.strip_prefix("partial_")?;
+                    let shard_uuid = Uuid::parse_str(shard_uuid).ok()?;
+                    if seen_shards.insert(shard_uuid) {
+                        Some(shard_uuid)
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>();
+            new_shards.sort();
+            assert_eq!(new_shards.len(), 1);
+            let partial_dir = dataset
+                .indices_dir()
+                .child(shared_uuid.to_string())
+                .child(format!("partial_{}", new_shards[0]));
+            let mut estimated_bytes = 0_u64;
+            let mut files = dataset.object_store().list(Some(partial_dir));
+            while let Some(item) = files.next().await {
+                estimated_bytes += item.unwrap().size;
+            }
+            partial_shards.push(PartialShard::new(
+                new_shards[0],
+                fragment_bitmap,
+                estimated_bytes,
+            ));
+        }
+
+        (shared_uuid, partial_shards)
+    }
+
     async fn build_ivfpq_for_fragment_groups(
         dataset: &mut Dataset,
         fragment_groups: Vec<Vec<u32>>, // each group is a set of fragment ids
@@ -1457,12 +1553,11 @@ mod tests {
             .name(index_name.to_string())
             .fragments(fragments)
             .index_uuid(shared_uuid.to_string());
-            // Build partial index shards without committing to manifest.
             builder.execute_uncommitted().await.unwrap();
         }
 
-        let index_dir = dataset.indices_dir().child(shared_uuid.to_string());
-        finalize_distributed_merge(dataset.object_store(), &index_dir, Some(IndexType::IvfPq))
+        dataset
+            .merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfPq, None)
             .await
             .unwrap();
@@ -1485,7 +1580,6 @@ mod tests {
         let idx_a = &stats_a["indices"][0];
         let idx_b = &stats_b["indices"][0];
 
-        // Centroids: same shape and values (within tolerance).
         let centroids_a = idx_a["centroids"]
            .as_array()
            .expect("centroids should be an array");
@@ -1517,7 +1611,6 @@ mod tests {
            }
        }
 
-        // Partitions sizes.
         let parts_a = idx_a["partitions"]
             .as_array()
             .expect("partitions should be an array");
@@ -1536,6 +1629,75 @@ mod tests {
         assert_eq!(sizes_a, sizes_b, "partition sizes mismatch");
     }
 
+    async fn load_staging_shard_uuids(dataset: &Dataset, shared_uuid: Uuid) -> Vec<Uuid> {
+        let mut shard_uuids = dataset
+            .object_store()
+            .read_dir(dataset.indices_dir().child(shared_uuid.to_string()))
+            .await
+            .unwrap()
+            .into_iter()
+            .filter(|name| name.starts_with("partial_"))
+            .map(|name| Uuid::parse_str(name.trim_start_matches("partial_")).unwrap())
+            .collect::<Vec<_>>();
+        shard_uuids.sort();
+        shard_uuids
+    }
+
+    async fn build_partial_shards(
+        dataset: &Dataset,
+        shared_uuid: Uuid,
+        fragment_groups: &[Vec<u32>],
+    ) -> Vec<PartialShard> {
+        let shard_uuids = load_staging_shard_uuids(dataset, shared_uuid).await;
+        assert_eq!(shard_uuids.len(), fragment_groups.len());
+        let mut partial_shards = Vec::with_capacity(shard_uuids.len());
+        for (shard_uuid, fragment_group) in shard_uuids.into_iter().zip(fragment_groups.iter()) {
+            let partial_dir = dataset
+                .indices_dir()
+                .child(shared_uuid.to_string())
+                .child(format!("partial_{}", shard_uuid));
+            let mut estimated_bytes = 0_u64;
+            let mut files = dataset.object_store().list(Some(partial_dir));
+            while let Some(item) = files.next().await {
+                estimated_bytes += item.unwrap().size;
+            }
+            partial_shards.push(PartialShard::new(
+                shard_uuid,
+                fragment_group.iter().copied(),
+                estimated_bytes,
+            ));
+        }
+        partial_shards
+    }
+
+    async fn materialize_distributed_plan(
+        dataset: &mut Dataset,
+        shared_uuid: Uuid,
+        partial_shards: &[PartialShard],
+        target_segment_bytes: Option<u64>,
+        index_name: &str,
+    ) -> Vec<IndexSegment> {
+        let index_dir = dataset.indices_dir().child(shared_uuid.to_string());
+        let segment_plans =
+            plan_staging_segments(&index_dir, partial_shards, None, target_segment_bytes)
+                .await
+                .unwrap();
+        let mut segments = Vec::with_capacity(segment_plans.len());
+        for plan in &segment_plans {
+            segments.push(
+                merge_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan)
+                    .await
+                    .unwrap(),
+            );
+        }
+        dataset
+            .commit_existing_index_segments(index_name, "vector", segments.clone())
+            .await
+            .unwrap();
+
+        segments
+    }
+
     #[tokio::test]
     async fn test_ivfpq_recall_performance_on_two_frags_single_vs_split() {
         const INDEX_NAME: &str = "vector_idx";
@@ -1543,7 +1705,6 @@ mod tests {
 
         let test_dir = TempStrDir::default();
         let base_uri = test_dir.as_str();
-        // Generate the data once, then write it twice to two independent dataset URIs.
         let (schema, batches) = make_two_fragment_batches();
 
         let ds_single_uri = format!("{}/single", base_uri);
@@ -1553,7 +1714,6 @@ mod tests {
             write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await;
         let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await;
 
-        // Ensure we have at least 2 fragments.
         let fragments_single = ds_single.get_fragments();
         assert!(
             fragments_single.len() >= 2,
@@ -1567,10 +1727,8 @@ mod tests {
             fragments_split.len()
         );
 
-        // Pretrain global IVF centroids and PQ codebook.
         let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await;
 
-        // Build single index using two fragments in one distributed build.
         let group_single = vec![
             fragments_single[0].id() as u32,
             fragments_single[1].id() as u32,
         ];
@@ -1584,7 +1742,6 @@ mod tests {
         )
         .await;
 
-        // Build split index: one fragment per distributed build, then merge.
         let group0 = vec![fragments_split[0].id() as u32];
         let group1 = vec![fragments_split[1].id() as u32];
         build_ivfpq_for_fragment_groups(
@@ -1596,14 +1753,12 @@ mod tests {
         )
         .await;
 
-        // Compare IVF layout via index statistics.
         let stats_single_json = ds_single.index_statistics(INDEX_NAME).await.unwrap();
         let stats_split_json = ds_split.index_statistics(INDEX_NAME).await.unwrap();
         let stats_single: serde_json::Value = serde_json::from_str(&stats_single_json).unwrap();
         let stats_split: serde_json::Value = serde_json::from_str(&stats_split_json).unwrap();
         assert_ivf_layout_equal(&stats_single, &stats_split);
 
-        // Compare row id sets per partition.
         let ctx_single = load_vector_index_context(&ds_single, "vector", INDEX_NAME).await;
         let ctx_split = load_vector_index_context(&ds_split, "vector", INDEX_NAME).await;
@@ -1624,7 +1779,6 @@ mod tests {
             );
         }
 
-        // Compare Top-K row ids on a deterministic set of queries.
         const K: usize = 10;
         const NUM_QUERIES: usize = 10;
@@ -1653,7 +1807,6 @@ mod tests {
             ids_per_query
         }
 
-        // Collect a deterministic query set from ds_single.
         let query_batch = ds_single
             .scan()
             .project(&["vector"] as &[&str])
             .unwrap()
             .limit(Some(NUM_QUERIES as i64), None)
             .unwrap()
             .try_into_batch()
             .await
             .unwrap();
@@ -1677,6 +1830,421 @@ mod tests {
         );
     }
 
+    #[rstest]
+    #[case::ivf_flat(IndexType::IvfFlat)]
+    #[case::ivf_pq(IndexType::IvfPq)]
+    #[case::ivf_sq(IndexType::IvfSq)]
+    #[tokio::test]
+    async fn test_distributed_vector_finalize_commits_multiple_segments_and_preserves_query_results(
+        #[case] index_type: IndexType,
+    ) {
+        const INDEX_NAME: &str = "vector_idx";
+        const K: usize = 10;
+        const NUM_QUERIES: usize = 10;
+
+        let test_dir = TempStrDir::default();
+        let base_uri = test_dir.as_str();
+
+        // Generate the data once, then write it twice to two independent dataset URIs.
+        let (schema, batches) = make_two_fragment_batches();
+
+        let ds_single_uri = format!("{}/single", base_uri);
+        let ds_split_uri = format!("{}/split", base_uri);
+
+        let mut ds_single =
+            write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await;
+        let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await;
+
+        // Ensure we have at least 2 fragments.
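+        // The split build below assigns one fragment per shard, so a
+        // single-fragment dataset would not exercise the multi-segment path.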
+        let fragments_single = ds_single.get_fragments();
+        assert!(
+            fragments_single.len() >= 2,
+            "expected at least 2 fragments in ds_single, got {}",
+            fragments_single.len()
+        );
+        let fragments_split = ds_split.get_fragments();
+        assert!(
+            fragments_split.len() >= 2,
+            "expected at least 2 fragments in ds_split, got {}",
+            fragments_split.len()
+        );
+
+        let distributed_params = match index_type {
+            IndexType::IvfFlat => {
+                let ivf_params = prepare_global_ivf(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_flat_params(DistanceType::L2, ivf_params)
+            }
+            IndexType::IvfPq => {
+                let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params)
+            }
+            IndexType::IvfSq => {
+                let ivf_params = prepare_global_ivf(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_sq_params(
+                    DistanceType::L2,
+                    ivf_params,
+                    SQBuildParams::default(),
+                )
+            }
+            other => panic!("unsupported test index type: {}", other),
+        };
+
+        ds_single
+            .create_index(
+                &["vector"],
+                IndexType::Vector,
+                Some(INDEX_NAME.to_string()),
+                &distributed_params,
+                true,
+            )
+            .await
+            .unwrap();
+
+        let fragment_groups = fragments_split
+            .iter()
+            .map(|fragment| vec![fragment.id() as u32])
+            .collect::<Vec<_>>();
+        let expected_segment_count = fragment_groups.len();
+        let (shared_uuid, partial_shards) = build_distributed_partial_index_for_fragment_groups(
+            &mut ds_split,
+            fragment_groups,
+            &distributed_params,
+            INDEX_NAME,
+        )
+        .await;
+        let segments = materialize_distributed_plan(
+            &mut ds_split,
+            shared_uuid,
+            &partial_shards,
+            None,
+            INDEX_NAME,
+        )
+        .await;
+        assert_eq!(segments.len(), expected_segment_count);
+        let staging_dir = ds_split.indices_dir().child(shared_uuid.to_string());
+        let staging_entries = ds_split.object_store().read_dir(staging_dir).await.unwrap();
+        assert!(
+            staging_entries
+                .iter()
+                .all(|entry| !entry.starts_with("partial_")),
+            "materialized segments should clean up consumed partial shards",
+        );
+
+        let committed_segments = ds_split.load_indices_by_name(INDEX_NAME).await.unwrap();
+        assert_eq!(committed_segments.len(), expected_segment_count);
+        for committed in committed_segments {
+            let covered_fragments = committed
+                .fragment_bitmap
+                .as_ref()
+                .expect("distributed segment should have fragment coverage");
+            assert_eq!(covered_fragments.len(), 1);
+        }
+
+        async fn collect_row_ids(ds: &Dataset, queries: &[Arc<dyn Array>]) -> Vec<Vec<u64>> {
+            let mut ids_per_query = Vec::with_capacity(queries.len());
+            for q in queries {
+                let result = ds
+                    .scan()
+                    .with_row_id()
+                    .project(&["_rowid"] as &[&str])
+                    .unwrap()
+                    .nearest("vector", q.as_ref(), K)
+                    .unwrap()
+                    .try_into_batch()
+                    .await
+                    .unwrap();
+
+                let row_ids = result[ROW_ID]
+                    .as_primitive::<UInt64Type>()
+                    .values()
+                    .iter()
+                    .copied()
+                    .collect::<Vec<_>>();
+                ids_per_query.push(row_ids);
+            }
+            ids_per_query
+        }
+
+        // Collect a deterministic query set from ds_single.
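+        // The same NUM_QUERIES vectors are issued against both datasets so
+        // their Top-K row ids can be compared directly.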
+        let query_batch = ds_single
+            .scan()
+            .project(&["vector"] as &[&str])
+            .unwrap()
+            .limit(Some(NUM_QUERIES as i64), None)
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        let vectors = query_batch["vector"].as_fixed_size_list();
+        let queries: Vec<Arc<dyn Array>> = (0..vectors.len())
+            .map(|i| vectors.value(i) as Arc<dyn Array>)
+            .collect();
+
+        let ids_single = collect_row_ids(&ds_single, &queries).await;
+        let ids_split = collect_row_ids(&ds_split, &queries).await;
+
+        assert_eq!(
+            ids_single, ids_split,
+            "single vs segmented distributed index returned different Top-K row ids",
+        );
+    }
+
+    #[rstest]
+    #[case::ivf_flat(IndexType::IvfFlat)]
+    #[case::ivf_pq(IndexType::IvfPq)]
+    #[case::ivf_sq(IndexType::IvfSq)]
+    #[tokio::test]
+    async fn test_distributed_vector_grouped_finalize_allows_concurrent_group_execution(
+        #[case] index_type: IndexType,
+    ) {
+        const INDEX_NAME: &str = "grouped_idx";
+        const K: usize = 10;
+        const NUM_QUERIES: usize = 10;
+
+        let test_dir = TempStrDir::default();
+        let base_uri = test_dir.as_str();
+
+        let (schema, batches) = make_two_fragment_batches();
+        let ds_single_uri = format!("{}/grouped_single", base_uri);
+        let ds_split_uri = format!("{}/grouped_split", base_uri);
+
+        let mut ds_single =
+            write_dataset_from_batches(&ds_single_uri, schema.clone(), batches.clone()).await;
+        let mut ds_split = write_dataset_from_batches(&ds_split_uri, schema, batches).await;
+
+        let distributed_params = match index_type {
+            IndexType::IvfFlat => {
+                let ivf_params = prepare_global_ivf(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_flat_params(DistanceType::L2, ivf_params)
+            }
+            IndexType::IvfPq => {
+                let (ivf_params, pq_params) = prepare_global_ivf_pq(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_pq_params(DistanceType::L2, ivf_params, pq_params)
+            }
+            IndexType::IvfSq => {
+                let ivf_params = prepare_global_ivf(&ds_single, "vector").await;
+                VectorIndexParams::with_ivf_sq_params(
+                    DistanceType::L2,
+                    ivf_params,
+                    SQBuildParams::default(),
+                )
+            }
+            other => panic!("unsupported test index type: {}", other),
+        };
+
+        ds_single
+            .create_index(
+                &["vector"],
+                IndexType::Vector,
+                Some(INDEX_NAME.to_string()),
+                &distributed_params,
+                true,
+            )
+            .await
+            .unwrap();
+
+        let fragment_groups = ds_split
+            .get_fragments()
+            .into_iter()
+            .map(|fragment| vec![fragment.id() as u32])
+            .collect::<Vec<_>>();
+        let (shared_uuid, partial_shards) = build_distributed_partial_index_for_fragment_groups(
+            &mut ds_split,
+            fragment_groups,
+            &distributed_params,
+            INDEX_NAME,
+        )
+        .await;
+
+        let index_dir = ds_split.indices_dir().child(shared_uuid.to_string());
+        let shard_plan = plan_staging_segments(&index_dir, &partial_shards, None, None)
+            .await
+            .unwrap();
+        let shard_count = shard_plan.len();
+        assert!(shard_count >= 4);
+        let target_segment_bytes = shard_plan[0].estimated_bytes().saturating_mul(2);
+
+        let grouped_plan = plan_staging_segments(
+            &index_dir,
+            &partial_shards,
+            None,
+            Some(target_segment_bytes),
+        )
+        .await
+        .unwrap();
+        assert!(grouped_plan.len() < shard_count);
+        assert!(
+            grouped_plan
+                .iter()
+                .any(|plan| plan.partial_shards().len() > 1)
+        );
+
+        let grouped_segments = materialize_distributed_plan(
+            &mut ds_split,
+            shared_uuid,
+            &partial_shards,
+            Some(target_segment_bytes),
+            INDEX_NAME,
+        )
+        .await;
+        assert_eq!(grouped_segments.len(), grouped_plan.len());
+
+        async fn collect_row_ids(ds: &Dataset, queries: &[Arc<dyn Array>]) -> Vec<Vec<u64>> {
+            let mut ids_per_query = Vec::with_capacity(queries.len());
+            for q in queries {
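+                // Run the ANN query and keep only the returned row ids.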
+                let result = ds
+                    .scan()
+                    .with_row_id()
+                    .project(&["_rowid"] as &[&str])
+                    .unwrap()
+                    .nearest("vector", q.as_ref(), K)
+                    .unwrap()
+                    .try_into_batch()
+                    .await
+                    .unwrap();
+
+                ids_per_query.push(
+                    result[ROW_ID]
+                        .as_primitive::<UInt64Type>()
+                        .values()
+                        .iter()
+                        .copied()
+                        .collect(),
+                );
+            }
+            ids_per_query
+        }
+
+        let query_batch = ds_single
+            .scan()
+            .project(&["vector"] as &[&str])
+            .unwrap()
+            .limit(Some(NUM_QUERIES as i64), None)
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        let vectors = query_batch["vector"].as_fixed_size_list();
+        let queries: Vec<Arc<dyn Array>> = (0..vectors.len())
+            .map(|i| vectors.value(i) as Arc<dyn Array>)
+            .collect();
+
+        let ids_single = collect_row_ids(&ds_single, &queries).await;
+        let ids_split = collect_row_ids(&ds_split, &queries).await;
+        assert_eq!(ids_single, ids_split);
+    }
+
+    #[tokio::test]
+    async fn test_distributed_vector_plan_rejects_overlapping_fragment_coverage() {
+        let test_dir = TempStrDir::default();
+        let base_uri = test_dir.as_str();
+        let (schema, batches) = make_two_fragment_batches();
+        let dataset_uri = format!("{}/overlap_fragments", base_uri);
+        let mut dataset = write_dataset_from_batches(&dataset_uri, schema, batches).await;
+
+        let fragment = dataset.get_fragments()[0].id() as u32;
+        let shared_uuid = Uuid::new_v4();
+        let params = VectorIndexParams::with_ivf_flat_params(
+            DistanceType::L2,
+            prepare_global_ivf(&dataset, "vector").await,
+        );
+
+        for _ in 0..2 {
+            dataset
+                .create_index_builder(&["vector"], IndexType::Vector, &params)
+                .name("vector_idx".to_string())
+                .fragments(vec![fragment])
+                .index_uuid(shared_uuid.to_string())
+                .execute_uncommitted()
+                .await
+                .unwrap();
+        }
+
+        let index_dir = dataset.indices_dir().child(shared_uuid.to_string());
+        let partial_shards =
+            build_partial_shards(&dataset, shared_uuid, &[vec![fragment], vec![fragment]]).await;
+        let err = plan_staging_segments(&index_dir, &partial_shards, None, None)
+            .await
+            .unwrap_err();
+        assert!(err.to_string().contains("overlapping fragment coverage"));
+    }
+
+    #[tokio::test]
+    async fn test_distributed_vector_build_supports_hnsw_variants() {
+        let test_dir = TempStrDir::default();
+        let base_uri = test_dir.as_str();
+        let (schema, batches) = make_two_fragment_batches();
+        let dataset_uri = format!("{}/distributed_hnsw_supported", base_uri);
+        let mut dataset = write_dataset_from_batches(&dataset_uri, schema, batches).await;
+
+        let fragments = dataset.get_fragments();
+        assert!(fragments.len() >= 2);
+        let shared_uuid = Uuid::new_v4();
+        let params = VectorIndexParams::ivf_hnsw(
+            DistanceType::L2,
+            prepare_global_ivf(&dataset, "vector").await,
+            HnswBuildParams::default(),
+        );
+
+        for fragment in fragments.iter().take(2) {
+            dataset
+                .create_index_builder(&["vector"], IndexType::Vector, &params)
+                .name("vector_idx".to_string())
+                .fragments(vec![fragment.id() as u32])
+                .index_uuid(shared_uuid.to_string())
+                .execute_uncommitted()
+                .await
+                .unwrap();
+        }
+
+        let index_dir = dataset.indices_dir().child(shared_uuid.to_string());
+        let fragment_groups = fragments
+            .iter()
+            .take(2)
+            .map(|fragment| vec![fragment.id() as u32])
+            .collect::<Vec<_>>();
+        let partial_shards = build_partial_shards(&dataset, shared_uuid, &fragment_groups).await;
+        let plans = plan_staging_segments(&index_dir, &partial_shards, None, Some(1))
+            .await
+            .unwrap();
+        assert_eq!(plans.len(), fragments.iter().take(2).count());
+
+        let mut segments = Vec::with_capacity(plans.len());
+        for plan in &plans {
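+            // Materialize one final physical segment from each plan's shards.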
+            segments.push(
+                merge_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan)
+                    .await
+                    .unwrap(),
+            );
+        }
+        assert_eq!(segments.len(), plans.len());
+
+        dataset
+            .commit_existing_index_segments("vector_idx", "vector", segments)
+            .await
+            .unwrap();
+
+        let query_batch = dataset
+            .scan()
+            .project(&["vector"])
+            .unwrap()
+            .limit(Some(4), None)
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        let q = query_batch["vector"].as_fixed_size_list().value(0);
+        let result = dataset
+            .scan()
+            .with_row_id()
+            .project(&["_rowid"] as &[&str])
+            .unwrap()
+            .nearest("vector", q.as_ref(), 5)
+            .unwrap()
+            .try_into_batch()
+            .await
+            .unwrap();
+        assert!(result.num_rows() > 0);
+    }
+
     async fn test_index(
         params: VectorIndexParams,
         nlist: usize,