Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 310 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2445,3 +2445,313 @@ fn inner_cleanup_with_policy<'local>(

Ok(jstats)
}

//////////////////////////////
// Index operation Methods //
//////////////////////////////

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeGetIndexes<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_get_indexes(&mut env, java_dataset))
}

/// Builds a `java.util.ArrayList` containing one `org.lance.index.Index` for
/// every entry returned by `list_indexes()` on the native dataset.
///
/// Each index is converted inside its own JNI local frame. Converting a single
/// index creates many local references (UUID, strings, lists, boxed integers,
/// ...); without a frame they would all stay alive until this native call
/// returns, so the number of live local references would grow with the number
/// of indexes. The list itself retains the `Index` object, so popping the
/// frame after `add` is safe.
///
/// # Errors
/// Propagates failures from reading the native dataset handle, from
/// `list_indexes()`, and from any JNI call.
fn inner_get_indexes<'local>(
    env: &mut JNIEnv<'local>,
    java_dataset: JObject,
) -> Result<JObject<'local>> {
    // Minimum number of local references the JVM must make available in each
    // per-index frame; this is a floor, not a limit.
    const LOCAL_FRAME_CAPACITY: i32 = 32;

    let indexes = {
        // SAFETY (assumed -- confirm against the code that installs the field):
        // the NATIVE_DATASET field of `java_dataset` holds a `BlockingDataset`.
        let dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        dataset_guard.list_indexes()?
        // The guard is dropped here, before any further JNI work.
    };

    let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;

    for index_meta in indexes.iter() {
        env.with_local_frame(LOCAL_FRAME_CAPACITY, |env| -> Result<()> {
            let java_index = create_java_index(env, index_meta)?;
            env.call_method(
                &array_list,
                "add",
                "(Ljava/lang/Object;)Z",
                &[JValue::Object(&java_index)],
            )?;
            Ok(())
        })?;
    }

    Ok(array_list)
}

fn create_java_index<'local>(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

env: &mut JNIEnv<'local>,
index_meta: &IndexMetadata,
) -> Result<JObject<'local>> {
// Create UUID
let uuid_str = index_meta.uuid.to_string();
let uuid_obj = env
.call_static_method(
"java/util/UUID",
"fromString",
"(Ljava/lang/String;)Ljava/util/UUID;",
&[JValue::Object(&env.new_string(&uuid_str)?.into())],
)?
.l()?;

// Create fields list (List<Integer>)
let fields_list = env.new_object("java/util/ArrayList", "()V", &[])?;
for field in &index_meta.fields {
let int_obj = env
.call_static_method(
"java/lang/Integer",
"valueOf",
"(I)Ljava/lang/Integer;",
&[JValue::Int(*field)],
)?
.l()?;
env.call_method(
&fields_list,
"add",
"(Ljava/lang/Object;)Z",
&[JValue::Object(&int_obj)],
)?;
}

// Create name
let name = env.new_string(&index_meta.name)?;

// Create fragments list (List<Integer>) - can be null
let fragments_list = if let Some(bitmap) = &index_meta.fragment_bitmap {
let list = env.new_object("java/util/ArrayList", "()V", &[])?;
for frag_id in bitmap.iter() {
let int_obj = env
.call_static_method(
"java/lang/Integer",
"valueOf",
"(I)Ljava/lang/Integer;",
&[JValue::Int(frag_id as i32)],
)?
.l()?;
env.call_method(
&list,
"add",
"(Ljava/lang/Object;)Z",
&[JValue::Object(&int_obj)],
)?;
}
list
} else {
JObject::null()
};

// Create indexDetails byte array - can be null
let index_details_bytes = if let Some(details) = &index_meta.index_details {
let bytes = prost::Message::encode_to_vec(details.as_ref());
let byte_array = env.byte_array_from_slice(&bytes)?;
byte_array.into()
} else {
JObject::null()
};

// Create createdAt Instant - can be null
let created_at = if let Some(dt) = index_meta.created_at {
let millis = dt.timestamp_millis();
env.call_static_method(
"java/time/Instant",
"ofEpochMilli",
"(J)Ljava/time/Instant;",
&[JValue::Long(millis)],
)?
.l()?
} else {
JObject::null()
};

// Create baseId Integer - can be null
let base_id = if let Some(id) = index_meta.base_id {
env.call_static_method(
"java/lang/Integer",
"valueOf",
"(I)Ljava/lang/Integer;",
&[JValue::Int(id as i32)],
)?
.l()?
} else {
JObject::null()
};

// Determine index type from index_details type_url
let index_type = determine_index_type(env, &index_meta.index_details)?;

