Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 163 additions & 12 deletions java/lance-jni/src/namespace.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,121 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
use jni::objects::{JByteArray, JMap, JObject, JString};
use jni::objects::{GlobalRef, JByteArray, JMap, JObject, JString, JValue};
use jni::sys::{jbyteArray, jlong, jstring};
use jni::JNIEnv;
use lance_namespace::models::*;
use lance_namespace::LanceNamespace as LanceNamespaceTrait;
use lance_namespace_impls::{
ConnectBuilder, DirectoryNamespace, DirectoryNamespaceBuilder, RestAdapter, RestAdapterConfig,
RestNamespace, RestNamespaceBuilder,
ConnectBuilder, DirectoryNamespace, DirectoryNamespaceBuilder, DynamicContextProvider,
OperationInfo, RestAdapter, RestAdapterConfig, RestNamespace, RestNamespaceBuilder,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use crate::error::{Error, Result};
use crate::utils::to_rust_map;
use crate::RT;

/// Java-implemented dynamic context provider.
///
/// Wraps a Java object that implements the DynamicContextProvider interface.
pub struct JavaDynamicContextProvider {
java_provider: GlobalRef,
jvm: Arc<jni::JavaVM>,
}

impl JavaDynamicContextProvider {
/// Create a new Java context provider wrapper.
pub fn new(env: &mut JNIEnv, java_provider: &JObject) -> Result<Self> {
let java_provider = env.new_global_ref(java_provider)?;
let jvm = Arc::new(env.get_java_vm()?);
Ok(Self { java_provider, jvm })
}
}

impl std::fmt::Debug for JavaDynamicContextProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "JavaDynamicContextProvider")
}
}

impl DynamicContextProvider for JavaDynamicContextProvider {
fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
// Attach to JVM
let mut env = match self.jvm.attach_current_thread() {
Ok(env) => env,
Err(e) => {
log::error!("Failed to attach to JVM: {}", e);
return HashMap::new();
}
};

// Create Java strings for parameters
let operation = match env.new_string(&info.operation) {
Ok(s) => s,
Err(e) => {
log::error!("Failed to create operation string: {}", e);
return HashMap::new();
}
};

let object_id = match env.new_string(&info.object_id) {
Ok(s) => s,
Err(e) => {
log::error!("Failed to create object_id string: {}", e);
return HashMap::new();
}
};

// Call provideContext(String, String) -> Map<String, String>
let result = env.call_method(
&self.java_provider,
"provideContext",
"(Ljava/lang/String;Ljava/lang/String;)Ljava/util/Map;",
&[JValue::Object(&operation), JValue::Object(&object_id)],
);

match result {
Ok(jvalue) => match jvalue.l() {
Ok(obj) if !obj.is_null() => {
// Convert Java Map to Rust HashMap
convert_java_map_to_hashmap(&mut env, &obj).unwrap_or_default()
}
Ok(_) => HashMap::new(),
Err(e) => {
log::error!("provideContext did not return object: {}", e);
HashMap::new()
}
},
Err(e) => {
log::error!("Failed to call provideContext: {}", e);
HashMap::new()
}
}
}
}

fn convert_java_map_to_hashmap(
env: &mut JNIEnv,
map_obj: &JObject,
) -> Result<HashMap<String, String>> {
let jmap = JMap::from_env(env, map_obj)?;
let mut result = HashMap::new();

let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_str: String = env.get_string(&JString::from(key))?.into();
let value_str: String = env.get_string(&JString::from(value))?.into();
result.insert(key_str, value_str);
}

Ok(result)
}

