Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions java/lance-jni/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,28 @@ pub struct Dispatcher {
}

impl Dispatcher {
/// Initialize the dispatcher with a persistent JNI thread
pub fn initialize(jvm: Arc<JavaVM>) -> Arc<Self> {
/// Initialize the dispatcher with a persistent JNI thread.
///
/// `async_scanner_class` must be a `GlobalRef` to the `AsyncScanner`
/// Java class, resolved on a thread that has the correct application
/// classloader (typically the `JNI_OnLoad` thread).
pub fn initialize(jvm: Arc<JavaVM>, async_scanner_class: GlobalRef) -> Arc<Self> {
let (tx, mut rx) = mpsc::unbounded_channel::<DispatcherMessage>();

// Spawn persistent dispatcher thread
std::thread::Builder::new()
.name("lance-jni-dispatcher".to_string())
.spawn(move || {
// Attach ONCE and never detach - this is the key optimization
// Attach as daemon so this thread does not prevent JVM shutdown
let mut env = jvm
.attach_current_thread_permanently()
.expect("Failed to attach dispatcher to JVM");
.attach_current_thread_as_daemon()
.expect("Failed to attach dispatcher to JVM as daemon");

log::info!("JNI dispatcher thread started");

// Cache method IDs for completeTask and failTask
let async_scanner_class = env
.find_class("org/lance/ipc/AsyncScanner")
.expect("AsyncScanner class not found");
// Cache method IDs for completeTask and failTask.
// Use the pre-resolved GlobalRef instead of find_class to
// avoid classloader issues on this native thread.
let complete_method = env
.get_method_id(&async_scanner_class, "completeTask", "(JJ)V")
.expect("completeTask method not found");
Expand Down
21 changes: 19 additions & 2 deletions java/lance-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,27 @@ pub extern "system" fn JNI_OnLoad(
vm: jni::JavaVM,
_reserved: *mut std::ffi::c_void,
) -> jni::sys::jint {
// Resolve AsyncScanner class on the current thread which has the correct
// application classloader. A newly spawned native thread only gets the
// system classloader after attach_current_thread_permanently(), which
// cannot find application classes in environments like Spark, web
// containers, or shaded JARs.
let mut env = vm
.get_env()
.expect("Failed to get JNIEnv in JNI_OnLoad");
let async_scanner_local = env
.find_class("org/lance/ipc/AsyncScanner")
.expect("AsyncScanner class not found");
let async_scanner_class = env
.new_global_ref(async_scanner_local)
.expect("Failed to create GlobalRef for AsyncScanner class");

let jvm_arc = Arc::new(vm);

// Initialize global dispatcher with persistent thread
let dispatcher = dispatcher::Dispatcher::initialize(jvm_arc);
// Initialize global dispatcher with persistent thread, passing the
// pre-resolved class reference so the dispatcher thread does not need
// to look up the class with the wrong classloader.
let dispatcher = dispatcher::Dispatcher::initialize(jvm_arc, async_scanner_class);

// Set the global DISPATCHER (will panic if called more than once)
dispatcher::DISPATCHER
Expand Down
232 changes: 232 additions & 0 deletions java/src/test/java/org/lance/AsyncScannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -263,6 +270,98 @@ void testAsyncScanWithProjection(@TempDir Path tempDir) throws Exception {
}
}

/**
* Regression test for classloader isolation.
*
* <p>Verifies that AsyncScanner works correctly when the calling thread has an isolated
* (non-system) context classloader, simulating environments like Spark executors, web containers,
* or shaded JARs where application classes are loaded by a custom classloader.
*
* <p>The fix under test moved class resolution from the JNI dispatcher thread (which only has the
* system classloader after attach_current_thread_permanently()) to JNI_OnLoad (which has the
* correct application classloader), passing a GlobalRef to the dispatcher.
*/
@Test
void testAsyncScanWithIsolatedClassloader(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("async_scanner_classloader").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int totalRows = 40;

try (Dataset dataset = testDataset.write(1, totalRows)) {
ScanOptions options = new ScanOptions.Builder().batchSize(20L).build();

// Use a classloader that delegates only to the bootstrap classloader,
// making it unable to find any org.lance.* classes on its own.
ClassLoader originalCl = Thread.currentThread().getContextClassLoader();
ClassLoader restrictedCl = new ClassLoader(null) {
// Intentionally empty: parent is null (bootstrap only)
};

// --- Part 1: Swap context classloader on the current thread ---
try {
Thread.currentThread().setContextClassLoader(restrictedCl);

try (AsyncScanner scanner = AsyncScanner.create(dataset, options, allocator)) {
CompletableFuture<ArrowReader> future = scanner.scanBatchesAsync();
ArrowReader reader = future.get(10, TimeUnit.SECONDS);
assertNotNull(reader);

int rowCount = 0;
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
rowCount += root.getRowCount();
}

assertEquals(
totalRows, rowCount, "Async scan should succeed despite restrictive classloader");
reader.close();
}
} finally {
Thread.currentThread().setContextClassLoader(originalCl);
}

// --- Part 2: Run from a thread with an isolated classloader ---
// Simulates Spark executor threads that have a non-system classloader.
ExecutorService isolatedExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "isolated-classloader-thread");
t.setContextClassLoader(restrictedCl);
return t;
});
try {
CompletableFuture<Integer> result =
CompletableFuture.supplyAsync(
() -> {
try (AsyncScanner scanner = AsyncScanner.create(dataset, options, allocator)) {
CompletableFuture<ArrowReader> future = scanner.scanBatchesAsync();
ArrowReader reader = future.get(10, TimeUnit.SECONDS);
int rowCount = 0;
while (reader.loadNextBatch()) {
rowCount += reader.getVectorSchemaRoot().getRowCount();
}
reader.close();
return rowCount;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
isolatedExecutor);

assertEquals(
totalRows,
result.get(15, TimeUnit.SECONDS),
"Async scan from isolated classloader thread should succeed");
} finally {
isolatedExecutor.shutdown();
}
}
}
}

/**
* Example 6: Using thenCompose for sequential async operations.
*
Expand Down Expand Up @@ -308,4 +407,137 @@ void testAsyncChaining(@TempDir Path tempDir) throws Exception {
}
}
}

/**
* Regression test: reproduce the JNI classloader bug via a forked JVM.
*
* <p>Launches a child JVM where only {@code target/test-classes/} is on the system classpath. All
* lance classes are loaded by an isolated {@link URLClassLoader} with a null parent (bootstrap
* classloader only). This means JNI's {@code FindClass} on a native thread attached via {@code
* attach_current_thread_permanently()} will use the system classloader, which <b>cannot</b> find
* {@code org.lance.ipc.AsyncScanner}.
*
* <p>With the fix, {@code JNI_OnLoad} pre-resolves the class on the loading thread (which has the
* correct classloader) and passes a {@code GlobalRef} to the dispatcher — so the dispatcher never
* calls {@code FindClass}.
*/
@Test
void testClassloaderIsolationWithForkedJvm(@TempDir Path tempDir) throws Exception {
// --- Collect full classpath from the current classloader hierarchy ---
List<String> classpathEntries = new ArrayList<>();
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = AsyncScannerTest.class.getClassLoader();
}
while (cl != null) {
if (cl instanceof URLClassLoader) {
for (URL url : ((URLClassLoader) cl).getURLs()) {
classpathEntries.add(url.getPath());
}
}
cl = cl.getParent();
}

// Fallback: if no URLClassLoader found (Java 9+ AppClassLoader), use java.class.path
if (classpathEntries.isEmpty()) {
String cp = System.getProperty("java.class.path");
if (cp != null) {
for (String entry : cp.split(java.io.File.pathSeparator)) {
classpathEntries.add(entry);
}
}
}

// Find the test-classes directory
String testClassesDir = null;
for (String entry : classpathEntries) {
if (entry.contains("test-classes")) {
testClassesDir = entry;
break;
}
}
assertNotNull(testClassesDir, "Could not find test-classes directory in classpath");

// Full classpath for the isolated URLClassLoader inside the forked JVM
String fullClasspath = String.join(java.io.File.pathSeparator, classpathEntries);

// --- Build the forked JVM command ---
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + java.io.File.separator + "bin" + java.io.File.separator + "java";

List<String> command = new ArrayList<>();
command.add(javaBin);

// Add --add-opens flags matching surefire config (required by Arrow/Netty)
command.add("--add-opens=java.base/java.lang=ALL-UNNAMED");
command.add("--add-opens=java.base/java.lang.invoke=ALL-UNNAMED");
command.add("--add-opens=java.base/java.lang.reflect=ALL-UNNAMED");
command.add("--add-opens=java.base/java.io=ALL-UNNAMED");
command.add("--add-opens=java.base/java.net=ALL-UNNAMED");
command.add("--add-opens=java.base/java.nio=ALL-UNNAMED");
command.add("--add-opens=java.base/java.util=ALL-UNNAMED");
command.add("--add-opens=java.base/java.util.concurrent=ALL-UNNAMED");
command.add("--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED");
command.add("--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED");
command.add("--add-opens=java.base/sun.nio.ch=ALL-UNNAMED");
command.add("--add-opens=java.base/sun.nio.cs=ALL-UNNAMED");
command.add("--add-opens=java.base/sun.security.action=ALL-UNNAMED");
command.add("--add-opens=java.base/sun.util.calendar=ALL-UNNAMED");
command.add("--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED");
command.add("-XX:+IgnoreUnrecognizedVMOptions");
command.add("-Dio.netty.tryReflectionSetAccessible=true");

// System classpath: ONLY test-classes (no lance main classes)
command.add("-cp");
command.add(testClassesDir);

command.add("org.lance.ClassloaderBugBootstrap");
command.add(fullClasspath);
command.add(tempDir.toString());

ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(false);

Process process = pb.start();

// Read stdout and stderr concurrently to avoid deadlock when buffers fill
CompletableFuture<String> stderrFuture =
CompletableFuture.supplyAsync(
() -> {
try (BufferedReader errReader =
new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
return errReader.lines().collect(Collectors.joining("\n"));
} catch (Exception e) {
return "Failed to read stderr: " + e.getMessage();
}
});

String stdout;
try (BufferedReader outReader =
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
stdout = outReader.lines().collect(Collectors.joining("\n"));
}

String stderr = stderrFuture.get(60, TimeUnit.SECONDS);

boolean exited = process.waitFor(60, TimeUnit.SECONDS);
assertTrue(exited, "Forked JVM did not exit within 60 seconds");

int exitCode = process.exitValue();
assertEquals(
0,
exitCode,
"Forked JVM exited with code "
+ exitCode
+ "\n--- stdout ---\n"
+ stdout
+ "\n--- stderr ---\n"
+ stderr);
assertTrue(
stdout.contains("SUCCESS"),
"Expected SUCCESS in stdout but got:\n--- stdout ---\n"
+ stdout
+ "\n--- stderr ---\n"
+ stderr);
}
}
73 changes: 73 additions & 0 deletions java/src/test/java/org/lance/ClassloaderBugBootstrap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance;

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