// Call Index.create() static method
let index_obj = env.call_static_method(
"org/lance/index/Index",
"create",
"(Ljava/util/UUID;Ljava/util/List;Ljava/lang/String;JLjava/util/List;[BILjava/time/Instant;Ljava/lang/Integer;Lorg/lance/index/IndexType;)Lorg/lance/index/Index;",
&[
JValue::Object(&uuid_obj),
JValue::Object(&fields_list),
JValue::Object(&name.into()),
JValue::Long(index_meta.dataset_version as i64),
JValue::Object(&fragments_list),
JValue::Object(&index_details_bytes),
JValue::Int(index_meta.index_version),
JValue::Object(&created_at),
JValue::Object(&base_id),
JValue::Object(&index_type),
],
)?.l()?;

Ok(index_obj)
}

/// Maps a protobuf type URL to the name of an `org.lance.index.IndexType`
/// constant, or `None` when nothing matches.
///
/// Only the text after the last `.` is inspected, case-insensitively
/// (e.g. ".lance.index.BTreeIndexDetails" -> "BTREE"). Checks run in a fixed
/// order, so the more specific "ivfhnsw" is tested before the generic "ivf".
fn index_type_name_for_url(type_url: &str) -> Option<&'static str> {
    // Substring -> enum constant for the kinds that need no further refinement.
    const SIMPLE_KINDS: [(&str, &str); 7] = [
        ("btree", "BTREE"),
        ("bitmap", "BITMAP"),
        ("labellist", "LABEL_LIST"),
        ("inverted", "INVERTED"),
        ("ngram", "NGRAM"),
        ("zonemap", "ZONEMAP"),
        ("bloomfilter", "BLOOM_FILTER"),
    ];

    let last_segment = type_url.split('.').next_back().unwrap_or("");
    let lowered = last_segment.to_lowercase();

    for (needle, constant) in SIMPLE_KINDS {
        if lowered.contains(needle) {
            return Some(constant);
        }
    }

    // Vector kinds are refined by quantizer; "sq" wins over "pq" when both appear.
    let has_sq = lowered.contains("sq");
    let has_pq = lowered.contains("pq");

    if lowered.contains("ivfhnsw") {
        return Some(match (has_sq, has_pq) {
            (true, _) => "IVF_HNSW_SQ",
            (false, true) => "IVF_HNSW_PQ",
            (false, false) => "IVF_HNSW_FLAT",
        });
    }

    if lowered.contains("ivf") {
        return Some(match (has_sq, has_pq) {
            (true, _) => "IVF_SQ",
            (false, true) => "IVF_PQ",
            (false, false) => "IVF_FLAT",
        });
    }

    if lowered.contains("vector") {
        return Some("VECTOR");
    }

    None
}

/// Resolves the `org.lance.index.IndexType` enum constant that corresponds to
/// the type URL of `index_details`.
///
/// Returns a null `JObject` when `index_details` is `None` or when its type URL
/// matches none of the known kinds.
///
/// # Errors
/// Propagates the JNI error if the static field lookup on `IndexType` fails.
fn determine_index_type<'local>(
    env: &mut JNIEnv<'local>,
    index_details: &Option<Arc<prost_types::Any>>,
) -> Result<JObject<'local>> {
    let constant_name = index_details
        .as_ref()
        .and_then(|details| index_type_name_for_url(&details.type_url));

    let Some(name) = constant_name else {
        return Ok(JObject::null());
    };

    let enum_value = env
        .get_static_field(
            "org/lance/index/IndexType",
            name,
            "Lorg/lance/index/IndexType;",
        )?
        .l()?;
    Ok(enum_value)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeCountIndexedRows(
mut env: JNIEnv,
java_dataset: JObject,
jindex_name: JString,
jfilter: JString,
jfragment_ids: JObject, // Optional<List<Integer>>
) -> jlong {
ok_or_throw_with_return!(
env,
inner_count_indexed_rows(&mut env, java_dataset, jindex_name, jfilter, jfragment_ids),
-1
)
}