/// Blocking wrapper for DirectoryNamespace
pub struct BlockingDirectoryNamespace {
pub(crate) inner: DirectoryNamespace,
Expand All @@ -40,20 +138,47 @@ pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createNative(
) -> jlong {
ok_or_throw_with_return!(
env,
create_directory_namespace_internal(&mut env, properties_map),
create_directory_namespace_internal(&mut env, properties_map, None),
0
)
}

fn create_directory_namespace_internal(env: &mut JNIEnv, properties_map: JObject) -> Result<jlong> {
#[no_mangle]
pub extern "system" fn Java_org_lance_namespace_DirectoryNamespace_createNativeWithProvider(
mut env: JNIEnv,
_obj: JObject,
properties_map: JObject,
context_provider: JObject,
) -> jlong {
ok_or_throw_with_return!(
env,
create_directory_namespace_internal(&mut env, properties_map, Some(context_provider)),
0
)
}

fn create_directory_namespace_internal(
env: &mut JNIEnv,
properties_map: JObject,
context_provider: Option<JObject>,
) -> Result<jlong> {
// Convert Java HashMap to Rust HashMap
let jmap = JMap::from_env(env, &properties_map)?;
let properties = to_rust_map(env, &jmap)?;

// Build DirectoryNamespace using builder
let builder = DirectoryNamespaceBuilder::from_properties(properties, None).map_err(|e| {
Error::runtime_error(format!("Failed to create DirectoryNamespaceBuilder: {}", e))
})?;
let mut builder =
DirectoryNamespaceBuilder::from_properties(properties, None).map_err(|e| {
Error::runtime_error(format!("Failed to create DirectoryNamespaceBuilder: {}", e))
})?;

// Add context provider if provided
if let Some(provider_obj) = context_provider {
if !provider_obj.is_null() {
let java_provider = JavaDynamicContextProvider::new(env, &provider_obj)?;
builder = builder.context_provider(Arc::new(java_provider));
}
}

let namespace = RT
.block_on(builder.build())
Expand Down Expand Up @@ -537,21 +662,47 @@ pub extern "system" fn Java_org_lance_namespace_RestNamespace_createNative(
) -> jlong {
ok_or_throw_with_return!(
env,
create_rest_namespace_internal(&mut env, properties_map),
create_rest_namespace_internal(&mut env, properties_map, None),
0
)
}

fn create_rest_namespace_internal(env: &mut JNIEnv, properties_map: JObject) -> Result<jlong> {
#[no_mangle]
pub extern "system" fn Java_org_lance_namespace_RestNamespace_createNativeWithProvider(
mut env: JNIEnv,
_obj: JObject,
properties_map: JObject,
context_provider: JObject,
) -> jlong {
ok_or_throw_with_return!(
env,
create_rest_namespace_internal(&mut env, properties_map, Some(context_provider)),
0
)
}

fn create_rest_namespace_internal(
env: &mut JNIEnv,
properties_map: JObject,
context_provider: Option<JObject>,
) -> Result<jlong> {
// Convert Java HashMap to Rust HashMap
let jmap = JMap::from_env(env, &properties_map)?;
let properties = to_rust_map(env, &jmap)?;

// Build RestNamespace using builder
let builder = RestNamespaceBuilder::from_properties(properties).map_err(|e| {
let mut builder = RestNamespaceBuilder::from_properties(properties).map_err(|e| {
Error::runtime_error(format!("Failed to create RestNamespaceBuilder: {}", e))
})?;

// Add context provider if provided
if let Some(provider_obj) = context_provider {
if !provider_obj.is_null() {
let java_provider = JavaDynamicContextProvider::new(env, &provider_obj)?;
builder = builder.context_provider(Arc::new(java_provider));
}
}

let namespace = builder.build();

let blocking_namespace = BlockingRestNamespace { inner: namespace };
Expand Down
113 changes: 112 additions & 1 deletion java/src/main/java/org/lance/namespace/DirectoryNamespace.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.arrow.memory.BufferAllocator;

import java.io.Closeable;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* DirectoryNamespace implementation that provides Lance namespace functionality for directory-based
Expand Down Expand Up @@ -149,11 +152,43 @@ public DirectoryNamespace() {}

@Override
public void initialize(Map<String, String> configProperties, BufferAllocator allocator) {
initialize(configProperties, allocator, null);
}

/**
* Initialize with a dynamic context provider.
*
* <p>If contextProvider is null and the properties contain {@code dynamic_context_provider.impl},
* the provider will be loaded from the class path. The class must implement {@link
* DynamicContextProvider} and have a constructor accepting {@code Map<String, String>}.
*
* @param configProperties Configuration properties for the namespace
* @param allocator Arrow buffer allocator
* @param contextProvider Optional provider for per-request context (e.g., dynamic auth headers)
*/
public void initialize(
Map<String, String> configProperties,
BufferAllocator allocator,
DynamicContextProvider contextProvider) {
if (this.nativeDirectoryNamespaceHandle != 0) {
throw new IllegalStateException("DirectoryNamespace already initialized");
}
this.allocator = allocator;
this.nativeDirectoryNamespaceHandle = createNative(configProperties);

// If no explicit provider, try to create from properties
DynamicContextProvider provider = contextProvider;
if (provider == null) {
provider = createProviderFromProperties(configProperties).orElse(null);
}

// Filter out provider properties before passing to native layer
Map<String, String> filteredProperties = filterProviderProperties(configProperties);

if (provider != null) {
this.nativeDirectoryNamespaceHandle = createNativeWithProvider(filteredProperties, provider);
} else {
this.nativeDirectoryNamespaceHandle = createNative(filteredProperties);
}
}

@Override
Expand Down Expand Up @@ -399,6 +434,9 @@ private static <T> T fromJson(String json, Class<T> clazz) {
// Native methods
private native long createNative(Map<String, String> properties);

private native long createNativeWithProvider(
Map<String, String> properties, DynamicContextProvider contextProvider);

private native void releaseNative(long handle);

private native String namespaceIdNative(long handle);
Expand Down Expand Up @@ -453,4 +491,77 @@ private native String mergeInsertIntoTableNative(
private native String describeTransactionNative(long handle, String requestJson);

private native String alterTransactionNative(long handle, String requestJson);

// ==========================================================================
// Provider loading helpers
// ==========================================================================

private static final String PROVIDER_PREFIX = "dynamic_context_provider.";
private static final String IMPL_KEY = "dynamic_context_provider.impl";

/**
* Create a context provider from properties if configured.
*
* <p>Loads the class specified by {@code dynamic_context_provider.impl} from the class path and
* instantiates it with the extracted provider properties.
*/
private static Optional<DynamicContextProvider> createProviderFromProperties(
Map<String, String> properties) {
String className = properties.get(IMPL_KEY);
if (className == null || className.isEmpty()) {
return Optional.empty();
}

// Extract provider-specific properties (strip prefix, exclude impl key)
Map<String, String> providerProps = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
if (key.startsWith(PROVIDER_PREFIX) && !key.equals(IMPL_KEY)) {
String propName = key.substring(PROVIDER_PREFIX.length());
providerProps.put(propName, entry.getValue());
}
}

try {
Class<?> providerClass = Class.forName(className);
if (!DynamicContextProvider.class.isAssignableFrom(providerClass)) {
throw new IllegalArgumentException(
String.format(
"Class '%s' does not implement DynamicContextProvider interface", className));
}

@SuppressWarnings("unchecked")
Class<? extends DynamicContextProvider> typedClass =
(Class<? extends DynamicContextProvider>) providerClass;

Constructor<? extends DynamicContextProvider> constructor =
typedClass.getConstructor(Map.class);
return Optional.of(constructor.newInstance(providerProps));

} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Failed to load context provider class '%s': %s", className, e), e);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format(
"Context provider class '%s' must have a public constructor "
+ "that accepts Map<String, String>",
className),
e);
} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException(
String.format("Failed to instantiate context provider '%s': %s", className, e), e);
}
}

/** Filter out dynamic_context_provider.* properties from the map. */
private static Map<String, String> filterProviderProperties(Map<String, String> properties) {
Map<String, String> filtered = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!entry.getKey().startsWith(PROVIDER_PREFIX)) {
filtered.put(entry.getKey(), entry.getValue());
}
}
return filtered;
}
}
Loading