Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 178 additions & 2 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::namespace::{
};
use crate::session::{handle_from_session, session_from_handle};
use crate::storage_options::JavaStorageOptionsProvider;
use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec};
use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec, import_vec_to_rust};
use crate::utils::{
build_compaction_options, extract_storage_options, extract_write_params,
get_scalar_index_params, get_vector_index_params, to_rust_map,
Expand Down Expand Up @@ -49,7 +49,7 @@ use lance_index::DatasetIndexExt;
use lance_index::IndexCriteria as RustIndexCriteria;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::btree::BTreeParameters;
use lance_index::{IndexParams, IndexType};
use lance_index::{IndexParams, IndexSegment, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::StorageOptionsProvider;
use lance_namespace::LanceNamespace;
Expand Down Expand Up @@ -1070,6 +1070,182 @@ fn inner_merge_index_metadata(
Ok(())
}

/// JNI entry point for `Dataset#nativeBuildIndexSegments`.
///
/// Thin wrapper: all work is delegated to [`inner_build_index_segments`];
/// `ok_or_throw!` converts any `Err` into a thrown Java exception.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>(
    mut env: JNIEnv<'local>,
    java_dataset: JObject,
    java_segments: JObject,
    // Java `Optional<Long>`; absent means "use the builder's default target size".
    target_segment_bytes_jobj: JObject,
) -> JObject<'local> {
    ok_or_throw!(
        env,
        inner_build_index_segments(
            &mut env,
            java_dataset,
            java_segments,
            target_segment_bytes_jobj
        )
    )
}

/// Builds physical index segments from fragment-level segment metadata and
/// returns the resulting segment metadata as a Java `List<Index>`.
///
/// The input segments must all describe the same logical index (same name,
/// field ids, and dataset version); `segment_template` enforces this and
/// captures those shared attributes before the segments are consumed.
fn inner_build_index_segments<'local>(
    env: &mut JNIEnv<'local>,
    java_dataset: JObject,
    java_segments: JObject,
    target_segment_bytes_jobj: JObject,
) -> Result<JObject<'local>> {
    let input_segments =
        import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
    let size_target = env
        .get_long_opt(&target_segment_bytes_jobj)?
        .map(|bytes| bytes as u64);
    // Capture the shared name/fields/version now: the segments are moved into
    // the builder below.
    let template = segment_template(&input_segments)?;

    let physical_segments = {
        let dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        let builder = dataset_guard
            .inner
            .create_index_segment_builder()
            .with_segments(input_segments);
        let builder = match size_target {
            Some(bytes) => builder.with_target_segment_bytes(bytes),
            None => builder,
        };
        RT.block_on(builder.build_all())?
    };

    let result: Vec<_> = physical_segments
        .into_iter()
        .map(|seg| index_segment_to_metadata(&template, seg))
        .collect();
    export_vec(env, &result)
}

/// JNI entry point for `Dataset#nativeCommitExistingIndexSegments`.
///
/// Thin wrapper: all work is delegated to
/// [`inner_commit_existing_index_segments`]; `ok_or_throw!` converts any
/// `Err` into a thrown Java exception.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeCommitExistingIndexSegments<'local>(
    mut env: JNIEnv<'local>,
    java_dataset: JObject,
    index_name: JString,
    column: JString,
    java_segments: JObject,
) -> JObject<'local> {
    ok_or_throw!(
        env,
        inner_commit_existing_index_segments(
            &mut env,
            java_dataset,
            index_name,
            column,
            java_segments
        )
    )
}

fn inner_commit_existing_index_segments<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
index_name: JString,
column: JString,
java_segments: JObject,
) -> Result<JObject<'local>> {
let index_name = index_name.extract(env)?;
let column = column.extract(env)?;
let segment_metadata =
import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
let segments = segment_metadata
.iter()
.map(index_metadata_to_segment)
.collect::<Result<Vec<_>>>()?;

let committed = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
RT.block_on(dataset_guard.inner.commit_existing_index_segments(
&index_name,
&column,
segments,
))?;
RT.block_on(dataset_guard.inner.load_indices_by_name(&index_name))?
};

export_vec(env, &committed)
}

/// Index attributes shared by every input segment, captured from the first
/// segment and reapplied to the physical segments produced by the builder.
struct SegmentTemplate {
    // Logical index name common to all segments.
    name: String,
    // Field ids the index covers.
    fields: Vec<i32>,
    // Dataset version the segments were built against.
    dataset_version: u64,
}