/**
* Standalone bootstrap for the forked-JVM classloader bug test.
*
* <p>This class has <b>zero</b> lance imports. It lives on the forked JVM's system classpath
* ({@code target/test-classes/} only). All lance classes are loaded through an isolated {@link
* URLClassLoader} whose parent is {@code null} (bootstrap classloader), so the JVM's system
* classloader cannot see them.
*
* <p>Usage: {@code java -cp target/test-classes org.lance.ClassloaderBugBootstrap <full-classpath>
* <temp-dir>}
*
* <p>Exit code 0 + "SUCCESS" on stdout = pass; non-zero = failure.
*/
public class ClassloaderBugBootstrap {

public static void main(String[] args) {
if (args.length < 2) {
System.err.println("Usage: ClassloaderBugBootstrap <classpath> <tempDir>");
System.exit(2);
}

String classpath = args[0];
String tempDir = args[1];

try {
// Build URL array from the full classpath
String[] entries = classpath.split(File.pathSeparator);
URL[] urls = new URL[entries.length];
for (int i = 0; i < entries.length; i++) {
urls[i] = new File(entries[i]).toURI().toURL();
}

// Parent is null => only bootstrap classloader is the parent.
// The system classloader (which only has target/test-classes/) is bypassed.
URLClassLoader isolatedCl = new URLClassLoader(urls, null);

// Set as context classloader for consistency
Thread.currentThread().setContextClassLoader(isolatedCl);

// Load and invoke the helper class through the isolated classloader
Class<?> helperClass = Class.forName("org.lance.ClassloaderBugHelper", true, isolatedCl);
Method runMethod = helperClass.getMethod("run", String.class);
runMethod.invoke(null, tempDir);

System.exit(0);

} catch (Throwable t) {
System.err.println("ClassloaderBugBootstrap FAILED:");
t.printStackTrace(System.err);
System.exit(1);
}
}
}
Loading
Loading