Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 129 additions & 24 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use jni::sys::{jbyteArray, jlong};
use jni::{objects::JObject, JNIEnv};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::index::LanceIndexStoreExt;
use lance::dataset::optimize::{compact_files, CompactionOptions as RustCompactionOptions};
use lance::dataset::refs::{Ref, TagContents};
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
Expand All @@ -38,11 +39,13 @@ use lance::io::{ObjectStore, ObjectStoreParams};
use lance::table::format::Fragment;
use lance::table::format::IndexMetadata;
use lance_core::datatypes::Schema as LanceSchema;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::DatasetIndexExt;
use lance_index::{IndexParams, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::StorageOptionsProvider;
use std::collections::HashMap;
use std::future::IntoFuture;
use std::iter::empty;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -156,21 +159,6 @@ impl BlockingDataset {
Ok(Self { inner })
}

pub fn create_index(
&mut self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<()> {
RT.block_on(
self.inner
.create_index(columns, index_type, name, params, replace),
)?;
Ok(())
}

pub fn latest_version(&self) -> Result<u64> {
let version = RT.block_on(self.inner.latest_version_id())?;
Ok(version)
Expand Down Expand Up @@ -680,9 +668,12 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCreateIndex(
java_dataset: JObject,
columns_jobj: JObject, // List<String>
index_type_code_jobj: jint,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject, // List<Integer>
index_uuid_jobj: JObject, // String
) {
ok_or_throw_without_return!(
env,
Expand All @@ -693,25 +684,37 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCreateIndex(
index_type_code_jobj,
name_jobj,
params_jobj,
replace_jobj
replace_jobj,
train_jobj,
fragments_jobj,
index_uuid_jobj
)
);
}

#[allow(clippy::too_many_arguments)]
fn inner_create_index(
env: &mut JNIEnv,
java_dataset: JObject,
columns_jobj: JObject, // List<String>
index_type_code_jobj: jint,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject, // Optional<List<String>>
index_uuid_jobj: JObject, // Optional<String>
) -> Result<()> {
let columns = env.get_strings(&columns_jobj)?;
let index_type = IndexType::try_from(index_type_code_jobj)?;
let name = env.get_string_opt(&name_jobj)?;
let replace = replace_jobj != 0;
let columns_slice: Vec<&str> = columns.iter().map(AsRef::as_ref).collect();
let replace = replace_jobj != 0;
let train = train_jobj != 0;
let fragment_ids = env
.get_ints_opt(&fragments_jobj)?
.map(|vec| vec.into_iter().map(|i| i as u32).collect());
let index_uuid = env.get_string_opt(&index_uuid_jobj)?;

// Handle scalar vs vector indices differently and get params before borrowing dataset
let params_result: Result<Box<dyn IndexParams>> = match index_type {
Expand Down Expand Up @@ -754,11 +757,113 @@ fn inner_create_index(
let params = params_result?;
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.create_index(&columns_slice, index_type, name, params.as_ref(), replace)?;

let mut index_builder = dataset_guard
.inner
.create_index_builder(&columns_slice, index_type, params.as_ref())
.replace(replace)
.train(train);

if let Some(name) = name {
index_builder = index_builder.name(name);
}

let has_fragment_ids = fragment_ids.is_some();

if let Some(fragment_ids) = fragment_ids {
index_builder = index_builder.fragments(fragment_ids);
}

if let Some(index_uuid) = index_uuid {
index_builder = index_builder.index_uuid(index_uuid);
}

if has_fragment_ids {
RT.block_on(index_builder.execute_uncommitted())?;
} else {
RT.block_on(index_builder.into_future())?
}

Ok(())
}

/// JNI entry point for `Dataset.innerMergeIndexMetadata`.
///
/// Thin wrapper: forwards all arguments to [`inner_merge_index_metadata`] and,
/// on error, throws the corresponding Java exception via
/// `ok_or_throw_without_return!` instead of returning a value.
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_innerMergeIndexMetadata<'local>(
    mut env: JNIEnv<'local>,
    java_dataset: JObject,
    index_uuid: JString,
    index_type_code_jobj: jint,
    batch_readhead_jobj: JObject, // Optional<Integer>
) {
    ok_or_throw_without_return!(
        env,
        inner_merge_index_metadata(
            &mut env,
            java_dataset,
            index_uuid,
            index_type_code_jobj,
            batch_readhead_jobj
        )
    );
}

/// Merges index files for the index identified by `index_uuid`.
///
/// Extracts the UUID and optional `batchReadHead` (Optional<Integer>) from the
/// JNI arguments, resolves the index type from its Java-side code, then runs
/// the matching merge routine on the blocking runtime. Only
/// `IndexType::Inverted` and `IndexType::BTree` are supported; any other type
/// yields an input error.
fn inner_merge_index_metadata(
    env: &mut JNIEnv,
    java_dataset: JObject,
    index_uuid: JString,
    index_type_code_jobj: jint,
    batch_readhead_jobj: JObject, // Optional<Integer>
) -> Result<()> {
    let index_uuid = index_uuid.extract(env)?;
    let index_type = IndexType::try_from(index_type_code_jobj)?;
    let batch_readhead = env
        .get_int_opt(&batch_readhead_jobj)?
        .map(|val| val as usize);

    let dataset_guard =
        unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;

    RT.block_on(async {
        let index_store = LanceIndexStore::from_dataset_for_new(&dataset_guard.inner, &index_uuid)?;
        let object_store = dataset_guard.inner.object_store();
        let index_dir = dataset_guard.inner.indices_dir().child(index_uuid);

        match index_type {
            IndexType::Inverted => lance_index::scalar::inverted::builder::merge_index_files(
                object_store,
                &index_dir,
                Arc::new(index_store),
            )
            .await
            .map_err(|e| {
                // Fixed message: this path merges index files, it does not create an index.
                Error::runtime_error(format!(
                    "Cannot merge index files for index type: {:?}. Caused by: {:?}",
                    index_type,
                    e.to_string()
                ))
            }),
            IndexType::BTree => lance_index::scalar::btree::merge_index_files(
                object_store,
                &index_dir,
                Arc::new(index_store),
                batch_readhead,
            )
            .await
            .map_err(|e| {
                Error::runtime_error(format!(
                    "Cannot merge index files for index type: {:?}. Caused by: {:?}",
                    index_type,
                    e.to_string()
                ))
            }),
            _ => Err(Error::input_error(format!(
                "Cannot merge index type: {:?}. Only supports BTREE and INVERTED now.",
                index_type
            ))),
        }
    })
}

//////////////////
// Read Methods //
//////////////////
Expand Down
56 changes: 35 additions & 21 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use prost::Message;
use prost_types::Any;
use roaring::RoaringBitmap;
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::Arc;
use uuid::Uuid;

Expand Down Expand Up @@ -98,18 +97,19 @@ impl IntoJava for &IndexMetadata {
};
let name = env.new_string(&self.name)?;

let fragment_bitmap = if let Some(bitmap) = &self.fragment_bitmap {
let mut bytes = Vec::new();
bitmap
.serialize_into(&mut bytes)
.map_err(|e| Error::input_error(e.to_string()))?;

let jbytes =
unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const jbyte, bytes.len()) };

let byte_array = env.new_byte_array(bytes.len() as i32)?;
env.set_byte_array_region(&byte_array, 0, jbytes)?;
byte_array.into()
let fragments = if let Some(bitmap) = &self.fragment_bitmap {
let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;
for frag_id in bitmap.iter() {
let id_obj =
env.new_object("java/lang/Integer", "(I)V", &[JValue::Int(frag_id as i32)])?;
env.call_method(
&array_list,
"add",
"(Ljava/lang/Object;)Z",
&[JValue::Object(&id_obj)],
)?;
}
array_list
} else {
JObject::null()
};
Expand Down Expand Up @@ -152,13 +152,13 @@ impl IntoJava for &IndexMetadata {
// Create IndexMetadata object
Ok(env.new_object(
"com/lancedb/lance/index/Index",
"(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;J[B[BILjava/time/Instant;Ljava/lang/Integer;)V",
"(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;JLjava/util/List;[BILjava/time/Instant;Ljava/lang/Integer;)V",
&[
JValue::Object(&uuid),
JValue::Object(&fields),
JValue::Object(&name),
JValue::Long(self.dataset_version as i64),
JValue::Object(&fragment_bitmap),
JValue::Object(&fragments),
JValue::Object(&index_details),
JValue::Int(self.index_version),
JValue::Object(&created_at),
Expand Down Expand Up @@ -251,12 +251,12 @@ impl FromJObjectWithEnv<IndexMetadata> for JObject<'_> {
let dataset_version = env.get_field(self, "datasetVersion", "J")?.j()? as u64;

let fragment_bitmap: Option<RoaringBitmap> =
env.get_optional_from_method(self, "fragmentBitmap", |env, bitmap_obj| {
let byte_array: JByteArray = bitmap_obj.into();
let bytes = env.convert_byte_array(&byte_array)?;
let bitmap = RoaringBitmap::deserialize_from(Cursor::new(bytes)).map_err(|e| {
Error::input_error(format!("Invalid RoaringBitmap data: {}", e))
})?;
env.get_optional_from_method(self, "fragments", |env, fragments_obj| {
let frag_ids = env.get_integers(&fragments_obj)?;
let bitmap = frag_ids
.iter()
.map(|val| *val as u32)
.collect::<RoaringBitmap>();
Ok(bitmap)
})?;

Expand Down Expand Up @@ -986,6 +986,20 @@ fn convert_to_rust_operation(
.i()? as u32;
return Ok(Operation::ReserveFragments { num_fragments });
}
"CreateIndex" => {
let new_indices =
import_vec_from_method(env, java_operation, "getNewIndices", |env, index| {
index.extract_object(env)
})?;
let removed_indices =
import_vec_from_method(env, java_operation, "getRemovedIndices", |env, index| {
index.extract_object(env)
})?;
return Ok(Operation::CreateIndex {
new_indices,
removed_indices,
});
}
_ => unimplemented!(),
};
Ok(op)
Expand Down
41 changes: 38 additions & 3 deletions java/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.lancedb.lance.cleanup.CleanupPolicy;
import com.lancedb.lance.cleanup.RemovalStats;
import com.lancedb.lance.compaction.CompactionOptions;
import com.lancedb.lance.index.IndexOptions;
import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;
import com.lancedb.lance.io.StorageOptionsProvider;
Expand Down Expand Up @@ -651,23 +652,46 @@ public void restore() {
private native void nativeRestore();

/**
* Creates a new index on the dataset. Only vector indexes are supported.
* Creates a new index on the dataset
*
* @param columns the columns to index from
* @param indexType the index type
* @param name the name of the created index
* @param params index params
* @param replace whether to replace the existing index
* @deprecated please use {@link Dataset#createIndex(IndexOptions)} instead.
*/
@Deprecated
public void createIndex(
    List<String> columns,
    IndexType indexType,
    Optional<String> name,
    IndexParams params,
    boolean replace) {
  // Backward-compatible overload: translate the legacy argument list into an
  // IndexOptions value and delegate to the options-based API.
  createIndex(
      IndexOptions.builder(columns, indexType, params)
          .withIndexName(name.orElse(null))
          .replace(replace)
          .build());
}

/**
 * Creates a new index on the dataset.
 *
 * @param options options for building the index
 */
public void createIndex(IndexOptions options) {
  try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
    Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
    nativeCreateIndex(
        options.getColumns(),
        // Pass the explicit type code (getValue) rather than ordinal(): the enum's
        // declaration order is not guaranteed to match the native type codes, and
        // the other native call sites in this class (e.g. mergeIndexMetadata)
        // already pass getValue().
        options.getIndexType().getValue(),
        options.getIndexName(),
        options.getIndexParams(),
        options.isReplace(),
        options.isTrain(),
        options.getFragmentIds(),
        options.getIndexUUID());
  }
}

Expand All @@ -676,7 +700,18 @@ private native void nativeCreateIndex(
int indexTypeCode,
Optional<String> name,
IndexParams params,
boolean replace);
boolean replace,
boolean train,
Optional<List<Integer>> fragments,
Optional<String> indexUUID);

/**
 * Merges index files for the index identified by {@code indexUUID}.
 *
 * @param indexUUID the UUID of the index whose files should be merged
 * @param indexType the type of the index; the native side currently supports only BTREE and
 *     INVERTED
 * @param batchReadHead optional batch read-ahead size forwarded to the native merge routine
 */
public void mergeIndexMetadata(
    String indexUUID, IndexType indexType, Optional<Integer> batchReadHead) {
  innerMergeIndexMetadata(indexUUID, indexType.getValue(), batchReadHead);
}

private native void innerMergeIndexMetadata(
    String indexUUID, int indexType, Optional<Integer> batchReadHead);

/**
* Count the number of rows in the dataset.
Expand Down
Loading
Loading