Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ jobs:
matrix:
java-version: [8, 11, 17]
name: Build and Test with Java ${{ matrix.java-version }}
services:
localstack:
image: localstack/localstack:4.0
ports:
- 4566:4566
env:
SERVICES: s3,dynamodb,kms
AWS_ACCESS_KEY_ID: ACCESS_KEY
AWS_SECRET_ACCESS_KEY: SECRET_KEY
options: >-
--health-cmd "curl -s http://localhost:4566/_localstack/health"
--health-interval 5s
--health-timeout 3s
--health-retries 3
--health-start-period 10s
steps:
- name: Install dependencies
run: |
Expand Down Expand Up @@ -87,5 +102,7 @@ jobs:
mvn spotless:check
- name: Running tests with Java ${{ matrix.java-version }}
working-directory: java
env:
LANCE_INTEGRATION_TEST: "1"
run: |
mvn install
Binary file not shown.
3 changes: 3 additions & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions java/lance-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lance-encoding = { path = "../../rust/lance-encoding" }
lance-linalg = { path = "../../rust/lance-linalg" }
lance-index = { path = "../../rust/lance-index" }
lance-io = { path = "../../rust/lance-io" }
lance-namespace = { path = "../../rust/lance-namespace" }
lance-core = { path = "../../rust/lance-core" }
lance-file = { path = "../../rust/lance-file" }
arrow = { version = "56.1", features = ["ffi"] }
Expand All @@ -30,6 +31,8 @@ tokio = { version = "1.23", features = [
"fs",
"sync",
] }
async-trait = "0.1"
snafu = "0.8"
jni = "0.21.1"
serde_json = { version = "1" }
log = "0.4"
Expand Down
104 changes: 95 additions & 9 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::storage_options::JavaStorageOptionsProvider;
use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString};
use crate::utils::{
build_compaction_options, extract_storage_options, extract_write_params,
Expand Down Expand Up @@ -38,6 +39,7 @@ use lance_core::datatypes::Schema as LanceSchema;
use lance_index::DatasetIndexExt;
use lance_index::{IndexParams, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::StorageOptionsProvider;
use std::collections::HashMap;
use std::iter::empty;
use std::str::FromStr;
Expand Down Expand Up @@ -77,6 +79,7 @@ impl BlockingDataset {
Ok(Self { inner })
}

#[allow(clippy::too_many_arguments)]
pub fn open(
uri: &str,
version: Option<i32>,
Expand All @@ -85,14 +88,25 @@ impl BlockingDataset {
metadata_cache_size_bytes: i64,
storage_options: HashMap<String, String>,
serialized_manifest: Option<&[u8]>,
storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
s3_credentials_refresh_offset_seconds: Option<u64>,
) -> Result<Self> {
let mut store_params = ObjectStoreParams {
block_size: block_size.map(|size| size as usize),
storage_options: Some(storage_options.clone()),
..Default::default()
};
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
store_params.s3_credentials_refresh_offset =
std::time::Duration::from_secs(offset_seconds);
}
if let Some(provider) = storage_options_provider.clone() {
store_params.storage_options_provider = Some(provider);
}
let params = ReadParams {
index_cache_size_bytes: index_cache_size_bytes as usize,
metadata_cache_size_bytes: metadata_cache_size_bytes as usize,
store_options: Some(ObjectStoreParams {
block_size: block_size.map(|size| size as usize),
..Default::default()
}),
store_options: Some(store_params),
..Default::default()
};

Expand All @@ -102,6 +116,13 @@ impl BlockingDataset {
builder = builder.with_version(ver as u64);
}
builder = builder.with_storage_options(storage_options);
if let Some(provider) = storage_options_provider {
builder = builder.with_storage_options_provider(provider)
}
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
builder = builder
.with_s3_credentials_refresh_offset(std::time::Duration::from_secs(offset_seconds));
}

if let Some(serialized_manifest) = serialized_manifest {
builder = builder.with_serialized_manifest(serialized_manifest)?;
Expand Down Expand Up @@ -743,8 +764,10 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_openNative<'local>(
block_size_obj: JObject, // Optional<Integer>
index_cache_size_bytes: jlong,
metadata_cache_size_bytes: jlong,
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -756,7 +779,9 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_openNative<'local>(
index_cache_size_bytes,
metadata_cache_size_bytes,
storage_options_obj,
serialized_manifest
serialized_manifest,
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj
)
)
}
Expand All @@ -769,14 +794,73 @@ fn inner_open_native<'local>(
block_size_obj: JObject, // Optional<Integer>
index_cache_size_bytes: jlong,
metadata_cache_size_bytes: jlong,
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_obj: JObject, // Map<String, String>
serialized_manifest: JObject, // Optional<ByteBuffer>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
) -> Result<JObject<'local>> {
let path_str: String = path.extract(env)?;
let version = env.get_int_opt(&version_obj)?;
let block_size = env.get_int_opt(&block_size_obj)?;
let jmap = JMap::from_env(env, &storage_options_obj)?;
let storage_options = to_rust_map(env, &jmap)?;

// Extract the storage options provider first: this must happen before get_bytes_opt, which borrows env
let storage_options_provider = if !storage_options_provider_obj.is_null() {
// A non-null Optional may still be empty, so check isPresent() before calling get()
let is_present = env
.call_method(&storage_options_provider_obj, "isPresent", "()Z", &[])?
.z()?;
if is_present {
// Get the value from Optional
let provider_obj = env
.call_method(
&storage_options_provider_obj,
"get",
"()Ljava/lang/Object;",
&[],
)?
.l()?;
Some(JavaStorageOptionsProvider::new(env, provider_obj)?)
} else {
None
}
} else {
None
};

let storage_options_provider_arc =
storage_options_provider.map(|v| Arc::new(v) as Arc<dyn StorageOptionsProvider>);

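// Extract s3_credentials_refresh_offset_seconds from the Optional<Long>.
// NOTE(review): the long is converted with `as u64` below, so a negative value would wrap
// to a very large offset instead of being rejected — confirm callers validate it.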
let s3_credentials_refresh_offset_seconds =
if !s3_credentials_refresh_offset_seconds_obj.is_null() {
let is_present = env
.call_method(
&s3_credentials_refresh_offset_seconds_obj,
"isPresent",
"()Z",
&[],
)?
.z()?;
if is_present {
let value = env
.call_method(
&s3_credentials_refresh_offset_seconds_obj,
"get",
"()Ljava/lang/Object;",
&[],
)?
.l()?;
let long_value = env.call_method(&value, "longValue", "()J", &[])?.j()?;
Some(long_value as u64)
} else {
None
}
} else {
None
};

let serialized_manifest = env.get_bytes_opt(&serialized_manifest)?;
let dataset = BlockingDataset::open(
&path_str,
Expand All @@ -786,6 +870,8 @@ fn inner_open_native<'local>(
metadata_cache_size_bytes,
storage_options,
serialized_manifest,
storage_options_provider_arc,
s3_credentials_refresh_offset_seconds,
)?;
dataset.into_java(env)
}
Expand Down
2 changes: 2 additions & 0 deletions java/lance-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ mod merge_insert;
mod optimize;
mod schema;
mod sql;
mod storage_options;
pub mod traits;
mod transaction;
pub mod utils;

pub use error::Error;
pub use error::Result;
pub use ffi::JNIEnvExt;
pub use storage_options::JavaStorageOptionsProvider;

use env_logger::{Builder, Env};
use std::env;
Expand Down
Loading
Loading