fn segment_template(segments: &[IndexMetadata]) -> Result<SegmentTemplate> {
let first = segments
.first()
.ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?;
for segment in &segments[1..] {
if segment.name != first.name {
return Err(Error::input_error(format!(
"All segments must share the same index name, got '{}' and '{}'",
first.name, segment.name
)));
}
if segment.fields != first.fields {
return Err(Error::input_error(format!(
"All segments must target the same field ids, got {:?} and {:?}",
first.fields, segment.fields
)));
}
if segment.dataset_version != first.dataset_version {
return Err(Error::input_error(format!(
"All segments must share the same dataset version, got {} and {}",
first.dataset_version, segment.dataset_version
)));
}
}

Ok(SegmentTemplate {
name: first.name.clone(),
fields: first.fields.clone(),
dataset_version: first.dataset_version,
})
}

fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result<IndexSegment> {
let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| {
Error::input_error(format!(
"Segment '{}' is missing fragment coverage metadata",
metadata.uuid
))
})?;
let index_details = metadata.index_details.clone().ok_or_else(|| {
Error::input_error(format!(
"Segment '{}' is missing index details metadata",
metadata.uuid
))
})?;

Ok(IndexSegment::new(
metadata.uuid,
fragment_bitmap,
index_details,
metadata.index_version,
))
}

/// Wraps a built physical segment in `IndexMetadata`, filling the logical
/// attributes (name, fields, dataset version) from the shared template.
fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata {
    let (segment_uuid, bitmap, details, version) = segment.into_parts();
    IndexMetadata {
        uuid: segment_uuid,
        name: template.name.clone(),
        fields: template.fields.clone(),
        dataset_version: template.dataset_version,
        fragment_bitmap: Some(bitmap),
        index_details: Some(details),
        index_version: version,
        // Timestamp the metadata at conversion time.
        created_at: Some(Utc::now()),
        base_id: None,
        files: None,
    }
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices(
mut env: JNIEnv,
Expand Down
45 changes: 45 additions & 0 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,51 @@ public void mergeIndexMetadata(
private native void innerMergeIndexMetadata(
String indexUUID, int indexType, Optional<Integer> batchReadHead);

/**
 * Builds physical vector index segments from fragment-level index outputs
 * produced earlier by {@link #createIndex(IndexOptions)} when fragmentIds
 * were supplied.
 *
 * @param segments segment metadata to merge into physical segments
 * @param targetSegmentBytes optional byte-size target for each merged physical segment
 * @return metadata describing the built physical segments
 */
public List<Index> buildIndexSegments(List<Index> segments, Optional<Long> targetSegmentBytes) {
  Preconditions.checkNotNull(segments, "segments cannot be null");
  Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty");
  // Hold the write lock for the full native call so the handle cannot be
  // closed underneath it.
  try (LockManager.WriteLock lock = lockManager.acquireWriteLock()) {
    Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
    return nativeBuildIndexSegments(segments, targetSegmentBytes);
  }
}

// JNI binding; implemented in the Rust blocking_dataset module.
private native List<Index> nativeBuildIndexSegments(
List<Index> segments, Optional<Long> targetSegmentBytes);

/**
 * Publishes previously-built physical index segments so they become visible
 * as a single logical index on this dataset.
 *
 * @param indexName name of the logical index to publish
 * @param column name of the indexed column
 * @param segments physical segment metadata to publish
 * @return committed manifest metadata
 */
public List<Index> commitExistingIndexSegments(
    String indexName, String column, List<Index> segments) {
  Preconditions.checkArgument(
      indexName != null && !indexName.isEmpty(), "indexName cannot be null or empty");
  Preconditions.checkArgument(
      column != null && !column.isEmpty(), "column cannot be null or empty");
  Preconditions.checkNotNull(segments, "segments cannot be null");
  Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty");
  // Hold the write lock for the full native call so the handle cannot be
  // closed underneath it.
  try (LockManager.WriteLock lock = lockManager.acquireWriteLock()) {
    Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
    return nativeCommitExistingIndexSegments(indexName, column, segments);
  }
}

// JNI binding; implemented in the Rust blocking_dataset module.
private native List<Index> nativeCommitExistingIndexSegments(
String indexName, String column, List<Index> segments);

/**
* Count the number of rows in the dataset.
*
Expand Down
Loading
Loading