Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 87 additions & 4 deletions java/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::sync::Arc;

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::traits::{import_vec_from_method, import_vec_to_rust};
use crate::traits::{export_vec, import_vec_from_method, import_vec_to_rust, IntoJava};
use arrow::array::Float32Array;
use arrow::{ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream};
use arrow_schema::SchemaRef;
use jni::objects::{JObject, JString};
use jni::objects::{JObject, JString, JValueGen};
use jni::sys::{jboolean, jint, JNI_TRUE};
use jni::{sys::jlong, JNIEnv};
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::scanner::{
ColumnOrdering, DatasetRecordBatchStream, Scanner, Split, SplitFragment, SplitOptions,
};
use lance_index::scalar::inverted::query::{
BooleanQuery as FtsBooleanQuery, BoostQuery as FtsBoostQuery, FtsQuery,
MatchQuery as FtsMatchQuery, MultiMatchQuery as FtsMultiMatchQuery, Occur as FtsOccur,
Expand All @@ -24,7 +26,6 @@ use lance_linalg::distance::DistanceType;

use crate::{
blocking_dataset::{BlockingDataset, NATIVE_DATASET},
traits::IntoJava,
RT,
};

Expand Down Expand Up @@ -56,6 +57,11 @@ impl BlockingScanner {
let res = RT.block_on(self.inner.count_rows())?;
Ok(res)
}

/// Plans the splits for this scan and waits for the result.
///
/// `options` is forwarded unchanged to the wrapped scanner's `plan_splits`;
/// the returned future is driven to completion on the shared runtime `RT`.
///
/// # Errors
///
/// Returns the planning error converted into this crate's error type.
pub fn plan_splits(&self, options: Option<SplitOptions>) -> Result<Vec<Split>> {
    let planning = self.inner.plan_splits(options);
    Ok(RT.block_on(planning)?)
}
}

fn build_full_text_search_query<'a>(env: &mut JNIEnv<'a>, java_obj: JObject) -> Result<FtsQuery> {
Expand Down Expand Up @@ -481,3 +487,80 @@ fn inner_count_rows(env: &mut JNIEnv, j_scanner: JObject) -> Result<u64> {
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?;
scanner_guard.count_rows()
}

/// JNI entry point for the Java native method `org.lance.ipc.LanceScanner.nativePlanSplits`.
///
/// `j_scanner` is the Java scanner object and `options_obj` is the
/// `Optional<SplitOptions>` argument passed from Java. All work is delegated to
/// [`inner_plan_splits`]; its `Result` is unwrapped by the `ok_or_throw!` macro.
///
/// NOTE(review): `ok_or_throw!` is defined elsewhere — presumably it raises a
/// Java exception on `Err`; confirm against the macro definition.
#[no_mangle]
pub extern "system" fn Java_org_lance_ipc_LanceScanner_nativePlanSplits<'local>(
mut env: JNIEnv<'local>,
j_scanner: JObject,
options_obj: JObject, // Optional<SplitOptions>
) -> JObject<'local> {
ok_or_throw!(env, inner_plan_splits(&mut env, j_scanner, options_obj))
}

/// Plans splits for the scanner attached to `j_scanner` and exports them to Java.
///
/// The Java-side options are decoded first, then the native `BlockingScanner`
/// is looked up on `j_scanner` and asked to plan the splits. The resulting
/// vector is converted to a Java object with `export_vec`.
///
/// # Errors
///
/// Fails if the options cannot be read, the native scanner field cannot be
/// fetched, planning fails, or the export to Java fails.
fn inner_plan_splits<'local>(
    env: &mut JNIEnv<'local>,
    j_scanner: JObject,
    options_obj: JObject,
) -> Result<JObject<'local>> {
    let split_options = extract_split_options(env, &options_obj)?;

    // The guard is confined to this block so that it is released before `env`
    // is used again for the export below.
    let planned = {
        // SAFETY: relies on the `NATIVE_SCANNER` field of `j_scanner` holding a
        // `BlockingScanner`. NOTE(review): the field is set outside this view — confirm.
        let guard =
            unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?;
        guard.plan_splits(split_options)?
    };

    export_vec(env, &planned)
}

fn extract_split_options(env: &mut JNIEnv, options_obj: &JObject) -> Result<Option<SplitOptions>> {
if options_obj.is_null() {
return Ok(None);
}

let is_present = env.call_method(options_obj, "isPresent", "()Z", &[])?.z()?;

if !is_present {
return Ok(None);
}

let options_inner = env
.call_method(options_obj, "get", "()Ljava/lang/Object;", &[])?
.l()?;

let max_size_bytes = env.get_optional_i64_from_method(&options_inner, "getMaxSizeBytes")?;
let max_row_count = env.get_optional_i64_from_method(&options_inner, "getMaxRowCount")?;

Ok(Some(SplitOptions {
max_size_bytes: max_size_bytes.map(|v| v as usize),
max_row_count: max_row_count.map(|v| v as usize),
}))
}

// JNI class names and constructor descriptors used when building the Java
// split objects from their Rust counterparts.

// Java class `org.lance.ipc.Split`.
const SPLIT_CLASS: &str = "org/lance/ipc/Split";
// Descriptor of the `Split(java.util.List)` constructor.
const SPLIT_CONSTRUCTOR_SIG: &str = "(Ljava/util/List;)V";
// Java class `org.lance.ipc.SplitFragment`.
const SPLIT_FRAGMENT_CLASS: &str = "org/lance/ipc/SplitFragment";
// Descriptor of the `SplitFragment(org.lance.FragmentMetadata, long)` constructor.
const SPLIT_FRAGMENT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;J)V";

