Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 5 additions & 38 deletions bindings/java/src/async_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ use opendal::Operator;
use opendal::Scheme;

use crate::convert::{
bytes_to_jbytearray, jmap_to_hashmap, offset_length_to_range, read_int64_field, read_map_field,
read_string_field,
bytes_to_jbytearray, jmap_to_hashmap, offset_length_to_range, read_int64_field,
};
use crate::convert::{jstring_to_string, read_bool_field};
use crate::executor::executor_or_default;
use crate::executor::get_current_env;
use crate::executor::Executor;
use crate::make_entry;
use crate::make_metadata;
use crate::make_operator_info;
use crate::make_presigned_request;
use crate::Result;
use crate::{make_entry, make_write_options};

#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_AsyncOperator_constructor(
Expand Down Expand Up @@ -122,45 +121,13 @@ fn intern_write(
let op = unsafe { &mut *op };
let id = request_id(env)?;

let write_opts = make_write_options(env, &options)?;
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
let content_type = read_string_field(env, &options, "contentType")?;
let content_disposition = read_string_field(env, &options, "contentDisposition")?;
let content_encoding = read_string_field(env, &options, "contentEncoding")?;
let cache_control = read_string_field(env, &options, "cacheControl")?;
let if_match = read_string_field(env, &options, "ifMatch")?;
let if_none_match = read_string_field(env, &options, "ifNoneMatch")?;
let append = read_bool_field(env, &options, "append")?;
let if_not_exists = read_bool_field(env, &options, "ifNotExists")?;
let user_metadata = read_map_field(env, &options, "userMetadata")?;

let mut write_op = op.write_with(&path, content);
if let Some(content_type) = content_type {
write_op = write_op.content_type(&content_type);
}
if let Some(content_disposition) = content_disposition {
write_op = write_op.content_disposition(&content_disposition);
}
if let Some(content_encoding) = content_encoding {
write_op = write_op.content_encoding(&content_encoding);
}
if let Some(cache_control) = cache_control {
write_op = write_op.cache_control(&cache_control);
}
if let Some(if_match) = if_match {
write_op = write_op.if_match(&if_match);
}
if let Some(if_none_match) = if_none_match {
write_op = write_op.if_none_match(&if_none_match);
}
if let Some(user_metadata) = user_metadata {
write_op = write_op.user_metadata(user_metadata);
}
write_op = write_op.if_not_exists(if_not_exists);
write_op = write_op.append(append);

executor_or_default(env, executor)?.spawn(async move {
let result = write_op
let result = op
.write_options(&path, content, write_opts)
.await
.map(|_| JValueOwned::Void)
.map_err(Into::into);
Expand Down
4 changes: 4 additions & 0 deletions bindings/java/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub(crate) fn read_int64_field(env: &mut JNIEnv<'_>, obj: &JObject, field: &str)
Ok(env.get_field(obj, field, "J")?.j()?)
}

pub(crate) fn read_int_field(env: &mut JNIEnv<'_>, obj: &JObject, field: &str) -> Result<i32> {
Ok(env.get_field(obj, field, "I")?.i()?)
}

pub(crate) fn read_string_field(
env: &mut JNIEnv<'_>,
obj: &JObject,
Expand Down
43 changes: 42 additions & 1 deletion bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use jni::sys::jint;
use jni::sys::jlong;
use jni::JNIEnv;
use opendal::raw::PresignedRequest;
use opendal::Capability;
use opendal::Entry;
use opendal::EntryMode;
use opendal::Metadata;
use opendal::OperatorInfo;
use opendal::{Capability, Error, ErrorKind};

mod async_operator;
mod convert;
Expand Down Expand Up @@ -202,3 +202,44 @@ fn make_entry<'a>(env: &mut JNIEnv<'a>, entry: Entry) -> Result<JObject<'a>> {
&[JValue::Object(&path), JValue::Object(&metadata)],
)?)
}