/// Counts the rows matching `jfilter`, optionally restricted to the fragments
/// listed in `jfragment_ids` (a Java `Optional<List<Integer>>`).
///
/// NOTE(review): `_jindex_name` is currently unused. The count is produced by
/// a filtered scan (`Scanner::count_rows`), not by querying a named index
/// directly; whether an index is used is left to the scanner.
///
/// Fragment ids that do not exist in the dataset select nothing. Negative ids
/// can never name a fragment, so they are dropped rather than being
/// reinterpreted as large unsigned values.
///
/// # Errors
/// Propagates JNI failures, filter parsing/projection errors from the scanner,
/// and errors from `count_rows`.
fn inner_count_indexed_rows(
    env: &mut JNIEnv,
    java_dataset: JObject,
    _jindex_name: JString,
    jfilter: JString,
    jfragment_ids: JObject, // Optional<List<Integer>>
) -> Result<i64> {
    let filter: String = jfilter.extract(env)?;

    // Extract optional fragment IDs into a set so the per-fragment membership
    // test below is O(1) instead of a linear search.
    let has_fragment_ids = env
        .call_method(&jfragment_ids, "isPresent", "()Z", &[])?
        .z()?;
    let fragment_ids: Option<std::collections::HashSet<u64>> = if has_fragment_ids {
        let list_obj = env
            .call_method(&jfragment_ids, "get", "()Ljava/lang/Object;", &[])?
            .l()?;
        let list = env.get_list(&list_obj)?;
        let mut ids = std::collections::HashSet::new();
        let mut iter = list.iter(env)?;
        while let Some(elem) = iter.next(env)? {
            let int_val = env.call_method(&elem, "intValue", "()I", &[])?.i()?;
            // Each iteration yields a fresh local reference; release it so the
            // number of live references does not grow with the list length.
            env.delete_local_ref(elem)?;
            // Negative values fail the conversion and are skipped.
            if let Ok(id) = u64::try_from(int_val) {
                ids.insert(id);
            }
        }
        Some(ids)
    } else {
        None
    };

    let count = {
        // SAFETY (assumed -- confirm against the code that installs the field):
        // the NATIVE_DATASET field of `java_dataset` holds a `BlockingDataset`.
        let dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;

        // Use a scanner with fragment filtering to count rows.
        // This ensures we only count rows in the specified fragments.
        let inner = dataset_guard.inner.clone();

        RT.block_on(async {
            let mut scanner = inner.scan();

            // Apply filter
            if !filter.is_empty() {
                scanner.filter(&filter)?;
            }

            // Empty projection and enable row_id for count_rows to work;
            // count_rows() requires a metadata-only projection.
            scanner.project::<String>(&[])?;
            scanner.with_row_id();

            // Apply fragment filter if specified
            if let Some(frag_ids) = fragment_ids {
                // Convert FileFragment to Fragment by extracting metadata.
                // Ids are compared as u64 so neither side is truncated.
                let filtered_fragments: Vec<_> = inner
                    .get_fragments()
                    .into_iter()
                    .filter(|f| frag_ids.contains(&(f.id() as u64)))
                    .map(|f| f.metadata().clone())
                    .collect();
                scanner.with_fragments(filtered_fragments);
            }

            // Use the scanner's count_rows method
            let count = scanner.count_rows().await?;

            Ok::<i64, lance::Error>(count as i64)
        })?
    };

    Ok(count)
}
39 changes: 39 additions & 0 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,31 @@ public long countRows(String filter) {

private native long nativeCountRows(Optional<String> filter);

/**
 * Count rows matching a filter, optionally restricted to a set of fragments.
 *
 * <p>NOTE: the native implementation currently counts through a filtered scan over the selected
 * fragments; {@code indexName} is validated here but is not otherwise consulted by the native
 * code, so whether an index is used is decided by the scanner.
 *
 * @param indexName the name of the scalar index to use; must not be null or empty
 * @param filter the filter expression (e.g., "column = 5"); must not be null or empty
 * @param fragmentIds optional list of fragment IDs to restrict the count to; must not be null,
 *     pass {@code Optional.empty()} to count over all fragments
 * @return count of matching rows
 */
public long countIndexedRows(
    String indexName, String filter, Optional<List<Integer>> fragmentIds) {
  try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
    Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
    Preconditions.checkArgument(
        indexName != null && !indexName.isEmpty(), "indexName cannot be null or empty");
    Preconditions.checkArgument(
        filter != null && !filter.isEmpty(), "filter cannot be null or empty");
    // The native side calls isPresent() on this object, so reject null up front with a clear
    // message instead of failing inside JNI.
    Preconditions.checkArgument(
        fragmentIds != null, "fragmentIds cannot be null, use Optional.empty() instead");
    return nativeCountIndexedRows(indexName, filter, fragmentIds);
  }
}

private native long nativeCountIndexedRows(
    String indexName, String filter, Optional<List<Integer>> fragmentIds);

/**
* Calculate the size of the dataset.
*
Expand Down Expand Up @@ -928,6 +953,20 @@ public List<String> listIndexes() {

private native List<String> nativeListIndexes();

/**
 * Get all indexes with full metadata.
 *
 * <p>Acquires the dataset read lock for the duration of the call and checks that the native
 * dataset handle is non-zero (i.e. the dataset has not been closed) before delegating to the
 * native implementation.
 *
 * @return list of Index objects with complete metadata including index type and fragment coverage
 */
public List<org.lance.index.Index> getIndexes() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use the short name and import it

try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeGetIndexes();
}
}

private native List<org.lance.index.Index> nativeGetIndexes();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


/**
* Get the table config of the dataset.
*
Expand Down
Loading