107 changes: 107 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
@@ -19,9 +19,16 @@

package org.apache.comet.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.comet.NativeBase;

public final class Native extends NativeBase {
@@ -292,4 +299,104 @@ public static native void currentColumnBatch(
* @param handle
*/
public static native void closeRecordBatchReader(long handle);

Contributor: Can we move these to a new file, say JniHDFSBridge?

Contributor Author: Yes, fixed.

/**
* Reads a byte range from a file using the Hadoop FileSystem API.
*
* @param path The file path to read from
* @param configs Configuration properties for the filesystem
* @param offset Starting byte position (0-based)
* @param len Number of bytes to read
* @return Byte array containing the read data, or null if an error occurs
* @throws IllegalArgumentException If the path is null/empty or offset/len is negative
*/
public static byte[] read(String path, Map<String, String> configs, long offset, int len) {
if (path == null || path.isEmpty()) {
throw new IllegalArgumentException("Path cannot be null or empty");
}
if (offset < 0) {
throw new IllegalArgumentException("Offset cannot be negative");
}
if (len < 0) {
throw new IllegalArgumentException("Length cannot be negative");
}

try {
Path p = new Path(path);
Configuration conf = new Configuration();

// Set configurations if provided
if (configs != null) {
for (Map.Entry<String, String> entry : configs.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}
FileSystem fs = p.getFileSystem(conf);

long fileLen = fs.getFileStatus(p).getLen();

if (offset > fileLen) {
throw new IOException(
"Offset beyond file length: offset=" + offset + ", fileLen=" + fileLen);
}

if (len == 0) {
return new byte[0];
}

// Adjust length if it exceeds remaining bytes
if (offset + len > fileLen) {
len = (int) (fileLen - offset);
if (len <= 0) {
return new byte[0];
}
}

byte[] buffer = new byte[len];
int totalBytesRead = 0;
// Use try-with-resources so the stream is closed even if seek or read throws
try (FSDataInputStream inputStream = fs.open(p)) {
inputStream.seek(offset);
while (totalBytesRead < len) {
int read = inputStream.read(buffer, totalBytesRead, len - totalBytesRead);
if (read == -1) break;
totalBytesRead += read;
}
}

return totalBytesRead < len ? Arrays.copyOf(buffer, totalBytesRead) : buffer;
} catch (Exception e) {
System.err.println("Native.read failed: " + e);
return null;
}
}

/**
* Gets the length of a file using the Hadoop FileSystem API.
*
* @param path The file path to check
* @param configs Configuration properties for the filesystem
* @return File length in bytes, or -1 if the file doesn't exist or an error occurs
* @throws IllegalArgumentException If the path is null or empty
*/
public static long getLength(String path, Map<String, String> configs) {
if (path == null || path.isEmpty()) {
throw new IllegalArgumentException("Path cannot be null or empty");
}

try {
Path p = new Path(path);
Configuration conf = new Configuration();
if (configs != null) {
for (Map.Entry<String, String> entry : configs.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
}

FileSystem fs = p.getFileSystem(conf);
return fs.getFileStatus(p).getLen();
} catch (Exception e) {
System.err.println("Native.getLength failed: " + e);
return -1;
}
}
}
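The Rust side of this bridge (the new parquet::objectstore module) is not shown in this hunk. As a rough illustration of how a native thread could call the static Native.read helper above through the jni crate declared in Cargo.toml, here is a minimal sketch, not the PR's actual code: the function name read_via_jni, the choice to pass a null configs map, and the error handling are assumptions for the example, while the class path and JNI signature follow the Java code above.

```rust
use jni::objects::{JByteArray, JObject, JValue};
use jni::JNIEnv;

// Hypothetical helper: read `len` bytes starting at `offset` from `path` by
// calling the static Java method org.apache.comet.parquet.Native.read over JNI.
// A null Map is passed for `configs`, which the Java implementation accepts.
fn read_via_jni(
    env: &mut JNIEnv<'_>,
    path: &str,
    offset: i64,
    len: i32,
) -> jni::errors::Result<Option<Vec<u8>>> {
    let jpath = env.new_string(path)?;
    let result = env.call_static_method(
        "org/apache/comet/parquet/Native",
        "read",
        // (String path, Map<String, String> configs, long offset, int len) -> byte[]
        "(Ljava/lang/String;Ljava/util/Map;JI)[B",
        &[
            JValue::Object(&jpath),
            JValue::Object(&JObject::null()),
            JValue::Long(offset),
            JValue::Int(len),
        ],
    )?;
    let obj = result.l()?;
    if obj.as_raw().is_null() {
        // The Java method returns null when the read failed on the JVM side.
        return Ok(None);
    }
    let arr = JByteArray::from(obj);
    let bytes = env.convert_byte_array(&arr)?;
    Ok(Some(bytes))
}
```

The Java helper already clamps the requested range to the file length and returns a shorter array on a short read, so a caller along these lines only needs to distinguish the null (error) case from a successful read.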
9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -65,6 +65,15 @@ object CometConf extends ShimCometConf {

val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";

val COMET_USE_JNI_OBJECT_STORE: ConfigEntry[Boolean] =
conf("spark.comet.use_jni_object_store")
.doc(
"If enabled, Comet will access Hadoop-compatible file systems using the Hadoop FileSystem" +
" API via JNI, bypassing the native Rust object store implementations.")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/core/Cargo.toml
@@ -50,6 +50,7 @@ lazy_static = "1.4.0"
prost = "0.13.5"
jni = "0.21"
snap = "1.1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Contributor: Maybe use the dependency defined in the main Cargo.toml (i.e. make this a workspace dependency)?

Contributor Author: Fixed.

# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
lz4_flex = { version = "0.11.3", default-features = false }
zstd = "0.13.3"
3 changes: 3 additions & 0 deletions native/core/src/execution/jni_api.rs
@@ -72,6 +72,7 @@ use crate::execution::spark_plan::SparkPlan;

use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

use crate::parquet::objectstore::jni::init_jvm;
use datafusion_comet_proto::spark_operator::operator::OpStruct;
use log::info;
use once_cell::sync::Lazy;
@@ -166,6 +167,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
init_jvm(&env);

// Init JVM classes
JVMClasses::init(&mut env);

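The body of init_jvm lives in the new crate::parquet::objectstore::jni module, which is not included in this diff. What such an initializer typically does is cache the process-wide JavaVM so that native threads spawned later (for example DataFusion worker threads) can attach themselves and call back into Java helpers such as Native.read. The sketch below is a plausible reconstruction under that assumption, not the PR's actual implementation; the static JVM cell and its name are invented for illustration.

```rust
use jni::{JNIEnv, JavaVM};
use once_cell::sync::OnceCell;

// Process-wide handle to the JVM, filled in the first time a JNI entry point
// (such as createPlan) calls init_jvm.
static JVM: OnceCell<JavaVM> = OnceCell::new();

/// Cache the JavaVM so that natively spawned threads can later attach
/// themselves and call back into the JVM over JNI.
pub fn init_jvm(env: &JNIEnv) {
    if let Ok(vm) = env.get_java_vm() {
        // set() is a no-op if the handle is already cached, so calling
        // init_jvm on every createPlan invocation is harmless.
        let _ = JVM.set(vm);
    }
}
```

A native thread would then obtain an environment via JVM.get() followed by attach_current_thread() before making calls like the read_via_jni sketch shown earlier.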
2 changes: 1 addition & 1 deletion native/core/src/parquet/mod.rs
@@ -26,7 +26,7 @@ pub mod parquet_support;
pub mod read;
pub mod schema_adapter;

- mod objectstore;
+ pub mod objectstore;

use std::collections::HashMap;
use std::task::Poll;