fn make_write_options<'a>(
env: &mut JNIEnv<'a>,
options: &JObject,
) -> Result<opendal::options::WriteOptions> {
let concurrent = match convert::read_int_field(env, options, "concurrent")? {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added input validation to prevent negative values that would cause capacity overflow when casting to usize. negative i32/i64 values wrap to huge positive usize values

v if v > 0 => v as usize,
v => {
return Err(Error::new(
ErrorKind::Unexpected,
format!("Concurrent must be positive, instead got: {}", v),
)
.into())
}
};
let chunk = match convert::read_int64_field(env, options, "chunk")? {
-1 => None,
v if v >= 0 => Some(v as usize),
v => {
return Err(Error::new(
ErrorKind::Unexpected,
format!("Chunk must be positive, instead got: {}", v),
)
.into())
}
};

Ok(opendal::options::WriteOptions {
append: convert::read_bool_field(env, options, "append").unwrap_or_default(),
content_type: convert::read_string_field(env, options, "contentType")?,
content_disposition: convert::read_string_field(env, options, "contentDisposition")?,
content_encoding: convert::read_string_field(env, options, "contentEncoding")?,
cache_control: convert::read_string_field(env, options, "cacheControl")?,
if_match: convert::read_string_field(env, options, "ifMatch")?,
if_none_match: convert::read_string_field(env, options, "ifNoneMatch")?,
if_not_exists: convert::read_bool_field(env, options, "ifNotExists").unwrap_or_default(),
user_metadata: convert::read_map_field(env, options, "userMetadata")?,
concurrent,
chunk,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,16 @@ public class WriteOptions {
* Requires capability: writeWithIfNotExists
*/
public final boolean ifNotExists;

/**
* Sets concurrent write operations for this writer.
*/
@Builder.Default
public final int concurrent = 1;

/**
* Sets chunk size for buffered writes.
*/
@Builder.Default
public final long chunk = -1L;
}
24 changes: 4 additions & 20 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use opendal::options;

use crate::convert::{
bytes_to_jbytearray, jstring_to_string, offset_length_to_range, read_bool_field,
read_int64_field, read_map_field, read_string_field,
read_int64_field,
};
use crate::make_entry;
use crate::make_metadata;
use crate::Result;
use crate::{make_entry, make_write_options};

/// # Safety
///
Expand Down Expand Up @@ -127,25 +127,9 @@ fn intern_write(
) -> Result<()> {
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
let write_opts = make_write_options(env, &options)?;

let append = read_bool_field(env, &options, "append")?;
let content_type = read_string_field(env, &options, "contentType")?;
let content_disposition = read_string_field(env, &options, "contentDisposition")?;
let cache_control = read_string_field(env, &options, "cacheControl")?;
let user_metadata = read_map_field(env, &options, "userMetadata")?;

let _ = op.write_options(
&path,
content,
options::WriteOptions {
append,
content_type,
content_disposition,
cache_control,
user_metadata,
..Default::default()
},
)?;
let _ = op.write_options(&path, content, write_opts)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ void testIfNotExists() {
.is(OpenDALExceptionCondition.ofAsync(Code.ConditionNotMatch));
}

@Test
void testWriteWithChunk() {
assumeTrue(asyncOp().info.fullCapability.writeCanMulti);

final String path = UUID.randomUUID().toString();
final byte[] content = generateBytes(5 * 1024 * 1024);

WriteOptions options = WriteOptions.builder().chunk(1024 * 1024L).build();

asyncOp().write(path, content, options).join();

byte[] result = asyncOp().read(path).join();
assertThat(result).isEqualTo(content);
}

@Test
void testWriteWithConcurrentChunk() {
assumeTrue(asyncOp().info.fullCapability.writeCanMulti);

final String path = UUID.randomUUID().toString();
final byte[] content = generateBytes(5 * 1024 * 1024);

WriteOptions options =
WriteOptions.builder().chunk(1024 * 1024L).concurrent(4).build();

asyncOp().write(path, content, options).join();

byte[] result = asyncOp().read(path).join();
assertThat(result).isEqualTo(content);
}

@Test
void testWriteWithCacheControl() {
assumeTrue(asyncOp().info.fullCapability.writeWithCacheControl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,35 @@ void testWriteWithAppend() {
assertThat(result.length).isEqualTo(contentOne.length + contentTwo.length);
assertThat(result).isEqualTo("Test Data".getBytes());
}

@Test
void testWriteWithChunk() {
assumeTrue(op().info.fullCapability.writeCanMulti);

final String path = UUID.randomUUID().toString();
final byte[] content = generateBytes(5 * 1024 * 1024);

WriteOptions options = WriteOptions.builder().chunk(1024 * 1024L).build();

op().write(path, content, options);

byte[] result = op().read(path);
assertThat(result).isEqualTo(content);
}

@Test
void testWriteWithConcurrentChunk() {
assumeTrue(op().info.fullCapability.writeCanMulti);

final String path = UUID.randomUUID().toString();
final byte[] content = generateBytes(5 * 1024 * 1024);

WriteOptions options =
WriteOptions.builder().chunk(1024 * 1024L).concurrent(4).build();

op().write(path, content, options);

byte[] result = op().read(path);
assertThat(result).isEqualTo(content);
}
}
Loading