impl IntoJava for &SplitFragment {
    /// Builds a Java `org.lance.ipc.SplitFragment` from this split fragment.
    ///
    /// The fragment is converted to its Java form first, then passed to the
    /// Java constructor together with `max_row_count` as a `long`.
    fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
        let j_fragment = self.fragment.into_java(env)?;
        let ctor_args = [
            JValueGen::Object(&j_fragment),
            JValueGen::Long(self.max_row_count as i64),
        ];
        let j_split_fragment = env.new_object(
            SPLIT_FRAGMENT_CLASS,
            SPLIT_FRAGMENT_CONSTRUCTOR_SIG,
            &ctor_args,
        )?;
        Ok(j_split_fragment)
    }
}

impl IntoJava for &Split {
    /// Builds a Java `org.lance.ipc.Split` from this split.
    ///
    /// The fragments are exported to a Java object with `export_vec`, which is
    /// then handed to the `Split(List)` constructor.
    fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
        let j_fragments = export_vec(env, &self.fragments)?;
        let ctor_args = [JValueGen::Object(&j_fragments)];
        let j_split = env.new_object(SPLIT_CLASS, SPLIT_CONSTRUCTOR_SIG, &ctor_args)?;
        Ok(j_split)
    }
}
34 changes: 34 additions & 0 deletions java/src/main/java/org/lance/ipc/LanceScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,38 @@ public long countRows() {
}

private native long nativeCountRows();

/**
 * Plans splits for parallel scanning of the dataset without any split options.
 *
 * <p>Splits can be used to distribute scanning work across multiple workers or threads. Each
 * split contains one or more fragments that can be scanned together.
 *
 * <p>This delegates to the internal planner with an empty {@link Optional}, so no {@link
 * SplitOptions} are passed to the native side.
 *
 * @return a list of splits for parallel scanning
 */
public List<Split> planSplits() {
return planSplits(Optional.empty());
}

/**
 * Plans splits for parallel scanning of the dataset with custom options.
 *
 * <p>Splits can be used to distribute scanning work across multiple workers or threads. Each
 * split contains one or more fragments that can be scanned together.
 *
 * @param options options for configuring split generation; may be {@code null}, in which case
 *     it is treated as an empty {@link Optional} and no options are passed to the native side
 * @return a list of splits for parallel scanning
 */
public List<Split> planSplits(SplitOptions options) {
return planSplits(Optional.ofNullable(options));
}

/**
 * Shared implementation behind the public {@code planSplits} overloads.
 *
 * <p>Acquires the read lock, checks that the native scanner handle is non-zero, and then calls
 * the native planner with the given options.
 *
 * @param options the optional split options forwarded to the native side
 * @return the splits produced by the native planner
 */
private List<Split> planSplits(Optional<SplitOptions> options) {
// The read lock is held for the whole native call and released by try-with-resources.
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
// A zero handle fails this check with the message "Scanner is closed".
Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed");
return nativePlanSplits(options);
}
}

/**
 * Native split planner, implemented in the JNI library.
 *
 * @param options the optional split options
 * @return the planned splits
 */
private native List<Split> nativePlanSplits(Optional<SplitOptions> options);
}
71 changes: 71 additions & 0 deletions java/src/main/java/org/lance/ipc/Split.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.ipc;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

/**
* Represents a split for parallel scanning of fragments.
*
* <p>A split contains one or more fragments that can be scanned together. Splits can be used to
* distribute scanning work across multiple workers or threads.
*/
public class Split implements Serializable {
private static final long serialVersionUID = 1L;
private final List<SplitFragment> fragments;

/**
* Creates a new Split.
*
* @param fragments the list of fragments in this split
*/
public Split(List<SplitFragment> fragments) {
this.fragments = fragments;
}

/**
* Returns the list of fragments in this split.
*
* @return the list of split fragments
*/
public List<SplitFragment> getFragments() {
return fragments;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Split split = (Split) o;
return Objects.equals(fragments, split.fragments);
}

@Override
public int hashCode() {
return Objects.hash(fragments);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("fragments", fragments).toString();
}
}
87 changes: 87 additions & 0 deletions java/src/main/java/org/lance/ipc/SplitFragment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.ipc;

import org.lance.FragmentMetadata;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.Objects;

/**
* A fragment within a {@link Split}, along with metadata about the expected number of rows that
* will be scanned from it.
*/
public class SplitFragment implements Serializable {
private static final long serialVersionUID = 1L;
private final FragmentMetadata fragment;
private final long maxRowCount;

/**
* Creates a new SplitFragment.
*
* @param fragment the fragment metadata
* @param maxRowCount an upper bound on the number of rows that will be read from this fragment
* after applying any filters or index pruning
*/
public SplitFragment(FragmentMetadata fragment, long maxRowCount) {
this.fragment = fragment;
this.maxRowCount = maxRowCount;
}

/**
* Returns the fragment metadata.
*
* @return the fragment metadata
*/
public FragmentMetadata getFragment() {
return fragment;
}

/**
* Returns an upper bound on the number of rows that will be read from this fragment after
* applying any filters or index pruning.
*
* @return the maximum row count
*/
public long getMaxRowCount() {
return maxRowCount;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SplitFragment that = (SplitFragment) o;
return maxRowCount == that.maxRowCount && Objects.equals(fragment, that.fragment);
}

@Override
public int hashCode() {
return Objects.hash(fragment, maxRowCount);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("fragment", fragment)
.add("maxRowCount", maxRowCount)
.toString();
}
}
Loading
Loading