diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md
index e6aa241feee..abf0f5e9b0f 100644
--- a/docs/src/guide/distributed_indexing.md
+++ b/docs/src/guide/distributed_indexing.md
@@ -15,12 +15,12 @@ write:

1. multiple workers build index data in parallel
2. the caller invokes Lance segment build APIs for one distributed build
-3. Lance discovers the relevant worker outputs, then plans and builds index artifacts
+3. Lance plans and builds index artifacts from the worker outputs supplied by the caller
4. the built artifacts are committed into the dataset manifest

-For vector indices, the worker outputs are temporary shard directories under a
-shared UUID. Internally, Lance can turn these shard outputs into one or more
-built physical segments.
+For vector indices, the worker outputs are segments stored directly
+under `indices/<segment_uuid>/`. Lance can turn these outputs into one or more
+physical segments and then commit them as one logical index.

![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg)

@@ -28,50 +28,52 @@ built physical segments.

This guide uses the following terms consistently:

-- **Staging root**: the shared UUID directory used during distributed index build
-- **Partial shard**: one worker output written under the staging root as
-  `partial_<shard_uuid>/`
-- **Built segment**: one physical index segment produced during segment build and
-  ready to be committed into the manifest
+- **Segment**: one worker output written by `execute_uncommitted()` under
+  `indices/<segment_uuid>/`
+- **Physical segment**: one index segment that is ready to be committed into
+  the manifest
- **Logical index**: the user-visible index identified by name; a logical index
-  may contain one or more built segments
+  may contain one or more physical segments

For example, a distributed vector build may create a layout like:

```text
-indices/<staging_uuid>/
-├── partial_<shard_uuid_1>/
-│   ├── index.idx
-│   └── auxiliary.idx
-├── partial_<shard_uuid_2>/
-│   ├── index.idx
-│   └── auxiliary.idx
-└── partial_<shard_uuid_3>/
-    ├── index.idx
-    └── auxiliary.idx
+indices/<segment_uuid_1>/
+├── index.idx
+└── auxiliary.idx
+
+indices/<segment_uuid_2>/
+├── index.idx
+└── auxiliary.idx
+
+indices/<segment_uuid_3>/
+├── index.idx
+└── auxiliary.idx
```

After segment build, Lance produces one or more segment directories:

```text
-indices/<built_segment_uuid_1>/
+indices/<physical_segment_uuid_1>/
├── index.idx
└── auxiliary.idx

-indices/<built_segment_uuid_2>/
+indices/<physical_segment_uuid_2>/
├── index.idx
└── auxiliary.idx
```

-These physical segments are then committed together as one logical index.
+These physical segments are then committed together as one logical index. In the
+common no-merge case, the input segments are already the physical
+segments and `build_all()` returns them unchanged.

## Roles

There are two parties involved in distributed indexing:

-- **Workers** build partial shards
-- **The caller** launches workers, chooses when a distributed build should be
-  turned into built segments, provides any additional inputs requested by the
+- **Workers** build segments
+- **The caller** launches workers, chooses how those segments should be turned
+  into physical segments, provides any additional inputs requested by the
  segment build APIs, and commits the final result

@@ -82,21 +84,22 @@ launching workers and driving the overall workflow.

The current model for distributed vector indexing has two layers of
parallelism.

-### Shard Build
+### Worker Build

-First, multiple workers build partial shards in parallel:
+First, multiple workers build segments in parallel:

-1. on each worker, call an uncommitted shard-build API such as
-   `create_index_builder(...).fragments(...).index_uuid(staging_index_uuid).execute_uncommitted()`
-   or Python `create_index_uncommitted(..., fragment_ids=..., index_uuid=...)`
-2. each worker writes one `partial_<shard_uuid>/` under the shared staging root
+1. on each worker, call an uncommitted worker-build API such as
+   `create_index_builder(...).fragments(...).execute_uncommitted()`
+   or Python `create_index_uncommitted(..., fragment_ids=...)`
+2. each worker writes one segment under `indices/<segment_uuid>/`

### Segment Build

-Then the caller turns that staging root into one or more built segments:
+Then the caller turns those existing segments into one or more physical
+segments:

-1. open the staging root with `create_index_segment_builder(staging_index_uuid)`
-2. provide partial index metadata with `with_partial_indices(...)`
+1. create a builder with `create_index_segment_builder()`
+2. provide segment metadata with `with_segments(...)`
3. optionally choose a grouping policy with `with_target_segment_bytes(...)`
4. call `plan()` to get `Vec<IndexSegmentPlan>`

@@ -105,51 +108,52 @@ At that point the caller has two execution choices:

- call `build(plan)` for each plan and run those builds in parallel
- call `build_all()` to let Lance build every planned segment on the current node

-After the segments are built, publish them with
+After the physical segments are built, publish them with
`commit_existing_index_segments(...)`.

## Internal Segmented Finalize Model

Internally, Lance models distributed vector segment build as:

-1. **plan** which partial shards should become each built segment
-2. **build** each segment from its selected partial shards
+1. **plan** which input segments should become each physical segment
+2. **build** each segment from its selected input segments
3. **commit** the resulting physical segments as one logical index

-The plan step is driven by the staging root and any additional shard metadata
-required by the segment build APIs.
+The plan step is driven by the segment metadata returned from
+`execute_uncommitted()` and any additional inputs requested by the segment
+build APIs.

This is intentionally a storage-level model:

-- partial shards are temporary worker outputs
-- built segments are durable physical artifacts
+- segments are worker outputs that are not yet published
+- physical segments are durable artifacts referenced by the manifest
- the logical index identity is attached only at commit time

## Segment Grouping

-When Lance builds segments from a staging root, it may either:
+When Lance builds segments from existing inputs, it may either:

-- keep shard boundaries, so each partial shard becomes one built segment
-- group multiple partial shards into a larger built segment
+- keep segment boundaries, so each input segment becomes one physical segment
+- group multiple input segments into a larger physical segment

-The grouping decision is separate from shard build. Workers only build partial
-shards; Lance applies the segment build policy when it plans built segments.
+The grouping decision is separate from worker build. Workers only build
+segments; Lance applies the segment build policy when it plans
+physical segments.

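+For example, a caller could drive the whole workflow from Python as in the
+sketch below. This is a minimal sketch, not a complete recipe: it assumes a
+dataset handle `ds` with a `vector` column, a `fragment_groups` assignment
+produced by the caller's scheduler, and IVF/PQ artifacts (`centroids`,
+`codebook`) trained once and shared by every worker. The worker loop runs in
+one process here only for brevity.
+
+```python
+# Worker build: each worker writes one uncommitted segment for its fragments.
+segments = [
+    ds.create_index_uncommitted(
+        column="vector",
+        index_type="IVF_PQ",
+        name="vector_idx",
+        fragment_ids=group,
+        num_partitions=4,
+        num_sub_vectors=16,
+        ivf_centroids=centroids,  # shared global training outputs
+        pq_codebook=codebook,
+    )
+    for group in fragment_groups
+]
+
+# Segment build: merge the worker outputs into physical segments, then
+# publish them together as one logical index.
+built = ds.create_index_segment_builder().with_segments(segments).build_all()
+ds = ds.commit_existing_index_segments("vector_idx", "vector", built)
+```
+
+Calling `with_target_segment_bytes(...)` before `build_all()` would instead
+group several worker outputs into larger physical segments.
+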
## Responsibility Boundaries The caller is expected to know: - which distributed build is ready for segment build -- any additional shard metadata requested by the segment build APIs -- how the resulting built segments should be published +- the segment metadata returned by worker builds +- how the resulting physical segments should be published Lance is responsible for: -- writing partial shard artifacts -- discovering partial shards under the staging root -- planning built segments from the discovered shard set -- merging shard storage into built segment artifacts -- committing built segments into the manifest +- writing segment artifacts +- planning physical segments from the supplied segment set +- merging segment storage into physical segment artifacts +- committing physical segments into the manifest This split keeps distributed scheduling outside the storage engine while still letting Lance own the on-disk index format. diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index ff2c261ebdd..203ffcdffa8 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -8,7 +8,7 @@ use crate::namespace::{ }; use crate::session::{handle_from_session, session_from_handle}; use crate::storage_options::JavaStorageOptionsProvider; -use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec}; +use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec, import_vec_to_rust}; use crate::utils::{ build_compaction_options, extract_storage_options, extract_write_params, get_scalar_index_params, get_vector_index_params, to_rust_map, @@ -49,7 +49,7 @@ use lance_index::DatasetIndexExt; use lance_index::IndexCriteria as RustIndexCriteria; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::btree::BTreeParameters; -use lance_index::{IndexParams, IndexType}; +use lance_index::{IndexParams, IndexSegment, IndexType}; use lance_io::object_store::ObjectStoreRegistry; use lance_io::object_store::StorageOptionsProvider; use lance_namespace::LanceNamespace; @@ -1070,6 +1070,182 @@ fn inner_merge_index_metadata( Ok(()) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + target_segment_bytes_jobj: JObject, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_build_index_segments( + &mut env, + java_dataset, + java_segments, + target_segment_bytes_jobj + ) + ) +} + +fn inner_build_index_segments<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + target_segment_bytes_jobj: JObject, +) -> Result> { + let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let target_segment_bytes = env + .get_long_opt(&target_segment_bytes_jobj)? + .map(|v| v as u64); + let template = segment_template(&segments)?; + + let built_segments = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + let mut builder = dataset_guard + .inner + .create_index_segment_builder() + .with_segments(segments); + if let Some(target_segment_bytes) = target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + RT.block_on(builder.build_all())? 
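+        // `dataset_guard` is dropped at the end of this block, releasing the
+        // lock on the native dataset handle before the built segments are
+        // converted back into Java metadata objects below.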
+ }; + + let built_metadata = built_segments + .into_iter() + .map(|segment| index_segment_to_metadata(&template, segment)) + .collect::>(); + export_vec(env, &built_metadata) +} + +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeCommitExistingIndexSegments<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject, + index_name: JString, + column: JString, + java_segments: JObject, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_commit_existing_index_segments( + &mut env, + java_dataset, + index_name, + column, + java_segments + ) + ) +} + +fn inner_commit_existing_index_segments<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject, + index_name: JString, + column: JString, + java_segments: JObject, +) -> Result> { + let index_name = index_name.extract(env)?; + let column = column.extract(env)?; + let segment_metadata = + import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let segments = segment_metadata + .iter() + .map(index_metadata_to_segment) + .collect::>>()?; + + let committed = { + let mut dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + RT.block_on(dataset_guard.inner.commit_existing_index_segments( + &index_name, + &column, + segments, + ))?; + RT.block_on(dataset_guard.inner.load_indices_by_name(&index_name))? + }; + + export_vec(env, &committed) +} + +struct SegmentTemplate { + name: String, + fields: Vec, + dataset_version: u64, +} + +fn segment_template(segments: &[IndexMetadata]) -> Result { + let first = segments + .first() + .ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?; + for segment in &segments[1..] { + if segment.name != first.name { + return Err(Error::input_error(format!( + "All segments must share the same index name, got '{}' and '{}'", + first.name, segment.name + ))); + } + if segment.fields != first.fields { + return Err(Error::input_error(format!( + "All segments must target the same field ids, got {:?} and {:?}", + first.fields, segment.fields + ))); + } + if segment.dataset_version != first.dataset_version { + return Err(Error::input_error(format!( + "All segments must share the same dataset version, got {} and {}", + first.dataset_version, segment.dataset_version + ))); + } + } + + Ok(SegmentTemplate { + name: first.name.clone(), + fields: first.fields.clone(), + dataset_version: first.dataset_version, + }) +} + +fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result { + let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing fragment coverage metadata", + metadata.uuid + )) + })?; + let index_details = metadata.index_details.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing index details metadata", + metadata.uuid + )) + })?; + + Ok(IndexSegment::new( + metadata.uuid, + fragment_bitmap, + index_details, + metadata.index_version, + )) +} + +fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + IndexMetadata { + uuid, + fields: template.fields.clone(), + name: template.name.clone(), + dataset_version: template.dataset_version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(index_details), + index_version, + created_at: Some(Utc::now()), + base_id: None, + files: None, + } +} + #[unsafe(no_mangle)] pub extern "system" fn 
Java_org_lance_Dataset_nativeOptimizeIndices( mut env: JNIEnv, diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 166feebba20..506827be902 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1019,6 +1019,51 @@ public void mergeIndexMetadata( private native void innerMergeIndexMetadata( String indexUUID, int indexType, Optional batchReadHead); + /** + * Build physical vector index segments from previously-created fragment-level index outputs. + * + * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when + * fragmentIds are provided + * @param targetSegmentBytes optional size target for merged physical segments + * @return built physical segment metadata + */ + public List buildIndexSegments(List segments, Optional targetSegmentBytes) { + Preconditions.checkNotNull(segments, "segments cannot be null"); + Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeBuildIndexSegments(segments, targetSegmentBytes); + } + } + + private native List nativeBuildIndexSegments( + List segments, Optional targetSegmentBytes); + + /** + * Publish one or more existing physical index segments as a logical index. + * + * @param indexName logical index name + * @param column indexed column name + * @param segments physical segment metadata to publish + * @return committed manifest metadata + */ + public List commitExistingIndexSegments( + String indexName, String column, List segments) { + Preconditions.checkArgument( + indexName != null && !indexName.isEmpty(), "indexName cannot be null or empty"); + Preconditions.checkArgument( + column != null && !column.isEmpty(), "column cannot be null or empty"); + Preconditions.checkNotNull(segments, "segments cannot be null"); + Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeCommitExistingIndexSegments(indexName, column, segments); + } + } + + private native List nativeCommitExistingIndexSegments( + String indexName, String column, List segments); + /** * Count the number of rows in the dataset. 
* diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index 4a5902044da..a96b6593d30 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -13,18 +13,15 @@ */ package org.lance.index; -import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestVectorDataset; -import org.lance.Transaction; import org.lance.index.vector.IvfBuildParams; import org.lance.index.vector.PQBuildParams; import org.lance.index.vector.RQBuildParams; import org.lance.index.vector.SQBuildParams; import org.lance.index.vector.VectorIndexParams; import org.lance.index.vector.VectorTrainer; -import org.lance.operation.CreateIndex; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -33,8 +30,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -75,72 +70,40 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E IndexParams indexParams = IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - // Partially create index on the first fragment - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_FLAT, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - // Partially create index on the second fragment with the same UUID - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_FLAT, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_FLAT, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_FLAT, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); // The index should not be visible before metadata merge & commit assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_FLAT index should not present before commit"); - // Merge index metadata for all fragment-level pieces - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_FLAT, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - 
.fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } @@ -200,68 +163,39 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc IndexParams indexParams = IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_PQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_PQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_PQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_PQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_PQ index should not present before commit"); - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_PQ, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - 
CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } @@ -305,68 +239,39 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc IndexParams indexParams = IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_SQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_SQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_SQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_SQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_SQ index should not present before commit"); - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_SQ, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - 
assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index e8d632b0ecb..fd2db9de351 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2591,14 +2591,14 @@ def create_scalar_index( fragment_ids : List[int], optional If provided, the index will be created only on the specified fragments. This enables distributed/fragment-level indexing. When provided, the - method returns an IndexMetadata object but does not commit the index - to the dataset. The index can be committed later using the commit API. + method returns metadata for one segment but does not commit + the index to the dataset. The segment can be planned, merged, and + committed later using the segment builder and commit APIs. This parameter is passed via kwargs internally. index_uuid : str, optional - A UUID to use for fragment-level distributed indexing - multiple fragment-level indices need to share UUID for later merging. - If not provided, a new UUID will be generated. This parameter is passed via - kwargs internally. + A UUID to use for the segment written by this call. + If not provided, a new UUID will be generated. This parameter is + passed via kwargs internally. with_position: bool, default False This is for the ``INVERTED`` index. If True, the index will store the @@ -3258,12 +3258,12 @@ def create_index( fragment_ids : List[int], optional If provided, the index will be created only on the specified fragments. This enables distributed/fragment-level indexing. When provided, the - method creates temporary index metadata but does not commit the index - to the dataset. The index can be committed later using - merge_index_metadata(index_uuid, "VECTOR", column=..., index_name=...). + method creates one segment but does not commit the index + to the dataset. The returned metadata can be passed to + ``create_index_segment_builder().with_segments(...)`` + and then committed with ``commit_existing_index_segments(...)``. index_uuid : str, optional - A UUID to use for fragment-level distributed indexing. Multiple - fragment-level indices need to share UUID for later merging. + A UUID to use for the segment written by this call. If not provided, a new UUID will be generated. target_partition_size: int, optional The target partition size. If set, the number of partitions will be computed @@ -3426,33 +3426,32 @@ def create_index_uncommitted( **kwargs, ) -> Index: """ - Create one uncommitted partial index and return its metadata. + Create one segment without publishing it and return its metadata. - This is the public shard-build API for distributed index construction. - Unlike :meth:`create_index`, this method does not publish the index into - the dataset manifest. Instead, it writes one partial index under the - staging UUID and returns the resulting :class:`Index` metadata. + This is the public distributed-build API for vector index + construction. Unlike :meth:`create_index`, this method does not publish + the index into the dataset manifest. Instead, it writes one segment + under ``_indices//`` and returns the resulting + :class:`Index` metadata. Callers should: - 1. 
run :meth:`create_index_uncommitted` on each worker with the worker's - assigned ``fragment_ids`` and a shared ``index_uuid`` + 1. run :meth:`create_index_uncommitted` on each worker with that worker's + assigned ``fragment_ids`` 2. collect the returned :class:`Index` objects - 3. pass them to :meth:`IndexSegmentBuilder.with_partial_indices` - 4. build one or more segments and commit them with + 3. pass them to :meth:`IndexSegmentBuilder.with_segments` + 4. build one or more physical segments and commit them with :meth:`commit_existing_index_segments` - Parameters are the same as :meth:`create_index`, with two additional - requirements for distributed shard build: + Parameters are the same as :meth:`create_index`, with one additional + requirement: - ``fragment_ids`` must be provided - - workers that belong to the same distributed build must share the same - ``index_uuid`` Returns ------- Index - Metadata for the partial index that was written by this call. + Metadata for the segment that was written by this call. """ return self._create_index_impl( column, @@ -3514,16 +3513,17 @@ def merge_index_metadata( batch_readhead: Optional[int] = None, ): """ - Merge distributed index metadata for supported scalar - and vector index types. + Merge distributed scalar index metadata. - This method supports all index types defined in - :class:`lance.indices.SupportedDistributedIndices`, - including scalar indices and precise vector index types. + Vector distributed indexing no longer uses this API. For vector indices, + build segments with :meth:`create_index_uncommitted`, plan or + merge them with :meth:`create_index_segment_builder`, and publish them + with :meth:`commit_existing_index_segments`. This method does NOT commit changes. - This API merges temporary index files (e.g., per-fragment partials). + This API merges temporary scalar index files (for example per-fragment + BTree or inverted index outputs). After this method returns, callers MUST explicitly commit the index manifest using lance.LanceDataset.commit(...) with a LanceOperation.CreateIndex. @@ -3531,11 +3531,11 @@ def merge_index_metadata( Parameters ---------- index_uuid: str - The shared UUID used when building fragment-level indices. + The shared UUID used when building fragment-level scalar indices. index_type: str Index type name. Must be one of the enum values in :class:`lance.indices.SupportedDistributedIndices` - (for example ``"IVF_PQ"``). + supported by scalar distributed merge. batch_readhead: int, optional Prefetch concurrency used by BTREE merge reader. Default: 1. """ @@ -3552,16 +3552,15 @@ def merge_index_metadata( self._ds.merge_index_metadata(index_uuid, t, batch_readhead) return None - def create_index_segment_builder(self, staging_index_uuid: str): + def create_index_segment_builder(self): """ - Create a builder for turning partial index outputs into committed segments. + Create a builder for turning existing segments into physical segments. - The caller should pass the shared index UUID used during - :meth:`create_index` with ``fragment_ids=...`` and ``index_uuid=...``. - Then provide the returned partial index metadata through - :meth:`IndexSegmentBuilder.with_partial_indices`. + Provide the segment metadata returned by + :meth:`create_index_uncommitted` through + :meth:`IndexSegmentBuilder.with_segments`. 
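+
+        A minimal sketch (assuming ``segments`` holds the metadata returned
+        by worker-side :meth:`create_index_uncommitted` calls)::
+
+            builder = ds.create_index_segment_builder().with_segments(segments)
+            built = builder.build_all()
+            ds = ds.commit_existing_index_segments("vector_idx", "vector", built)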
""" - return self._ds.create_index_segment_builder(staging_index_uuid) + return self._ds.create_index_segment_builder() def commit_existing_index_segments( self, index_name: str, column: str, segments: List[IndexSegment] diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index 085ff66e252..8dfb1148345 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -31,9 +31,11 @@ class SupportedDistributedIndices(str, Enum): # Scalar index types BTREE = "BTREE" INVERTED = "INVERTED" + # Precise vector index types supported by distributed merge IVF_FLAT = "IVF_FLAT" IVF_PQ = "IVF_PQ" IVF_SQ = "IVF_SQ" + # Deprecated generic placeholder (kept for backward compatibility) VECTOR = "VECTOR" diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 973264c475c..2451f4684b4 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -9,7 +9,6 @@ import string import tempfile import time -import uuid from pathlib import Path from typing import Optional @@ -19,7 +18,7 @@ import pyarrow.compute as pc import pytest from lance import LanceDataset, LanceFragment -from lance.dataset import Index, VectorIndexReader +from lance.dataset import VectorIndexReader from lance.indices import IndexFileVersion, IndicesBuilder from lance.query import MatchQuery, PhraseQuery from lance.util import validate_vector_index # noqa: E402 @@ -2112,38 +2111,67 @@ def build_distributed_vector_index( world=2, **index_params, ): - """Build a distributed vector index over fragment groups and commit. - - Steps: - - Partition fragments into `world` groups - - For each group, call create_index with fragment_ids and a shared index_uuid - - Merge metadata (commit index manifest) - - Returns the dataset (post-merge) for querying. 
- """ + """Build a distributed vector index over fragment groups and commit.""" frags = dataset.get_fragments() frag_ids = [f.fragment_id for f in frags] groups = _split_fragments_evenly(frag_ids, world) - shared_uuid = str(uuid.uuid4()) + segments = [] for g in groups: if not g: continue - dataset.create_index( - column=column, - index_type=index_type, - fragment_ids=g, - index_uuid=shared_uuid, - num_partitions=num_partitions, - num_sub_vectors=num_sub_vectors, - **index_params, + segments.append( + dataset.create_index_uncommitted( + column=column, + index_type=index_type, + fragment_ids=g, + num_partitions=num_partitions, + num_sub_vectors=num_sub_vectors, + **index_params, + ) ) - # Merge physical index metadata and commit manifest for VECTOR - dataset.merge_index_metadata(shared_uuid, index_type) - dataset = _commit_index_helper(dataset, shared_uuid, column="vector") - return dataset + segments = ( + dataset.create_index_segment_builder().with_segments(segments).build_all() + ) + return dataset.commit_existing_index_segments(f"{column}_idx", column, segments) + + +def _commit_segments_helper( + ds, segments, column: str, index_name: Optional[str] = None +): + if index_name is None: + index_name = f"{column}_idx" + return ds.commit_existing_index_segments(index_name, column, segments) + + +def _build_segments( + ds, + column: str, + index_type: str, + fragment_groups, + *, + index_name: Optional[str] = None, + **index_kwargs, +): + if index_name is None: + index_name = f"{column}_idx" + + segments = [] + for group in fragment_groups: + if not group: + continue + segments.append( + ds.create_index_uncommitted( + column=column, + index_type=index_type, + name=index_name, + fragment_ids=group, + **index_kwargs, + ) + ) + return segments def assert_distributed_vector_consistency( @@ -2458,7 +2486,6 @@ def test_metadata_merge_pq_success(tmp_path): mid = max(1, len(frags) // 2) node1 = [f.fragment_id for f in frags[:mid]] node2 = [f.fragment_id for f in frags[mid:]] - shared_uuid = str(uuid.uuid4()) builder = IndicesBuilder(ds, "vector") pre = builder.prepare_global_ivf_pq( num_partitions=8, @@ -2468,28 +2495,19 @@ def test_metadata_merge_pq_success(tmp_path): max_iters=20, ) try: - ds.create_index( - column="vector", - index_type="IVF_PQ", - fragment_ids=node1, - index_uuid=shared_uuid, - num_partitions=8, - num_sub_vectors=16, - ivf_centroids=pre["ivf_centroids"], - pq_codebook=pre["pq_codebook"], - ) - ds.create_index( - column="vector", - index_type="IVF_PQ", - fragment_ids=node2, - index_uuid=shared_uuid, + segments = _build_segments( + ds, + "vector", + "IVF_PQ", + [node1, node2], + index_name="vector_idx", num_partitions=8, num_sub_vectors=16, ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - ds.merge_index_metadata(shared_uuid, "IVF_PQ") - ds = _commit_index_helper(ds, shared_uuid, "vector") + segments = ds.create_index_segment_builder().with_segments(segments).build_all() + ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) assert 0 < len(results) <= 10 @@ -2504,7 +2522,6 @@ def test_distributed_workflow_merge_and_search(tmp_path): frags = ds.get_fragments() if len(frags) < 2: pytest.skip("Need at least 2 fragments for distributed testing") - shared_uuid = str(uuid.uuid4()) mid = len(frags) // 2 node1 = [f.fragment_id for f in frags[:mid]] node2 = [f.fragment_id for f in frags[mid:]] @@ -2517,28 +2534,19 @@ def 
test_distributed_workflow_merge_and_search(tmp_path): max_iters=20, ) try: - ds.create_index( - column="vector", - index_type="IVF_PQ", - fragment_ids=node1, - index_uuid=shared_uuid, - num_partitions=4, - num_sub_vectors=4, - ivf_centroids=pre["ivf_centroids"], - pq_codebook=pre["pq_codebook"], - ) - ds.create_index( - column="vector", - index_type="IVF_PQ", - fragment_ids=node2, - index_uuid=shared_uuid, + segments = _build_segments( + ds, + "vector", + "IVF_PQ", + [node1, node2], + index_name="vector_idx", num_partitions=4, num_sub_vectors=4, ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - ds._ds.merge_index_metadata(shared_uuid, "IVF_PQ") - ds = _commit_index_helper(ds, shared_uuid, "vector") + segments = ds.create_index_segment_builder().with_segments(segments).build_all() + ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) assert 0 < len(results) <= 10 @@ -2552,8 +2560,6 @@ def test_vector_merge_two_shards_success_flat(tmp_path): assert len(frags) >= 2 shard1 = [frags[0].fragment_id] shard2 = [frags[1].fragment_id] - shared_uuid = str(uuid.uuid4()) - # Global preparation builder = IndicesBuilder(ds, "vector") preprocessed = builder.prepare_global_ivf_pq( @@ -2564,28 +2570,19 @@ def test_vector_merge_two_shards_success_flat(tmp_path): max_iters=20, ) - ds.create_index( - column="vector", - index_type="IVF_FLAT", - fragment_ids=shard1, - index_uuid=shared_uuid, - num_partitions=4, - num_sub_vectors=128, - ivf_centroids=preprocessed["ivf_centroids"], - pq_codebook=preprocessed["pq_codebook"], - ) - ds.create_index( - column="vector", - index_type="IVF_FLAT", - fragment_ids=shard2, - index_uuid=shared_uuid, + segments = _build_segments( + ds, + "vector", + "IVF_FLAT", + [shard1, shard2], + index_name="vector_idx", num_partitions=4, num_sub_vectors=128, ivf_centroids=preprocessed["ivf_centroids"], pq_codebook=preprocessed["pq_codebook"], ) - ds._ds.merge_index_metadata(shared_uuid, "IVF_FLAT", None) - ds = _commit_index_helper(ds, shared_uuid, column="vector") + segments = ds.create_index_segment_builder().with_segments(segments).build_all() + ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) result = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) assert 0 < len(result) <= 5 @@ -2605,8 +2602,6 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): mid = len(frags) // 2 node1 = [f.fragment_id for f in frags[:mid]] node2 = [f.fragment_id for f in frags[mid:]] - shared_uuid = str(uuid.uuid4()) - builder = IndicesBuilder(ds, "vector") pre = builder.prepare_global_ivf_pq( num_partitions=4, @@ -2620,7 +2615,6 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): base_kwargs = dict( column="vector", index_type=index_type, - index_uuid=shared_uuid, num_partitions=4, num_sub_vectors=num_sub_vectors, ) @@ -2636,11 +2630,12 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"] ) - ds.create_index(**kwargs1) - ds.create_index(**kwargs2) - - ds._ds.merge_index_metadata(shared_uuid, index_type, None) - ds = _commit_index_helper(ds, shared_uuid, "vector") + segments = [ + ds.create_index_uncommitted(**kwargs1), + ds.create_index_uncommitted(**kwargs2), + ] + segments = ds.create_index_segment_builder().with_segments(segments).build_all() + ds = 
_commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2649,45 +2644,6 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): raise e -def _commit_index_helper( - ds, index_uuid: str, column: str, index_name: Optional[str] = None -): - """Helper to finalize index commit after merge_index_metadata. - - Builds a lance.dataset.Index record and commits a CreateIndex operation. - Returns the updated dataset object. - """ - - # Resolve field id for the target column - lance_field = ds.lance_schema.field(column) - if lance_field is None: - raise KeyError(f"{column} not found in schema") - field_id = lance_field.id() - - # Default index name if not provided - if index_name is None: - index_name = f"{column}_idx" - - # Build fragment id set - frag_ids = set(f.fragment_id for f in ds.get_fragments()) - - # Construct Index dataclass and commit operation - index = Index( - uuid=index_uuid, - name=index_name, - fields=[field_id], - dataset_version=ds.version, - fragment_ids=frag_ids, - index_version=0, - ) - create_index_op = lance.LanceOperation.CreateIndex( - new_indices=[index], removed_indices=[] - ) - ds = lance.LanceDataset.commit(ds.uri, create_index_op, read_version=ds.version) - # Ensure unified index partitions are materialized - return ds - - @pytest.mark.parametrize( "index_type,num_sub_vectors", [ @@ -2701,8 +2657,6 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): assert len(frags) >= 2 shard1 = [frags[0].fragment_id] shard2 = [frags[1].fragment_id] - shared_uuid = str(uuid.uuid4()) - builder = IndicesBuilder(ds, "vector") pre = builder.prepare_global_ivf_pq( num_partitions=4, @@ -2715,7 +2669,6 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): base_kwargs = { "column": "vector", "index_type": index_type, - "index_uuid": shared_uuid, "num_partitions": 4, } @@ -2729,7 +2682,7 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): # only PQ has pq_codebook if "pq_codebook" in pre: kwargs1["pq_codebook"] = pre["pq_codebook"] - ds.create_index(**kwargs1) + segment1 = ds.create_index_uncommitted(**kwargs1) # second shard kwargs2 = dict(base_kwargs) @@ -2740,10 +2693,14 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): kwargs2["ivf_centroids"] = pre["ivf_centroids"] if "pq_codebook" in pre: kwargs2["pq_codebook"] = pre["pq_codebook"] - ds.create_index(**kwargs2) + segment2 = ds.create_index_uncommitted(**kwargs2) - ds._ds.merge_index_metadata(shared_uuid, index_type, None) - ds = _commit_index_helper(ds, shared_uuid, column="vector") + segments = ( + ds.create_index_segment_builder() + .with_segments([segment1, segment2]) + .build_all() + ) + ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2754,8 +2711,6 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): ds = _make_sample_dataset_base(tmp_path, "segment_builder_ds", 2000, 128) frags = ds.get_fragments() assert len(frags) >= 2 - shared_uuid = str(uuid.uuid4()) - builder = IndicesBuilder(ds, "vector") preprocessed = builder.prepare_global_ivf_pq( num_partitions=4, @@ -2765,14 +2720,13 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): max_iters=20, ) - partial_indices = [ + segments = [ ds.create_index_uncommitted( "vector", 
"IVF_FLAT", name="vector_idx", train=True, fragment_ids=[fragment.fragment_id], - index_uuid=shared_uuid, num_partitions=4, num_sub_vectors=128, ivf_centroids=preprocessed["ivf_centroids"], @@ -2781,12 +2735,10 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): for fragment in frags[:2] ] - segment_builder = ds.create_index_segment_builder(shared_uuid).with_partial_indices( - partial_indices - ) + segment_builder = ds.create_index_segment_builder().with_segments(segments) plans = segment_builder.plan() assert len(plans) == 2 - assert all(len(plan.partial_indices) == 1 for plan in plans) + assert all(len(plan.segments) == 1 for plan in plans) segments = segment_builder.build_all() assert len(segments) == 2 @@ -2839,21 +2791,24 @@ def test_distributed_ivf_pq_order_invariance(tmp_path: Path): pytest.skip("Failed to split fragments into two non-empty groups (order_21)") def build_distributed_ivf_pq(ds_copy, shard_order): - shared_uuid = str(uuid.uuid4()) try: - for shard in shard_order: - ds_copy.create_index( - column="vector", - index_type="IVF_PQ", - fragment_ids=shard, - index_uuid=shared_uuid, - num_partitions=4, - num_sub_vectors=16, - ivf_centroids=pre["ivf_centroids"], - pq_codebook=pre["pq_codebook"], - ) - ds_copy.merge_index_metadata(shared_uuid, "IVF_PQ") - return _commit_index_helper(ds_copy, shared_uuid, column="vector") + segments = _build_segments( + ds_copy, + "vector", + "IVF_PQ", + shard_order, + index_name="vector_idx", + num_partitions=4, + num_sub_vectors=16, + ivf_centroids=pre["ivf_centroids"], + pq_codebook=pre["pq_codebook"], + ) + segments = ( + ds_copy.create_index_segment_builder() + .with_segments(segments) + .build_all() + ) + return _commit_segments_helper(ds_copy, segments, column="vector") except ValueError as e: raise e diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 3b76f3ce043..695d8b317c5 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -327,27 +327,34 @@ impl MergeInsertBuilder { #[derive(Clone)] pub struct PyIndexSegmentBuilder { dataset: Arc, - staging_index_uuid: String, - partial_indices: Vec, + segments: Vec, target_segment_bytes: Option, } -#[pymethods] impl PyIndexSegmentBuilder { - #[getter] - fn staging_index_uuid(&self) -> String { - self.staging_index_uuid.clone() + fn builder(&self) -> ::IndexSegmentBuilder<'_> { + let mut builder = self + .dataset + .create_index_segment_builder() + .with_segments(self.segments.clone()); + if let Some(target_segment_bytes) = self.target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + builder } +} - fn with_partial_indices<'a>( +#[pymethods] +impl PyIndexSegmentBuilder { + fn with_segments<'a>( mut slf: PyRefMut<'a, Self>, - partial_indices: &Bound<'_, PyAny>, + segments: &Bound<'_, PyAny>, ) -> PyResult> { let mut indices = Vec::new(); - for item in partial_indices.try_iter()? { + for item in segments.try_iter()? 
{ indices.push(item?.extract::>()?.0); } - slf.partial_indices = indices; + slf.segments = indices; Ok(slf) } @@ -360,14 +367,9 @@ impl PyIndexSegmentBuilder { } fn plan(&self, py: Python<'_>) -> PyResult>> { - let mut builder = self - .dataset - .create_index_segment_builder(self.staging_index_uuid.clone()) - .with_partial_indices(self.partial_indices.clone()); - if let Some(target_segment_bytes) = self.target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } - let plans = rt().block_on(Some(py), builder.plan())?.infer_error()?; + let plans = rt() + .block_on(Some(py), self.builder().plan())? + .infer_error()?; plans .into_iter() .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) @@ -376,26 +378,15 @@ impl PyIndexSegmentBuilder { fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { let plan = plan.extract::>()?; - let builder = self - .dataset - .create_index_segment_builder(self.staging_index_uuid.clone()) - .with_partial_indices(self.partial_indices.clone()); let segment = rt() - .block_on(Some(py), builder.build(&plan.inner))? + .block_on(Some(py), self.builder().build(&plan.inner))? .infer_error()?; Py::new(py, PyIndexSegment::from_inner(segment)) } fn build_all(&self, py: Python<'_>) -> PyResult>> { - let mut builder = self - .dataset - .create_index_segment_builder(self.staging_index_uuid.clone()) - .with_partial_indices(self.partial_indices.clone()); - if let Some(target_segment_bytes) = self.target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } let segments = rt() - .block_on(Some(py), builder.build_all())? + .block_on(Some(py), self.builder().build_all())? .infer_error()?; segments .into_iter() @@ -2100,14 +2091,10 @@ impl Dataset { Ok(PyLance(index_metadata)) } - fn create_index_segment_builder( - &self, - staging_index_uuid: String, - ) -> PyResult { + fn create_index_segment_builder(&self) -> PyResult { Ok(PyIndexSegmentBuilder { dataset: self.ds.clone(), - staging_index_uuid, - partial_indices: Vec::new(), + segments: Vec::new(), target_segment_bytes: None, }) } diff --git a/python/src/indices.rs b/python/src/indices.rs index 9589b78ff36..f1d42918962 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -111,24 +111,14 @@ impl PyIndexSegmentPlan { #[pymethods] impl PyIndexSegmentPlan { - #[getter] - fn staging_index_uuid(&self) -> String { - self.inner.staging_index_uuid().to_string() - } - #[getter] fn segment(&self) -> PyIndexSegment { PyIndexSegment::from_inner(self.inner.segment().clone()) } #[getter] - fn partial_indices(&self) -> Vec> { - self.inner - .partial_indices() - .iter() - .cloned() - .map(PyLance) - .collect() + fn segments(&self) -> Vec> { + self.inner.segments().iter().cloned().map(PyLance).collect() } #[getter] @@ -137,9 +127,8 @@ impl PyIndexSegmentPlan { } fn __repr__(&self) -> String { format!( - "IndexSegmentPlan(staging_index_uuid={}, partial_indices={}, estimated_bytes={})", - self.staging_index_uuid(), - self.inner.partial_indices().len(), + "IndexSegmentPlan(segments={}, estimated_bytes={})", + self.inner.segments().len(), self.estimated_bytes() ) } diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs index 1c5923e4050..a883c3ce494 100644 --- a/rust/lance-index/src/traits.rs +++ b/rust/lance-index/src/traits.rs @@ -148,20 +148,17 @@ pub trait DatasetIndexExt { params: &'a dyn IndexParams, ) -> Self::IndexBuilder<'a>; - /// Create a builder for building index segments from partial index outputs. 
+ /// Create a builder for building physical index segments from uncommitted + /// vector index outputs. /// - /// The staging UUID identifies a directory containing previously-built shard - /// outputs. The caller supplies the partial index metadata returned by + /// The caller supplies the uncommitted index metadata returned by /// `execute_uncommitted()` so the builder can plan segment grouping without - /// rediscovering shard coverage. + /// rediscovering fragment coverage. /// /// This is the canonical entry point for distributed vector segment build. /// After building the physical segments, publish them as a /// logical index with [`Self::commit_existing_index_segments`]. - fn create_index_segment_builder<'a>( - &'a self, - staging_index_uuid: String, - ) -> Self::IndexSegmentBuilder<'a>; + fn create_index_segment_builder<'a>(&'a self) -> Self::IndexSegmentBuilder<'a>; /// Create indices on columns. /// diff --git a/rust/lance-index/src/types.rs b/rust/lance-index/src/types.rs index 6520878991b..4d653f21f13 100644 --- a/rust/lance-index/src/types.rs +++ b/rust/lance-index/src/types.rs @@ -76,13 +76,12 @@ impl IndexSegment { } } -/// A plan for building one physical segment from one or more vector index -/// partial indices. +/// A plan for building one physical segment from one or more existing +/// vector index segments. #[derive(Debug, Clone, PartialEq)] pub struct IndexSegmentPlan { - staging_index_uuid: Uuid, segment: IndexSegment, - partial_indices: Vec, + segments: Vec, estimated_bytes: u64, requested_index_type: Option, } @@ -90,34 +89,27 @@ pub struct IndexSegmentPlan { impl IndexSegmentPlan { /// Create a plan for one built segment. pub fn new( - staging_index_uuid: Uuid, segment: IndexSegment, - partial_indices: Vec, + segments: Vec, estimated_bytes: u64, requested_index_type: Option, ) -> Self { Self { - staging_index_uuid, segment, - partial_indices, + segments, estimated_bytes, requested_index_type, } } - /// Return the staging index UUID that owns the partial shard outputs. - pub fn staging_index_uuid(&self) -> Uuid { - self.staging_index_uuid - } - /// Return the segment metadata that should be committed after this plan is built. pub fn segment(&self) -> &IndexSegment { &self.segment } - /// Return the uncommitted partial index metadata that should be combined into the segment. - pub fn partial_indices(&self) -> &[IndexMetadata] { - &self.partial_indices + /// Return the input segment metadata that should be combined into the segment. + pub fn segments(&self) -> &[IndexMetadata] { + &self.segments } /// Return the estimated number of bytes covered by this plan. diff --git a/rust/lance-index/src/vector/distributed/index_merger.rs b/rust/lance-index/src/vector/distributed/index_merger.rs index 2edcf3e0707..a122fbe04d9 100755 --- a/rust/lance-index/src/vector/distributed/index_merger.rs +++ b/rust/lance-index/src/vector/distributed/index_merger.rs @@ -602,16 +602,16 @@ async fn read_shard_window_partitions( Ok(per_partition_batches) } -/// Merge the selected partial-shard auxiliary files into `target_dir`. +/// Merge the selected segment auxiliary files into `target_dir`. /// -/// This is the storage merge kernel for vector staged segment build. Callers -/// choose which partial shards belong to one built segment and pass the corresponding -/// auxiliary files here. The merge writes one unified `auxiliary.idx` into -/// `target_dir`. +/// This is the storage merge kernel for vector segment build. 
Callers choose +/// which segments belong to one built segment and pass the +/// corresponding auxiliary files here. The merge writes one unified +/// `auxiliary.idx` into `target_dir`. /// /// Supports IVF_FLAT, IVF_PQ, IVF_SQ, IVF_HNSW_FLAT, IVF_HNSW_PQ, and -/// IVF_HNSW_SQ storage types. For PQ and SQ, this assumes all selected partial -/// shards share the same quantizer/codebook and distance type; it reuses the +/// IVF_HNSW_SQ storage types. For PQ and SQ, this assumes all selected source +/// segments share the same quantizer/codebook and distance type; it reuses the /// first encountered metadata. pub async fn merge_partial_vector_auxiliary_files( object_store: &lance_io::object_store::ObjectStore, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index cf7888d97f7..02f4b28e047 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2684,7 +2684,7 @@ impl Dataset { self.merge_impl(stream, left_on, right_on).await } - /// Merge a staged distributed index into a single root artifact. + /// Merge a distributed scalar index into a single root artifact. pub async fn merge_index_metadata( &self, index_uuid: &str, @@ -2713,61 +2713,13 @@ impl Dataset { ) .await } - // Precise vector index types: IVF_FLAT, IVF_PQ, IVF_SQ IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { - let mut partial_indices = self - .object_store() - .read_dir(index_dir.clone()) - .await? - .into_iter() - .filter(|name| name.starts_with("partial_")) - .map(|name| { - name.strip_prefix("partial_") - .ok_or_else(|| { - Error::index(format!( - "Distributed vector shard '{}' does not start with 'partial_'", - name - )) - }) - .and_then(|shard_uuid| { - uuid::Uuid::parse_str(shard_uuid).map_err(|err| { - Error::index(format!( - "Distributed vector shard '{}' does not end with a valid UUID: {}", - name, err - )) - }) - }) - .map(|shard_uuid| IndexMetadata { - uuid: shard_uuid, - name: String::new(), - fields: Vec::new(), - dataset_version: self.manifest.version, - fragment_bitmap: Some(RoaringBitmap::new()), - index_details: None, - index_version: index_type.version(), - created_at: None, - base_id: None, - files: Some(Vec::new()), - }) - }) - .collect::>>()?; - partial_indices.sort_by_key(|index| index.uuid); - let segment_plans = crate::index::vector::ivf::plan_staging_segments( - &index_dir, - &partial_indices, - Some(index_type), - None, - ) - .await?; - let merged_plan = - crate::index::vector::ivf::collapse_segment_plans(&segment_plans)?; - crate::index::vector::ivf::build_staging_segment( - self.object_store(), - &self.indices_dir(), - &merged_plan, - ) - .await - .map(|_| ()) + Err(Error::invalid_input( + "Vector distributed indexing no longer supports merge_index_metadata; \ + build segments, use create_index_segment_builder(), \ + and commit with commit_existing_index_segments(...)" + .to_string(), + )) } _ => Err(Error::invalid_input_source(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidInput, diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 5ae8ee91b05..28da8f61e1b 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -87,6 +87,57 @@ use crate::session::index_caches::{FragReuseIndexKey, IndexMetadataKey}; use crate::{Error, Result, dataset::Dataset}; pub use create::CreateIndexBuilder; +fn validate_index_segments(index_name: &str, segments: &[IndexSegment]) -> Result<()> { + if segments.is_empty() { + return Err(Error::invalid_input( + "CreateIndex: at least one index segment is 
required".to_string(), + )); + } + + let mut seen_segment_ids = HashSet::with_capacity(segments.len()); + for segment in segments { + if !seen_segment_ids.insert(segment.uuid()) { + return Err(Error::invalid_input(format!( + "CreateIndex: duplicate segment uuid {} for index '{}'", + segment.uuid(), + index_name + ))); + } + } + + Ok(()) +} + +pub(crate) async fn build_index_metadata_from_segments( + dataset: &Dataset, + index_name: &str, + field_id: i32, + segments: Vec, +) -> Result> { + validate_index_segments(index_name, &segments)?; + + let mut new_indices = Vec::with_capacity(segments.len()); + for segment in segments { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + let index_dir = dataset.indices_dir().child(uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; + new_indices.push(IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(index_details), + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(files), + }); + } + + Ok(new_indices) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub struct ScalarIndexCacheKey<'a> { @@ -628,11 +679,8 @@ impl DatasetIndexExt for Dataset { CreateIndexBuilder::new(self, columns, index_type, params) } - fn create_index_segment_builder<'a>( - &'a self, - staging_index_uuid: String, - ) -> create::IndexSegmentBuilder<'a> { - create::IndexSegmentBuilder::new(self, staging_index_uuid) + fn create_index_segment_builder<'a>(&'a self) -> create::IndexSegmentBuilder<'a> { + create::IndexSegmentBuilder::new(self) } #[instrument(skip_all)] @@ -800,35 +848,8 @@ impl DatasetIndexExt for Dataset { ))); }; - let mut seen_segment_ids = HashSet::with_capacity(segments.len()); - for segment in &segments { - if !seen_segment_ids.insert(segment.uuid()) { - return Err(Error::invalid_input(format!( - "CreateIndex: duplicate segment uuid {} for index '{}'", - segment.uuid(), - index_name - ))); - } - } - - let new_indices = segments - .into_iter() - .map(|segment| { - let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); - IndexMetadata { - uuid, - name: index_name.to_string(), - fields: vec![field.id], - dataset_version: self.manifest.version, - fragment_bitmap: Some(fragment_bitmap), - index_details: Some(index_details), - index_version, - created_at: Some(chrono::Utc::now()), - base_id: None, - files: None, // File info will be populated when index is created - } - }) - .collect(); + let new_indices = + build_index_metadata_from_segments(self, index_name, field.id, segments).await?; let transaction = Transaction::new( self.manifest.version, @@ -5225,6 +5246,24 @@ mod tests { Arc::new(vector_index_details()), IndexType::Vector.version(), ); + let seg0_path = dataset + .indices_dir() + .child(seg0.uuid().to_string()) + .child(INDEX_FILE_NAME); + let seg1_path = dataset + .indices_dir() + .child(seg1.uuid().to_string()) + .child(INDEX_FILE_NAME); + dataset + .object_store() + .put(&seg0_path, b"seg0") + .await + .unwrap(); + dataset + .object_store() + .put(&seg1_path, b"seg1") + .await + .unwrap(); dataset .commit_existing_index_segments( @@ -5256,6 +5295,12 @@ mod tests { HashSet::from([vec![0], vec![1]]), "each committed segment should preserve its fragment coverage" ); + assert!( + committed + .iter() + .all(|idx| idx.files.as_ref().is_some_and(|files| 
!files.is_empty())), + "committed segment metadata should capture on-disk file info" + ); } #[tokio::test] diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 41c44994e69..a394f52258e 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -8,7 +8,7 @@ use crate::{ transaction::{Operation, TransactionBuilder}, }, index::{ - DatasetIndexExt, DatasetIndexInternalExt, + DatasetIndexExt, DatasetIndexInternalExt, build_index_metadata_from_segments, scalar::build_scalar_index, vector::{ LANCE_VECTOR_INDEX, VectorIndexParams, build_distributed_vector_index, @@ -337,7 +337,7 @@ impl<'a> CreateIndexBuilder<'a> { if let Some(fragments) = &self.fragments { // For distributed indexing, build only on specified fragments // This creates temporary index metadata without committing - let shard_uuid = Box::pin(build_distributed_vector_index( + let segment_uuid = Box::pin(build_distributed_vector_index( self.dataset, column, &index_name, @@ -348,7 +348,7 @@ impl<'a> CreateIndexBuilder<'a> { self.progress.clone(), )) .await?; - output_index_uuid = shard_uuid; + output_index_uuid = segment_uuid; } else { // Standard full dataset indexing Box::pin(build_vector_index( @@ -374,14 +374,10 @@ impl<'a> CreateIndexBuilder<'a> { .await?; } // Capture file sizes after vector index creation - let index_dir = if self.fragments.is_some() { - self.dataset - .indices_dir() - .child(index_id.to_string()) - .child(format!("partial_{}", output_index_uuid)) - } else { - self.dataset.indices_dir().child(index_id.to_string()) - }; + let index_dir = self + .dataset + .indices_dir() + .child(output_index_uuid.to_string()); let files = list_index_files_with_sizes(&self.dataset.object_store, &index_dir).await?; CreatedIndex { @@ -478,15 +474,42 @@ impl<'a> CreateIndexBuilder<'a> { } else { vec![] }; - let transaction = TransactionBuilder::new( - new_idx.dataset_version, - Operation::CreateIndex { - new_indices: vec![new_idx], - removed_indices, - }, - ) - .transaction_properties(self.transaction_properties.clone()) - .build(); + let transaction = if uses_segment_commit_path(self.index_type) { + let field_id = *new_idx.fields.first().ok_or_else(|| { + Error::internal(format!( + "Index '{}' is missing field ids after build", + new_idx.name + )) + })?; + let segments = self + .dataset + .create_index_segment_builder() + .with_segments(vec![new_idx.clone()]) + .build_all() + .await?; + let new_indices = + build_index_metadata_from_segments(self.dataset, &new_idx.name, field_id, segments) + .await?; + TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices, + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build() + } else { + TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices: vec![new_idx], + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build() + }; self.dataset .apply_commit(transaction, &Default::default(), &Default::default()) @@ -508,6 +531,20 @@ impl<'a> CreateIndexBuilder<'a> { } } +fn uses_segment_commit_path(index_type: IndexType) -> bool { + matches!( + index_type, + IndexType::Vector + | IndexType::IvfPq + | IndexType::IvfSq + | IndexType::IvfFlat + | IndexType::IvfRq + | IndexType::IvfHnswFlat + | IndexType::IvfHnswPq + | IndexType::IvfHnswSq + ) +} + impl<'a> IntoFuture for CreateIndexBuilder<'a> { type Output = Result; type IntoFuture = BoxFuture<'a, Result>; @@ -517,10 +554,9 @@ impl<'a> 
IntoFuture for CreateIndexBuilder<'a> {
     }
 }

-/// Build physical index segments from previously-written partial index outputs.
+/// Build physical index segments from previously-written vector segment outputs.
 ///
-/// Use [`DatasetIndexExt::create_index_segment_builder`] to open a staging root
-/// and then either:
+/// Use [`DatasetIndexExt::create_index_segment_builder`] and then either:
 ///
 /// - call [`Self::plan`] and orchestrate individual segment builds externally, or
 /// - call [`Self::build_all`] to build all segments on the current node.
@@ -531,63 +567,55 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> {
 #[derive(Clone)]
 pub struct IndexSegmentBuilder<'a> {
     dataset: &'a Dataset,
-    staging_index_uuid: String,
-    partial_indices: Vec<IndexMetadata>,
+    segments: Vec<IndexMetadata>,
     target_segment_bytes: Option<u64>,
 }

 impl<'a> IndexSegmentBuilder<'a> {
-    pub(crate) fn new(dataset: &'a Dataset, staging_index_uuid: String) -> Self {
+    pub(crate) fn new(dataset: &'a Dataset) -> Self {
         Self {
             dataset,
-            staging_index_uuid,
-            partial_indices: Vec::new(),
+            segments: Vec::new(),
             target_segment_bytes: None,
         }
     }

-    /// Provide the partial index metadata returned by `execute_uncommitted()`
-    /// for this staging root.
-    pub fn with_partial_indices(mut self, partial_indices: Vec<IndexMetadata>) -> Self {
-        self.partial_indices = partial_indices;
+    /// Provide the segment metadata returned by `execute_uncommitted()`.
+    ///
+    /// These segments must already exist in storage and must not have been
+    /// published into a logical index yet.
+    pub fn with_segments(mut self, segments: Vec<IndexMetadata>) -> Self {
+        self.segments = segments;
         self
     }

-    /// Set the target size, in bytes, for merged built segments.
+    /// Set the target size, in bytes, for merged physical segments.
     ///
-    /// When set, shard outputs will be grouped into larger built segments up to
-    /// approximately this size. When unset, each shard output becomes one built
-    /// segment.
+    /// When set, input segments will be grouped into larger physical segments
+    /// up to approximately this size. When unset, each input segment becomes
+    /// one physical segment.
     pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self {
         self.target_segment_bytes = Some(bytes);
         self
     }

-    /// Plan how partial indices should be grouped into built segments.
+    /// Plan how input segments should be grouped into physical segments.
     pub async fn plan(&self) -> Result<Vec<IndexSegmentPlan>> {
-        if self.partial_indices.is_empty() {
+        if self.segments.is_empty() {
             return Err(Error::invalid_input(
-                "IndexSegmentBuilder requires at least one partial index; \
-                 call with_partial_indices(...) with execute_uncommitted() outputs"
+                "IndexSegmentBuilder requires at least one segment; \
+                 call with_segments(...) with execute_uncommitted() outputs"
                     .to_string(),
             ));
         }
-        crate::index::vector::ivf::plan_staging_segments(
-            &self
-                .dataset
-                .indices_dir()
-                .child(self.staging_index_uuid.as_str()),
-            &self.partial_indices,
-            None,
-            self.target_segment_bytes,
-        )
-        .await
+        crate::index::vector::ivf::plan_segments(&self.segments, None, self.target_segment_bytes)
+            .await
     }

     /// Build one segment from a previously-generated plan.
     pub async fn build(&self, plan: &IndexSegmentPlan) -> Result<IndexSegment> {
-        crate::index::vector::ivf::build_staging_segment(
+        crate::index::vector::ivf::build_segment(
             self.dataset.object_store(),
             &self.dataset.indices_dir(),
             plan,
@@ -595,7 +623,7 @@ impl<'a> IndexSegmentBuilder<'a> {
         .await
     }

-    /// Plan and build all segments from this staging root.
+    /// Plan and build all segments from the provided inputs.
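+    ///
+    /// A minimal call sketch (with hypothetical `dataset` and `uncommitted`
+    /// bindings; not part of this change):
+    ///
+    /// ```ignore
+    /// let built = dataset
+    ///     .create_index_segment_builder()
+    ///     .with_segments(uncommitted)
+    ///     .build_all()
+    ///     .await?;
+    /// ```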
pub async fn build_all(&self) -> Result> { let plans = self.plan().await?; try_join_all(plans.iter().map(|plan| self.build(plan))).await @@ -1071,7 +1099,7 @@ mod tests { } #[tokio::test] - async fn test_merge_index_metadata_vector_preserves_shared_uuid_commit_workflow() { + async fn test_vector_execute_uncommitted_segments_commit_without_staging() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1099,62 +1127,54 @@ mod tests { let fragments = dataset.get_fragments(); assert!(fragments.len() >= 2); - let shared_uuid = Uuid::new_v4(); let params = VectorIndexParams::with_ivf_flat_params( DistanceType::L2, prepare_vector_ivf(&dataset, "vector").await, ); + let mut input_segments = Vec::new(); for fragment in &fragments { - let mut builder = + let segment = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) .fragments(vec![fragment.id() as u32]) - .index_uuid(shared_uuid.to_string()); - builder.execute_uncommitted().await.unwrap(); + .execute_uncommitted() + .await + .unwrap(); + let segment_index = dataset + .indices_dir() + .child(segment.uuid.to_string()) + .child(crate::index::INDEX_FILE_NAME); + assert!(dataset.object_store().exists(&segment_index).await.unwrap()); + input_segments.push(segment); } - dataset - .merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfFlat, None) + let segments = dataset + .create_index_segment_builder() + .with_segments(input_segments.clone()) + .build_all() .await .unwrap(); - - let merged_root = dataset - .indices_dir() - .child(shared_uuid.to_string()) - .child(crate::index::INDEX_FILE_NAME); - assert!(dataset.object_store().exists(&merged_root).await.unwrap()); + assert_eq!(segments.len(), fragments.len()); + let mut built_segment_ids = segments + .iter() + .map(|segment| segment.uuid()) + .collect::>(); + built_segment_ids.sort(); + let mut input_segment_ids = input_segments + .iter() + .map(|segment| segment.uuid) + .collect::>(); + input_segment_ids.sort(); + assert_eq!(built_segment_ids, input_segment_ids); dataset - .commit_existing_index_segments( - "vector_idx", - "vector", - vec![IndexSegment::new( - shared_uuid, - fragments.iter().map(|fragment| fragment.id() as u32), - Arc::new(vector_index_details()), - IndexType::IvfFlat.version(), - )], - ) + .commit_existing_index_segments("vector_idx", "vector", segments) .await .unwrap(); let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); - assert_eq!(indices.len(), 1); - assert_eq!(indices[0].uuid, shared_uuid); - let committed_fragments: Vec = indices[0] - .fragment_bitmap - .as_ref() - .unwrap() - .iter() - .collect(); - assert_eq!( - committed_fragments, - fragments - .iter() - .map(|fragment| fragment.id() as u32) - .collect::>() - ); + assert_eq!(indices.len(), fragments.len()); let query_batch = dataset .scan() @@ -1207,28 +1227,26 @@ mod tests { let fragments = dataset.get_fragments(); assert!(fragments.len() >= 2); - let shared_uuid = Uuid::new_v4(); let params = VectorIndexParams::with_ivf_flat_params( DistanceType::L2, prepare_vector_ivf(&dataset, "vector").await, ); - let mut partial_indices = Vec::new(); + let mut input_segments = Vec::new(); for fragment in fragments.iter().take(2) { - let index_metadata = + let segment = CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) .fragments(vec![fragment.id() as u32]) - .index_uuid(shared_uuid.to_string()) .execute_uncommitted() .await .unwrap(); - 
partial_indices.push(index_metadata); + input_segments.push(segment); } let segments = dataset - .create_index_segment_builder(shared_uuid.to_string()) - .with_partial_indices(partial_indices) + .create_index_segment_builder() + .with_segments(input_segments) .build_all() .await .unwrap(); @@ -1341,6 +1359,84 @@ mod tests { ); } + #[tokio::test] + async fn test_create_index_vector_commits_with_segment_metadata() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(128), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + + let committed = dataset + .create_index(&["vector"], IndexType::Vector, None, ¶ms, false) + .await + .unwrap(); + + assert!( + committed + .files + .as_ref() + .is_some_and(|files| !files.is_empty()), + "single-machine vector create_index should preserve committed file info" + ); + + let loaded = dataset.load_indices_by_name(&committed.name).await.unwrap(); + assert_eq!(loaded.len(), 1); + assert_eq!(loaded[0].uuid, committed.uuid); + assert!( + loaded[0] + .files + .as_ref() + .is_some_and(|files| !files.is_empty()), + "committed metadata loaded from the manifest should include file info" + ); + } + + #[tokio::test] + async fn test_create_index_ivf_rq_preserves_index_version_on_segment_commit_path() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(128), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + let params = VectorIndexParams::ivf_rq(4, 1, DistanceType::L2); + + let committed = dataset + .create_index(&["vector"], IndexType::IvfRq, None, ¶ms, false) + .await + .unwrap(); + + assert_eq!(committed.index_version, IndexType::IvfRq.version()); + + let loaded = dataset.load_indices_by_name(&committed.name).await.unwrap(); + assert_eq!(loaded.len(), 1); + assert_eq!(loaded[0].index_version, IndexType::IvfRq.version()); + } + #[tokio::test] async fn test_optimize_should_not_removes_delta_indices() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index c0c957a7fcf..a19f472a9bf 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -431,9 +431,9 @@ pub(crate) async fn build_distributed_vector_index( let filtered_dataset = dataset.clone(); - let out_base = dataset.indices_dir().child(uuid); - let shard_uuid = Uuid::new_v4(); - let index_dir = out_base.child(format!("partial_{}", shard_uuid)); + let segment_uuid = Uuid::parse_str(uuid) + .map_err(|err| Error::invalid_input(format!("Invalid index UUID '{uuid}': {err}")))?; + let index_dir = dataset.indices_dir().child(segment_uuid.to_string()); let fragment_filter = fragment_ids.to_vec(); @@ -700,7 +700,7 @@ pub(crate) async fn build_distributed_vector_index( } }; - Ok(shard_uuid) + Ok(segment_uuid) } /// Build a Vector Index 
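Taken together, the changes above define the caller-facing flow. A minimal sketch of the two layers under the new API, assuming hypothetical bindings (`dataset`, `params`, `fragment_ids`, and the collected `worker_segments`); the method names are the ones introduced in this diff:

```rust
// Worker side: build one uncommitted segment per worker. The returned
// IndexMetadata is the caller's handle to that segment; its files already
// live at the final storage path under indices/<segment_uuid>/.
let segment = dataset
    .create_index_builder(&["vector"], IndexType::Vector, &params)
    .name("vector_idx".to_string())
    .fragments(fragment_ids)
    .execute_uncommitted()
    .await?;

// Caller side: turn the collected worker outputs into physical segments
// (no grouping policy here, so each input stays one segment) and publish
// them as a single logical index.
let built = dataset
    .create_index_segment_builder()
    .with_segments(worker_segments)
    .build_all()
    .await?;
dataset
    .commit_existing_index_segments("vector_idx", "vector", built)
    .await?;
```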
diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs
index b693de2e0a0..8e47b77cdfd 100644
--- a/rust/lance/src/index/vector/ivf.rs
+++ b/rust/lance/src/index/vector/ivf.rs
@@ -1862,48 +1862,41 @@ async fn write_ivf_hnsw_file(
 /// Distributed vector segment build uses three storage-level concepts:
 ///
-/// - A **staging root** is the shared UUID directory used during distributed
-///   shard build. Each worker writes one `partial_<uuid>/` directory under this
-///   root by calling `execute_uncommitted()` with the same `index_uuid`.
-/// - A **partial shard** is one such worker output. The caller provides the
-///   `IndexMetadata` returned by `execute_uncommitted()` so the planner knows
-///   shard UUIDs, fragment coverage, and approximate shard sizes.
-/// - A **built segment** is a physical index segment that can be committed into
-///   the manifest with `commit_existing_index_segments(...)`.
+/// - A **segment** is a worker output written by `execute_uncommitted()`. It
+///   already lives at its final storage path under `indices/<uuid>/`,
+///   but it is not yet published in the manifest.
+/// - A **physical segment** is an `IndexSegment` that can be committed into the
+///   manifest with `commit_existing_index_segments(...)`.
+/// - A **logical index** is the user-visible index identified by name; it may
+///   contain one or more physical segments.
 ///
-/// The staged segment-build path is therefore:
+/// The segment-build path is therefore:
 ///
-/// 1. workers build `partial_*` shards under one staging root
-/// 2. the caller groups those shards into one or more built segments
-/// 3. each segment is built from its selected shards
+/// 1. workers build segments
+/// 2. the caller groups those segments into one or more physical segments
+/// 3. each grouped segment is built from its selected inputs
 /// 4. the resulting physical segments are committed as one logical index
 ///
-/// A single merge work item produced from one staging root.
-///
 /// Each plan says:
-/// - which staging root it belongs to
-/// - which partial shards should be consumed together
-/// - what the built segment metadata should look like
+/// - which source segments should be consumed together
+/// - what the physical segment metadata should look like
 ///
-/// The planner returns a `Vec<IndexSegmentPlan>` so callers can decide
-/// whether to execute the work serially or fan it out externally.
-/// Plan how one staging root should be turned into built physical segments.
+/// The planner returns a `Vec<IndexSegmentPlan>` so callers can decide whether
+/// to execute the work serially or fan it out externally.
 ///
 /// This function does not touch storage. It only:
-/// - validates that the caller-supplied shard contract is self-consistent
-/// - enforces that shard fragment coverage is disjoint
-/// - groups shards into built segments according to `target_segment_bytes`
+/// - validates that the caller-supplied segment contract is self-consistent
+/// - enforces that source fragment coverage is disjoint
+/// - groups source segments into physical segments according to
+///   `target_segment_bytes`
 ///
 /// The grouping rule is intentionally simple:
-/// - `target_segment_bytes = None`: keep the shard boundary, so each shard becomes one segment
-/// - `target_segment_bytes = Some(limit)`: greedily pack consecutive shards until the next shard
-///   would exceed `limit`
-///
-/// Callers that want one built segment for the entire staging root should pass a
-/// sufficiently large `target_segment_bytes`.
-pub(crate) async fn plan_staging_segments( - index_dir: &Path, - partial_indices: &[TableIndexMetadata], +/// - `target_segment_bytes = None`: keep the existing segment boundary, so each +/// input segment becomes one physical segment +/// - `target_segment_bytes = Some(limit)`: greedily pack consecutive source +/// segments until the next source would exceed `limit` +pub(crate) async fn plan_segments( + segments: &[TableIndexMetadata], requested_index_type: Option, target_segment_bytes: Option, ) -> Result> { @@ -1913,6 +1906,7 @@ pub(crate) async fn plan_staging_segments( IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq + | IndexType::IvfRq | IndexType::IvfHnswFlat | IndexType::IvfHnswPq | IndexType::IvfHnswSq @@ -1931,43 +1925,28 @@ pub(crate) async fn plan_staging_segments( )); } - if partial_indices.is_empty() { - return Err(Error::index(format!( - "No partial index metadata was provided for '{}'", - index_dir - ))); + if segments.is_empty() { + return Err(Error::index("No segment metadata was provided".to_string())); } - let mut sorted_partial_indices = partial_indices.to_vec(); - sorted_partial_indices.sort_by_key(|index| index.uuid); - let mut expected_shard_ids = HashSet::with_capacity(sorted_partial_indices.len()); - for partial_index in &sorted_partial_indices { - if !expected_shard_ids.insert(partial_index.uuid) { + let mut sorted_segments = segments.to_vec(); + sorted_segments.sort_by_key(|index| index.uuid); + let mut expected_segment_ids = HashSet::with_capacity(sorted_segments.len()); + for segment in &sorted_segments { + if !expected_segment_ids.insert(segment.uuid) { return Err(Error::index(format!( - "Distributed vector partial shard '{}' was provided more than once", - partial_index.uuid + "Distributed vector segment '{}' was provided more than once", + segment.uuid ))); } } - let staging_index_uuid = index_dir - .filename() - .ok_or_else(|| Error::index(format!("Index directory '{}' has no filename", index_dir))) - .and_then(|name| { - Uuid::parse_str(name).map_err(|err| { - Error::index(format!( - "Index directory '{}' does not end with a valid UUID: {}", - index_dir, err - )) - }) - })?; - let mut covered_fragments = RoaringBitmap::new(); - for partial_index in &sorted_partial_indices { - let fragment_bitmap = partial_index.fragment_bitmap.as_ref().ok_or_else(|| { + for segment in &sorted_segments { + let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { Error::index(format!( - "Partial index '{}' is missing fragment coverage", - partial_index.uuid + "Segment '{}' is missing fragment coverage", + segment.uuid )) })?; if covered_fragments.intersection_len(fragment_bitmap) > 0 { @@ -1979,15 +1958,9 @@ pub(crate) async fn plan_staging_segments( } if target_segment_bytes.is_none() { - return sorted_partial_indices + return sorted_segments .into_iter() - .map(|partial_index| { - build_segment_plan( - staging_index_uuid, - vec![partial_index], - requested_index_type, - ) - }) + .map(|segment| build_segment_plan(vec![segment], requested_index_type)) .collect(); } @@ -1996,28 +1969,23 @@ pub(crate) async fn plan_staging_segments( let mut current_group = Vec::new(); let mut current_bytes = 0_u64; - for partial_index in sorted_partial_indices { - let partial_bytes = estimate_partial_index_bytes(&partial_index); + for segment in sorted_segments { + let source_bytes = estimate_source_index_bytes(&segment); if !current_group.is_empty() - && current_bytes.saturating_add(partial_bytes) > target_segment_bytes + && current_bytes.saturating_add(source_bytes) 
> target_segment_bytes { plans.push(build_segment_plan( - staging_index_uuid, std::mem::take(&mut current_group), requested_index_type, )?); current_bytes = 0; } - current_bytes = current_bytes.saturating_add(partial_bytes); - current_group.push(partial_index); + current_bytes = current_bytes.saturating_add(source_bytes); + current_group.push(segment); } if !current_group.is_empty() { - plans.push(build_segment_plan( - staging_index_uuid, - current_group, - requested_index_type, - )?); + plans.push(build_segment_plan(current_group, requested_index_type)?); } Ok(plans) @@ -2025,96 +1993,63 @@ pub(crate) async fn plan_staging_segments( /// Build one planned segment into its output directory. /// -/// Most plans write directly to `indices//`. If the target -/// directory is also the staging root, we first write into a temporary -/// directory and then swap the final files back into place. -/// -/// This is similar in shape to a compaction step: several temporary shard -/// outputs are consumed and replaced by a new built physical segment. The -/// difference is that this operates on index shard outputs instead of data -/// fragments. -pub(crate) async fn build_staging_segment( +/// Single-source plans are already materialized and return immediately. For +/// multi-source plans, this function writes a new merged physical segment under +/// `indices//`. +pub(crate) async fn build_segment( object_store: &ObjectStore, indices_dir: &Path, segment_plan: &IndexSegmentPlan, ) -> Result { let built_segment = segment_plan.segment().clone(); - let final_dir = indices_dir.child(built_segment.uuid().to_string()); - let staging_dir = indices_dir.child(segment_plan.staging_index_uuid().to_string()); - if final_dir == staging_dir { - let temp_dir = indices_dir.child(Uuid::new_v4().to_string()); - build_staging_segment_to_dir(object_store, indices_dir, &temp_dir, segment_plan, false) - .await?; - - // Re-materializing back into the staging root is not atomic: we delete - // and rewrite the root files one by one because object stores do not - // offer a directory rename primitive. We only clean up the source - // `partial_*` shards after these copies succeed, so a crash here can - // leave a partial root artifact but should not destroy the source data. - for file_name in [INDEX_FILE_NAME, INDEX_AUXILIARY_FILE_NAME] { - let target_file = final_dir.child(file_name); - // ObjectStore::copy is additive. Remove any previous root artifact first so - // re-materialization back into the staging root does not leave stale files - // behind. - if object_store.exists(&target_file).await? { - object_store.delete(&target_file).await?; - } - let source_file = temp_dir.child(file_name); - if object_store.exists(&source_file).await? { - object_store.copy(&source_file, &target_file).await?; - } - } + let segments = segment_plan.segments(); + debug_assert!( + !segments.is_empty(), + "segment plans must have at least one source segment" + ); - cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; - reset_final_segment_dir(object_store, &temp_dir).await?; - } else { - build_staging_segment_to_dir(object_store, indices_dir, &final_dir, segment_plan, true) - .await?; + if segments.len() == 1 && segments[0].uuid == built_segment.uuid() { + return Ok(built_segment); } + let final_dir = indices_dir.child(built_segment.uuid().to_string()); + merge_segments_to_dir(object_store, indices_dir, &final_dir, segment_plan).await?; + Ok(built_segment) } -/// Write one built segment into `final_dir`. 
+/// Merge the selected input segments into `final_dir`. /// -/// For a single-shard plan this is just a file copy. For a multi-shard plan we -/// read the selected `partial_*` shards directly from the staging root and -/// write the merged auxiliary/index files into `final_dir`. -async fn build_staging_segment_to_dir( +/// Callers must only invoke this helper for multi-source plans. It reads the +/// selected input segments directly from `indices//` and writes +/// the merged auxiliary/index files into `final_dir`. +async fn merge_segments_to_dir( object_store: &ObjectStore, indices_dir: &Path, final_dir: &Path, segment_plan: &IndexSegmentPlan, - cleanup_source_shards: bool, ) -> Result<()> { reset_final_segment_dir(object_store, final_dir).await?; - let partial_indices = segment_plan.partial_indices(); - if partial_indices.len() == 1 { - let source_dir = indices_dir - .child(segment_plan.staging_index_uuid().to_string()) - .child(format!("partial_{}", partial_indices[0].uuid)); - copy_partial_segment_contents(object_store, &source_dir, final_dir).await?; - if cleanup_source_shards { - cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; - } - return Ok(()); - } + let segments = segment_plan.segments(); + debug_assert!( + segments.len() > 1, + "merge helper should only be used for multi-source plans" + ); - let staging_root = indices_dir.child(segment_plan.staging_index_uuid().to_string()); - let aux_paths = partial_indices + let aux_paths = segments .iter() - .map(|partial_index| { - staging_root - .child(format!("partial_{}", partial_index.uuid)) + .map(|segment| { + indices_dir + .child(segment.uuid.to_string()) .child(INDEX_AUXILIARY_FILE_NAME) }) .collect::>(); - let partial_index_paths = partial_indices + let source_index_paths = segments .iter() - .map(|partial_index| { - staging_root - .child(format!("partial_{}", partial_index.uuid)) + .map(|segment| { + indices_dir + .child(segment.uuid.to_string()) .child(INDEX_FILE_NAME) }) .collect::>(); @@ -2129,19 +2064,15 @@ async fn build_staging_segment_to_dir( object_store, final_dir, segment_plan.requested_index_type(), - &partial_index_paths, + &source_index_paths, ) .await?; - if cleanup_source_shards { - cleanup_consumed_partial_shards(object_store, indices_dir, segment_plan).await?; - } Ok(()) } -/// Collapse one group of staging shards into a single built-segment plan. +/// Collapse one group of source segments into a single physical-segment plan. 
fn build_segment_plan( - staging_index_uuid: Uuid, group: Vec, requested_index_type: Option, ) -> Result { @@ -2149,100 +2080,56 @@ fn build_segment_plan( let first = &group[0]; let mut fragment_bitmap = RoaringBitmap::new(); let mut estimated_bytes = 0_u64; - let mut partial_indices = Vec::with_capacity(group.len()); + let mut segments = Vec::with_capacity(group.len()); - for partial_index in &group { - let partial_fragment_bitmap = partial_index.fragment_bitmap.as_ref().ok_or_else(|| { + for segment in &group { + let source_fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { Error::index(format!( - "Partial index '{}' is missing fragment coverage", - partial_index.uuid + "Segment '{}' is missing fragment coverage", + segment.uuid )) })?; - fragment_bitmap |= partial_fragment_bitmap.clone(); - estimated_bytes = - estimated_bytes.saturating_add(estimate_partial_index_bytes(partial_index)); - partial_indices.push(partial_index.clone()); + fragment_bitmap |= source_fragment_bitmap.clone(); + estimated_bytes = estimated_bytes.saturating_add(estimate_source_index_bytes(segment)); + segments.push(segment.clone()); } - let final_uuid = if group.len() == 1 { + let segment_uuid = if group.len() == 1 { first.uuid } else { Uuid::new_v4() }; - let index_type = requested_index_type.unwrap_or(IndexType::Vector); + let index_version = match requested_index_type { + Some(index_type) => index_type.version(), + None => infer_source_index_version(&group)?, + }; let segment = IndexSegment::new( - final_uuid, + segment_uuid, fragment_bitmap, Arc::new(crate::index::vector_index_details()), - index_type.version(), + index_version, ); Ok(IndexSegmentPlan::new( - staging_index_uuid, segment, - partial_indices, + segments, estimated_bytes, requested_index_type, )) } -/// Collapse an entire staging root into one built-segment plan. -/// -/// Some callers want one final output for the entire staging root instead of -/// one output per planned group. This helper reduces an existing set of plans -/// into a single plan covering the same shard set. -pub(crate) fn collapse_segment_plans( - segment_plans: &[IndexSegmentPlan], -) -> Result { - let Some(first_plan) = segment_plans.first() else { +fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { + debug_assert!(!group.is_empty()); + let first = group[0].index_version; + if group.iter().any(|segment| segment.index_version != first) { return Err(Error::index( - "Distributed vector segment build plan contains no segment plans".to_string(), + "Distributed vector segments must all have the same index version".to_string(), )); - }; - - let mut fragment_bitmap = RoaringBitmap::new(); - let mut partial_indices = Vec::new(); - let mut estimated_bytes = 0_u64; - - for plan in segment_plans { - fragment_bitmap |= plan.segment().fragment_bitmap().clone(); - partial_indices.extend_from_slice(plan.partial_indices()); - estimated_bytes = estimated_bytes.saturating_add(plan.estimated_bytes()); - } - - let staging_index_uuid = first_plan.staging_index_uuid(); - let segment = IndexSegment::new( - staging_index_uuid, - fragment_bitmap, - first_plan.segment().index_details().clone(), - first_plan.segment().index_version(), - ); - - Ok(IndexSegmentPlan::new( - staging_index_uuid, - segment, - partial_indices, - estimated_bytes, - first_plan.requested_index_type(), - )) -} - -/// Remove the source `partial_*` directories consumed by one segment plan. 
-async fn cleanup_consumed_partial_shards( - object_store: &ObjectStore, - indices_dir: &Path, - segment_plan: &IndexSegmentPlan, -) -> Result<()> { - for partial_index in segment_plan.partial_indices() { - let source_dir = indices_dir - .child(segment_plan.staging_index_uuid().to_string()) - .child(format!("partial_{}", partial_index.uuid)); - reset_final_segment_dir(object_store, &source_dir).await?; } - Ok(()) + Ok(first) } -fn estimate_partial_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { +fn estimate_source_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { index_metadata .files .as_ref() @@ -2250,31 +2137,6 @@ fn estimate_partial_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { .unwrap_or(0) } -/// Copy all files that belong to one partial shard into a new directory. -async fn copy_partial_segment_contents( - object_store: &ObjectStore, - source_dir: &Path, - target_dir: &Path, -) -> Result<()> { - let mut files = object_store.list(Some(source_dir.clone())); - while let Some(item) = files.next().await { - let meta = item?; - let Some(relative_parts) = meta.location.prefix_match(source_dir) else { - continue; - }; - let relative_parts = relative_parts.collect::>(); - if relative_parts.is_empty() { - continue; - } - let mut final_path = target_dir.clone(); - for part in relative_parts { - final_path = final_path.child(part.as_ref()); - } - object_store.copy(&meta.location, &final_path).await?; - } - Ok(()) -} - /// Best-effort reset of one target directory before rewriting it. async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { match object_store.remove_dir_all(final_dir.clone()).await { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 13faa22e8d5..92b0549f43b 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -618,7 +618,6 @@ mod tests { }; use arrow_buffer::OffsetBuffer; use arrow_schema::{DataType, Field, Schema, SchemaRef}; - use futures::StreamExt; use itertools::Itertools; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::bq::{ @@ -629,7 +628,7 @@ mod tests { use crate::dataset::{InsertBuilder, UpdateBuilder, WriteMode, WriteParams}; use crate::index::DatasetIndexInternalExt; use crate::index::vector::ivf::v2::IvfPq; - use crate::index::vector::ivf::{build_staging_segment, plan_staging_segments}; + use crate::index::vector::ivf::{build_segment, plan_segments}; use crate::utils::test::copy_test_data_to_tmp; use crate::{ Dataset, @@ -671,7 +670,6 @@ mod tests { use rand::distr::uniform::SampleUniform; use rand::{Rng, SeedableRng, rngs::StdRng}; use rstest::rstest; - use uuid::Uuid; const NUM_ROWS: usize = 512; const DIM: usize = 32; @@ -1476,25 +1474,21 @@ mod tests { ivf_params } - async fn build_distributed_partial_index_for_fragment_groups( + async fn build_segments_for_fragment_groups( dataset: &mut Dataset, fragment_groups: Vec>, // each group is a set of fragment ids params: &VectorIndexParams, index_name: &str, - ) -> (Uuid, Vec) { - let shared_uuid = Uuid::new_v4(); - let mut partial_indices = Vec::new(); + ) -> Vec { + let mut segments = Vec::new(); for fragments in fragment_groups { let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, params); - builder = builder - .name(index_name.to_string()) - .fragments(fragments) - .index_uuid(shared_uuid.to_string()); - partial_indices.push(builder.execute_uncommitted().await.unwrap()); + builder = 
builder.name(index_name.to_string()).fragments(fragments); + segments.push(builder.execute_uncommitted().await.unwrap()); } - (shared_uuid, partial_indices) + segments } async fn build_ivfpq_for_fragment_groups( @@ -1504,50 +1498,23 @@ mod tests { pq_params: &PQBuildParams, index_name: &str, ) { - let shared_uuid = Uuid::new_v4(); let params = VectorIndexParams::with_ivf_pq_params( DistanceType::L2, ivf_params.clone(), pq_params.clone(), ); - for fragments in fragment_groups { - let mut builder = dataset.create_index_builder(&["vector"], IndexType::Vector, ¶ms); - builder = builder - .name(index_name.to_string()) - .fragments(fragments) - .index_uuid(shared_uuid.to_string()); - builder.execute_uncommitted().await.unwrap(); - } - - dataset - .merge_index_metadata(&shared_uuid.to_string(), IndexType::IvfPq, None) - .await - .unwrap(); - - dataset - .commit_existing_index_segments( - index_name, - "vector", - vec![IndexSegment::new( - shared_uuid, - dataset.fragment_bitmap.as_ref().clone(), - Arc::new(crate::index::vector_index_details()), - IndexType::IvfPq.version(), - )], - ) - .await - .unwrap(); + let segments = + build_segments_for_fragment_groups(dataset, fragment_groups, ¶ms, index_name).await; + let built_segments = build_distributed_segments(dataset, &segments, None, index_name).await; + assert!(!built_segments.is_empty()); } - fn assert_ivf_layout_equal(stats_a: &serde_json::Value, stats_b: &serde_json::Value) { - let idx_a = &stats_a["indices"][0]; - let idx_b = &stats_b["indices"][0]; - - let centroids_a = idx_a["centroids"] + fn assert_centroids_equal(reference: &serde_json::Value, candidate: &serde_json::Value) { + let centroids_a = reference["centroids"] .as_array() .expect("centroids should be an array"); - let centroids_b = idx_b["centroids"] + let centroids_b = candidate["centroids"] .as_array() .expect("centroids should be an array"); assert_eq!( @@ -1574,111 +1541,74 @@ mod tests { ); } } - - let parts_a = idx_a["partitions"] - .as_array() - .expect("partitions should be an array"); - let parts_b = idx_b["partitions"] - .as_array() - .expect("partitions should be an array"); - assert_eq!(parts_a.len(), parts_b.len(), "num partitions mismatch"); - let sizes_a: Vec = parts_a - .iter() - .map(|p| p["size"].as_u64().expect("partition size")) - .collect(); - let sizes_b: Vec = parts_b - .iter() - .map(|p| p["size"].as_u64().expect("partition size")) - .collect(); - assert_eq!(sizes_a, sizes_b, "partition sizes mismatch"); } - async fn load_staging_shard_uuids(dataset: &Dataset, shared_uuid: Uuid) -> Vec { - let mut shard_uuids = dataset - .object_store() - .read_dir(dataset.indices_dir().child(shared_uuid.to_string())) - .await - .unwrap() - .into_iter() - .filter(|name| name.starts_with("partial_")) - .map(|name| Uuid::parse_str(name.trim_start_matches("partial_")).unwrap()) - .collect::>(); - shard_uuids.sort(); - shard_uuids + fn sum_partition_sizes(indices: &[serde_json::Value]) -> Vec { + let mut totals = Vec::new(); + for index in indices { + let partitions = index["partitions"] + .as_array() + .expect("partitions should be an array"); + if totals.is_empty() { + totals.resize(partitions.len(), 0); + } else { + assert_eq!(totals.len(), partitions.len(), "num partitions mismatch"); + } + for (total, partition) in totals.iter_mut().zip(partitions.iter()) { + *total += partition["size"].as_u64().expect("partition size"); + } + } + totals } - /// Reconstruct the caller-side shard contract used by the tests. 
- /// - /// Production callers are expected to already know this mapping from - /// distributed scheduling state. The test helper rebuilds it by inspecting - /// which `partial_*` directories were created and pairing them with the - /// fragment groups we originally assigned. - async fn build_partial_indices( - dataset: &Dataset, - shared_uuid: Uuid, - fragment_groups: &[Vec], - ) -> Vec { - let shard_uuids = load_staging_shard_uuids(dataset, shared_uuid).await; - assert_eq!(shard_uuids.len(), fragment_groups.len()); - let mut partial_indices = Vec::with_capacity(shard_uuids.len()); - for (shard_uuid, fragment_group) in shard_uuids.into_iter().zip(fragment_groups.iter()) { - let partial_dir = dataset - .indices_dir() - .child(shared_uuid.to_string()) - .child(format!("partial_{}", shard_uuid)); - let mut estimated_bytes = 0_u64; - let mut files = dataset.object_store().list(Some(partial_dir)); - while let Some(item) = files.next().await { - estimated_bytes += item.unwrap().size; - } - partial_indices.push(IndexMetadata { - uuid: shard_uuid, - name: String::new(), - fields: Vec::new(), - dataset_version: dataset.version().version, - fragment_bitmap: Some(fragment_group.iter().copied().collect()), - index_details: None, - index_version: IndexType::Vector.version(), - created_at: None, - base_id: None, - files: Some(vec![lance_table::format::IndexFile { - path: String::new(), - size_bytes: estimated_bytes, - }]), - }); + fn assert_ivf_layout_compatible(stats_a: &serde_json::Value, stats_b: &serde_json::Value) { + let indices_a = stats_a["indices"] + .as_array() + .expect("indices should be an array"); + let indices_b = stats_b["indices"] + .as_array() + .expect("indices should be an array"); + assert!( + !indices_a.is_empty() && !indices_b.is_empty(), + "indices should not be empty", + ); + + let reference = &indices_a[0]; + for index in indices_a.iter().skip(1).chain(indices_b.iter()) { + assert_centroids_equal(reference, index); } - partial_indices + + let sizes_a = sum_partition_sizes(indices_a); + let sizes_b = sum_partition_sizes(indices_b); + assert_eq!(sizes_a, sizes_b, "aggregated partition sizes mismatch"); } - /// Execute the internal staged segment-build workflow used by the - /// regression tests: plan segment groups from caller-provided shard + /// Execute the internal segment workflow used by the + /// regression tests: plan segment groups from caller-provided segment /// metadata, build each segment, and publish them as one logical index. 
async fn build_distributed_segments( dataset: &mut Dataset, - shared_uuid: Uuid, - partial_indices: &[IndexMetadata], + segments: &[IndexMetadata], target_segment_bytes: Option, index_name: &str, ) -> Vec { - let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); - let segment_plans = - plan_staging_segments(&index_dir, partial_indices, None, target_segment_bytes) - .await - .unwrap(); - let mut segments = Vec::with_capacity(segment_plans.len()); + let segment_plans = plan_segments(segments, None, target_segment_bytes) + .await + .unwrap(); + let mut built_segments = Vec::with_capacity(segment_plans.len()); for plan in &segment_plans { - segments.push( - build_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan) + built_segments.push( + build_segment(dataset.object_store(), &dataset.indices_dir(), plan) .await .unwrap(), ); } dataset - .commit_existing_index_segments(index_name, "vector", segments.clone()) + .commit_existing_index_segments(index_name, "vector", built_segments.clone()) .await .unwrap(); - segments + built_segments } #[tokio::test] @@ -1740,27 +1670,11 @@ mod tests { let stats_split_json = ds_split.index_statistics(INDEX_NAME).await.unwrap(); let stats_single: serde_json::Value = serde_json::from_str(&stats_single_json).unwrap(); let stats_split: serde_json::Value = serde_json::from_str(&stats_split_json).unwrap(); - assert_ivf_layout_equal(&stats_single, &stats_split); - - let ctx_single = load_vector_index_context(&ds_single, "vector", INDEX_NAME).await; - let ctx_split = load_vector_index_context(&ds_split, "vector", INDEX_NAME).await; - - let ivf_single = ctx_single.ivf(); - let ivf_split = ctx_split.ivf(); - let total_partitions = ivf_single.total_partitions(); - assert_eq!(total_partitions, ivf_split.total_partitions()); - - for part_id in 0..total_partitions { - let row_ids_single = load_partition_row_ids(ivf_single, part_id).await; - let row_ids_split = load_partition_row_ids(ivf_split, part_id).await; - let set_single: HashSet = row_ids_single.into_iter().collect(); - let set_split: HashSet = row_ids_split.into_iter().collect(); - assert_eq!( - set_single, set_split, - "row id set mismatch for partition {}", - part_id - ); - } + assert_ivf_layout_compatible(&stats_single, &stats_split); + assert_eq!( + stats_single["num_indexed_rows"], + stats_split["num_indexed_rows"] + ); const K: usize = 10; const NUM_QUERIES: usize = 10; @@ -1889,30 +1803,30 @@ mod tests { .map(|fragment| vec![fragment.id() as u32]) .collect::>(); let expected_segment_count = fragment_groups.len(); - let (shared_uuid, partial_indices) = build_distributed_partial_index_for_fragment_groups( + let segments = build_segments_for_fragment_groups( &mut ds_split, fragment_groups, &distributed_params, INDEX_NAME, ) .await; - let segments = build_distributed_segments( - &mut ds_split, - shared_uuid, - &partial_indices, - None, - INDEX_NAME, - ) - .await; + let segments = build_distributed_segments(&mut ds_split, &segments, None, INDEX_NAME).await; assert_eq!(segments.len(), expected_segment_count); - let staging_dir = ds_split.indices_dir().child(shared_uuid.to_string()); - let staging_entries = ds_split.object_store().read_dir(staging_dir).await.unwrap(); - assert!( - staging_entries - .iter() - .all(|entry| !entry.starts_with("partial_")), - "built segments should clean up consumed partial shards", - ); + for segment in &segments { + let segment_index = ds_split + .indices_dir() + .child(segment.uuid().to_string()) + .child(crate::index::INDEX_FILE_NAME); + assert!( + 
ds_split + .object_store() + .exists(&segment_index) + .await + .unwrap(), + "segment file should exist at {}", + segment_index + ); + } let committed_segments = ds_split.load_indices_by_name(INDEX_NAME).await.unwrap(); assert_eq!(committed_segments.len(), expected_segment_count); @@ -2032,7 +1946,7 @@ mod tests { .into_iter() .map(|fragment| vec![fragment.id() as u32]) .collect::>(); - let (shared_uuid, partial_indices) = build_distributed_partial_index_for_fragment_groups( + let segments = build_segments_for_fragment_groups( &mut ds_split, fragment_groups, &distributed_params, @@ -2040,33 +1954,20 @@ mod tests { ) .await; - let index_dir = ds_split.indices_dir().child(shared_uuid.to_string()); - let shard_plan = plan_staging_segments(&index_dir, &partial_indices, None, None) - .await - .unwrap(); + let shard_plan = plan_segments(&segments, None, None).await.unwrap(); let shard_count = shard_plan.len(); assert!(shard_count >= 4); let target_segment_bytes = shard_plan[0].estimated_bytes().saturating_mul(2); - let grouped_plan = plan_staging_segments( - &index_dir, - &partial_indices, - None, - Some(target_segment_bytes), - ) - .await - .unwrap(); + let grouped_plan = plan_segments(&segments, None, Some(target_segment_bytes)) + .await + .unwrap(); assert!(grouped_plan.len() < shard_count); - assert!( - grouped_plan - .iter() - .any(|plan| plan.partial_indices().len() > 1) - ); + assert!(grouped_plan.iter().any(|plan| plan.segments().len() > 1)); let grouped_segments = build_distributed_segments( &mut ds_split, - shared_uuid, - &partial_indices, + &segments, Some(target_segment_bytes), INDEX_NAME, ) @@ -2127,29 +2028,24 @@ mod tests { let mut dataset = write_dataset_from_batches(&dataset_uri, schema, batches).await; let fragment = dataset.get_fragments()[0].id() as u32; - let shared_uuid = Uuid::new_v4(); let params = VectorIndexParams::with_ivf_flat_params( DistanceType::L2, prepare_global_ivf(&dataset, "vector").await, ); + let mut segments = Vec::new(); for _ in 0..2 { - dataset + let segment = dataset .create_index_builder(&["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) .fragments(vec![fragment]) - .index_uuid(shared_uuid.to_string()) .execute_uncommitted() .await .unwrap(); + segments.push(segment); } - let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); - let partial_indices = - build_partial_indices(&dataset, shared_uuid, &[vec![fragment], vec![fragment]]).await; - let err = plan_staging_segments(&index_dir, &partial_indices, None, None) - .await - .unwrap_err(); + let err = plan_segments(&segments, None, None).await.unwrap_err(); assert!(err.to_string().contains("overlapping fragment coverage")); } @@ -2163,40 +2059,31 @@ mod tests { let fragments = dataset.get_fragments(); assert!(fragments.len() >= 2); - let shared_uuid = Uuid::new_v4(); let params = VectorIndexParams::ivf_hnsw( DistanceType::L2, prepare_global_ivf(&dataset, "vector").await, HnswBuildParams::default(), ); + let mut segments = Vec::new(); for fragment in fragments.iter().take(2) { - dataset + let segment = dataset .create_index_builder(&["vector"], IndexType::Vector, ¶ms) .name("vector_idx".to_string()) .fragments(vec![fragment.id() as u32]) - .index_uuid(shared_uuid.to_string()) .execute_uncommitted() .await .unwrap(); + segments.push(segment); } - let index_dir = dataset.indices_dir().child(shared_uuid.to_string()); - let fragment_groups = fragments - .iter() - .take(2) - .map(|fragment| vec![fragment.id() as u32]) - .collect::>(); - let partial_indices = 
build_partial_indices(&dataset, shared_uuid, &fragment_groups).await; - let plans = plan_staging_segments(&index_dir, &partial_indices, None, Some(1)) - .await - .unwrap(); + let plans = plan_segments(&segments, None, Some(1)).await.unwrap(); assert_eq!(plans.len(), fragments.iter().take(2).count()); let mut segments = Vec::with_capacity(plans.len()); for plan in &plans { segments.push( - build_staging_segment(dataset.object_store(), &dataset.indices_dir(), plan) + build_segment(dataset.object_store(), &dataset.indices_dir(), plan) .await .unwrap(), );