diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index ff2c261ebdd..203ffcdffa8 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -8,7 +8,7 @@ use crate::namespace::{ }; use crate::session::{handle_from_session, session_from_handle}; use crate::storage_options::JavaStorageOptionsProvider; -use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec}; +use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec, import_vec_to_rust}; use crate::utils::{ build_compaction_options, extract_storage_options, extract_write_params, get_scalar_index_params, get_vector_index_params, to_rust_map, @@ -49,7 +49,7 @@ use lance_index::DatasetIndexExt; use lance_index::IndexCriteria as RustIndexCriteria; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::btree::BTreeParameters; -use lance_index::{IndexParams, IndexType}; +use lance_index::{IndexParams, IndexSegment, IndexType}; use lance_io::object_store::ObjectStoreRegistry; use lance_io::object_store::StorageOptionsProvider; use lance_namespace::LanceNamespace; @@ -1070,6 +1070,182 @@ fn inner_merge_index_metadata( Ok(()) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + target_segment_bytes_jobj: JObject, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_build_index_segments( + &mut env, + java_dataset, + java_segments, + target_segment_bytes_jobj + ) + ) +} + +fn inner_build_index_segments<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject, + java_segments: JObject, + target_segment_bytes_jobj: JObject, +) -> Result<JObject<'local>> { + let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let target_segment_bytes = env + .get_long_opt(&target_segment_bytes_jobj)?
+ .map(|v| v as u64); + let template = segment_template(&segments)?; + + let built_segments = { + let dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + let mut builder = dataset_guard + .inner + .create_index_segment_builder() + .with_segments(segments); + if let Some(target_segment_bytes) = target_segment_bytes { + builder = builder.with_target_segment_bytes(target_segment_bytes); + } + RT.block_on(builder.build_all())? + }; + + let built_metadata = built_segments + .into_iter() + .map(|segment| index_segment_to_metadata(&template, segment)) + .collect::<Vec<_>>(); + export_vec(env, &built_metadata) +} + +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeCommitExistingIndexSegments<'local>( + mut env: JNIEnv<'local>, + java_dataset: JObject, + index_name: JString, + column: JString, + java_segments: JObject, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_commit_existing_index_segments( + &mut env, + java_dataset, + index_name, + column, + java_segments + ) + ) +} + +fn inner_commit_existing_index_segments<'local>( + env: &mut JNIEnv<'local>, + java_dataset: JObject, + index_name: JString, + column: JString, + java_segments: JObject, +) -> Result<JObject<'local>> { + let index_name = index_name.extract(env)?; + let column = column.extract(env)?; + let segment_metadata = + import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; + let segments = segment_metadata + .iter() + .map(index_metadata_to_segment) + .collect::<Result<Vec<_>>>()?; + + let committed = { + let mut dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + RT.block_on(dataset_guard.inner.commit_existing_index_segments( + &index_name, + &column, + segments, + ))?; + RT.block_on(dataset_guard.inner.load_indices_by_name(&index_name))?
+ }; + + export_vec(env, &committed) +} + +struct SegmentTemplate { + name: String, + fields: Vec<i32>, + dataset_version: u64, +} + +fn segment_template(segments: &[IndexMetadata]) -> Result<SegmentTemplate> { + let first = segments + .first() + .ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?; + for segment in &segments[1..] { + if segment.name != first.name { + return Err(Error::input_error(format!( + "All segments must share the same index name, got '{}' and '{}'", + first.name, segment.name + ))); + } + if segment.fields != first.fields { + return Err(Error::input_error(format!( + "All segments must target the same field ids, got {:?} and {:?}", + first.fields, segment.fields + ))); + } + if segment.dataset_version != first.dataset_version { + return Err(Error::input_error(format!( + "All segments must share the same dataset version, got {} and {}", + first.dataset_version, segment.dataset_version + ))); + } + } + + Ok(SegmentTemplate { + name: first.name.clone(), + fields: first.fields.clone(), + dataset_version: first.dataset_version, + }) +} + +fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result<IndexSegment> { + let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing fragment coverage metadata", + metadata.uuid + )) + })?; + let index_details = metadata.index_details.clone().ok_or_else(|| { + Error::input_error(format!( + "Segment '{}' is missing index details metadata", + metadata.uuid + )) + })?; + + Ok(IndexSegment::new( + metadata.uuid, + fragment_bitmap, + index_details, + metadata.index_version, + )) +} + +fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata { + let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + IndexMetadata { + uuid, + fields: template.fields.clone(), + name: template.name.clone(), + dataset_version: template.dataset_version, + fragment_bitmap: Some(fragment_bitmap), +
index_details: Some(index_details), + index_version, + created_at: Some(Utc::now()), + base_id: None, + files: None, + } +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices( mut env: JNIEnv, diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 166feebba20..506827be902 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1019,6 +1019,51 @@ public void mergeIndexMetadata( private native void innerMergeIndexMetadata( String indexUUID, int indexType, Optional batchReadHead); + /** + * Build physical vector index segments from previously-created fragment-level index outputs. + * + * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when + * fragmentIds are provided + * @param targetSegmentBytes optional size target for merged physical segments + * @return built physical segment metadata + */ + public List<Index> buildIndexSegments(List<Index> segments, Optional<Long> targetSegmentBytes) { + Preconditions.checkNotNull(segments, "segments cannot be null"); + Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeBuildIndexSegments(segments, targetSegmentBytes); + } + } + + private native List<Index> nativeBuildIndexSegments( + List<Index> segments, Optional<Long> targetSegmentBytes); + + /** + * Publish one or more existing physical index segments as a logical index.
+ * + * @param indexName logical index name + * @param column indexed column name + * @param segments physical segment metadata to publish + * @return committed manifest metadata + */ + public List<Index> commitExistingIndexSegments( + String indexName, String column, List<Index> segments) { + Preconditions.checkArgument( + indexName != null && !indexName.isEmpty(), "indexName cannot be null or empty"); + Preconditions.checkArgument( + column != null && !column.isEmpty(), "column cannot be null or empty"); + Preconditions.checkNotNull(segments, "segments cannot be null"); + Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + return nativeCommitExistingIndexSegments(indexName, column, segments); + } + } + + private native List<Index> nativeCommitExistingIndexSegments( + String indexName, String column, List<Index> segments); + /** * Count the number of rows in the dataset.
* diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index 4a5902044da..a96b6593d30 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -13,18 +13,15 @@ */ package org.lance.index; -import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestVectorDataset; -import org.lance.Transaction; import org.lance.index.vector.IvfBuildParams; import org.lance.index.vector.PQBuildParams; import org.lance.index.vector.RQBuildParams; import org.lance.index.vector.SQBuildParams; import org.lance.index.vector.VectorIndexParams; import org.lance.index.vector.VectorTrainer; -import org.lance.operation.CreateIndex; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -33,8 +30,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -75,72 +70,40 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E IndexParams indexParams = IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - // Partially create index on the first fragment - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_FLAT, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - // Partially create index on the second fragment with the same UUID - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_FLAT, - indexParams) - 
.withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_FLAT, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_FLAT, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); // The index should not be visible before metadata merge & commit assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_FLAT index should not present before commit"); - // Merge index metadata for all fragment-level pieces - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_FLAT, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - 
try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } @@ -200,68 +163,39 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc IndexParams indexParams = IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_PQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_PQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_PQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_PQ, + indexParams) + 
.withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_PQ index should not present before commit"); - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_PQ, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } @@ -305,68 +239,39 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc IndexParams indexParams = 
IndexParams.builder().setVectorIndexParams(vectorIndexParams).build(); - UUID indexUUID = UUID.randomUUID(); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_SQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - - dataset.createIndex( - IndexOptions.builder( - Collections.singletonList(TestVectorDataset.vectorColumnName), - IndexType.IVF_SQ, - indexParams) - .withIndexName(TestVectorDataset.indexName) - .withIndexUUID(indexUUID.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + Index firstSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_SQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Index secondSegment = + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList(TestVectorDataset.vectorColumnName), + IndexType.IVF_SQ, + indexParams) + .withIndexName(TestVectorDataset.indexName) + .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) + .build()); assertFalse( dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_SQ index should not present before commit"); - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.IVF_SQ, Optional.empty()); - - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals(TestVectorDataset.vectorColumnName)) - .findAny() - .orElseThrow( - () -> new RuntimeException("Cannot find vector field for TestVectorDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(indexUUID) - .name(TestVectorDataset.indexName) - 
.fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - fragments.stream().limit(2).map(Fragment::getId).collect(Collectors.toList())) - .build(); + List builtSegments = + dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); + assertEquals(2, builtSegments.size()); - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(dataset.version()) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); - } - } + List committed = + dataset.commitExistingIndexSegments( + TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } } } diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 7e04ed5837a..695d8b317c5 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -332,7 +332,7 @@ pub struct PyIndexSegmentBuilder { } impl PyIndexSegmentBuilder { - fn into_builder(&self) -> ::IndexSegmentBuilder<'_> { + fn builder(&self) -> ::IndexSegmentBuilder<'_> { let mut builder = self .dataset .create_index_segment_builder() @@ -368,7 +368,7 @@ impl PyIndexSegmentBuilder { fn plan(&self, py: Python<'_>) -> PyResult>> { let plans = rt() - .block_on(Some(py), self.into_builder().plan())? + .block_on(Some(py), self.builder().plan())? .infer_error()?; plans .into_iter() @@ -379,14 +379,14 @@ impl PyIndexSegmentBuilder { fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { let plan = plan.extract::>()?; let segment = rt() - .block_on(Some(py), self.into_builder().build(&plan.inner))? 
+ .block_on(Some(py), self.builder().build(&plan.inner))? .infer_error()?; Py::new(py, PyIndexSegment::from_inner(segment)) } fn build_all(&self, py: Python<'_>) -> PyResult>> { let segments = rt() - .block_on(Some(py), self.into_builder().build_all())? + .block_on(Some(py), self.builder().build_all())? .infer_error()?; segments .into_iter()