diff --git a/bindings/java/src/async_operator.rs b/bindings/java/src/async_operator.rs index 9c1e943e01f5..06231d7906e0 100644 --- a/bindings/java/src/async_operator.rs +++ b/bindings/java/src/async_operator.rs @@ -34,18 +34,17 @@ use opendal::Operator; use opendal::Scheme; use crate::convert::{ - bytes_to_jbytearray, jmap_to_hashmap, offset_length_to_range, read_int64_field, read_map_field, - read_string_field, + bytes_to_jbytearray, jmap_to_hashmap, offset_length_to_range, read_int64_field, }; use crate::convert::{jstring_to_string, read_bool_field}; use crate::executor::executor_or_default; use crate::executor::get_current_env; use crate::executor::Executor; -use crate::make_entry; use crate::make_metadata; use crate::make_operator_info; use crate::make_presigned_request; use crate::Result; +use crate::{make_entry, make_write_options}; #[no_mangle] pub extern "system" fn Java_org_apache_opendal_AsyncOperator_constructor( @@ -122,45 +121,13 @@ fn intern_write( let op = unsafe { &mut *op }; let id = request_id(env)?; + let write_opts = make_write_options(env, &options)?; let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - let content_type = read_string_field(env, &options, "contentType")?; - let content_disposition = read_string_field(env, &options, "contentDisposition")?; - let content_encoding = read_string_field(env, &options, "contentEncoding")?; - let cache_control = read_string_field(env, &options, "cacheControl")?; - let if_match = read_string_field(env, &options, "ifMatch")?; - let if_none_match = read_string_field(env, &options, "ifNoneMatch")?; - let append = read_bool_field(env, &options, "append")?; - let if_not_exists = read_bool_field(env, &options, "ifNotExists")?; - let user_metadata = read_map_field(env, &options, "userMetadata")?; - - let mut write_op = op.write_with(&path, content); - if let Some(content_type) = content_type { - write_op = write_op.content_type(&content_type); - } - if let Some(content_disposition) = content_disposition { - write_op = write_op.content_disposition(&content_disposition); - } - if let Some(content_encoding) = content_encoding { - write_op = write_op.content_encoding(&content_encoding); - } - if let Some(cache_control) = cache_control { - write_op = write_op.cache_control(&cache_control); - } - if let Some(if_match) = if_match { - write_op = write_op.if_match(&if_match); - } - if let Some(if_none_match) = if_none_match { - write_op = write_op.if_none_match(&if_none_match); - } - if let Some(user_metadata) = user_metadata { - write_op = write_op.user_metadata(user_metadata); - } - write_op = write_op.if_not_exists(if_not_exists); - write_op = write_op.append(append); executor_or_default(env, executor)?.spawn(async move { - let result = write_op + let result = op + .write_options(&path, content, write_opts) .await .map(|_| JValueOwned::Void) .map_err(Into::into); diff --git a/bindings/java/src/convert.rs b/bindings/java/src/convert.rs index c95c84c63633..f163722cc186 100644 --- a/bindings/java/src/convert.rs +++ b/bindings/java/src/convert.rs @@ -76,6 +76,10 @@ pub(crate) fn read_int64_field(env: &mut JNIEnv<'_>, obj: &JObject, field: &str) Ok(env.get_field(obj, field, "J")?.j()?) } +pub(crate) fn read_int_field(env: &mut JNIEnv<'_>, obj: &JObject, field: &str) -> Result { + Ok(env.get_field(obj, field, "I")?.i()?) +} + pub(crate) fn read_string_field( env: &mut JNIEnv<'_>, obj: &JObject, diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index 683baefb86ea..a2ab50b633ec 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -24,11 +24,11 @@ use jni::sys::jint; use jni::sys::jlong; use jni::JNIEnv; use opendal::raw::PresignedRequest; -use opendal::Capability; use opendal::Entry; use opendal::EntryMode; use opendal::Metadata; use opendal::OperatorInfo; +use opendal::{Capability, Error, ErrorKind}; mod async_operator; mod convert; @@ -202,3 +202,44 @@ fn make_entry<'a>(env: &mut JNIEnv<'a>, entry: Entry) -> Result> { &[JValue::Object(&path), JValue::Object(&metadata)], )?) } + +fn make_write_options<'a>( + env: &mut JNIEnv<'a>, + options: &JObject, +) -> Result { + let concurrent = match convert::read_int_field(env, options, "concurrent")? { + v if v > 0 => v as usize, + v => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Concurrent must be positive, instead got: {}", v), + ) + .into()) + } + }; + let chunk = match convert::read_int64_field(env, options, "chunk")? { + -1 => None, + v if v >= 0 => Some(v as usize), + v => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Chunk must be positive, instead got: {}", v), + ) + .into()) + } + }; + + Ok(opendal::options::WriteOptions { + append: convert::read_bool_field(env, options, "append").unwrap_or_default(), + content_type: convert::read_string_field(env, options, "contentType")?, + content_disposition: convert::read_string_field(env, options, "contentDisposition")?, + content_encoding: convert::read_string_field(env, options, "contentEncoding")?, + cache_control: convert::read_string_field(env, options, "cacheControl")?, + if_match: convert::read_string_field(env, options, "ifMatch")?, + if_none_match: convert::read_string_field(env, options, "ifNoneMatch")?, + if_not_exists: convert::read_bool_field(env, options, "ifNotExists").unwrap_or_default(), + user_metadata: convert::read_map_field(env, options, "userMetadata")?, + concurrent, + chunk, + }) +} diff --git a/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java b/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java index 9b249ddd8249..f870ad59dbb6 100644 --- a/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java +++ b/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java @@ -79,4 +79,16 @@ public class WriteOptions { * Requires capability: writeWithIfNotExists */ public final boolean ifNotExists; + + /** + * Sets concurrent write operations for this writer. + */ + @Builder.Default + public final int concurrent = 1; + + /** + * Sets chunk size for buffered writes. + */ + @Builder.Default + public final long chunk = -1L; } diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 4e7689a1a031..85bdfef097c8 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -30,11 +30,11 @@ use opendal::options; use crate::convert::{ bytes_to_jbytearray, jstring_to_string, offset_length_to_range, read_bool_field, - read_int64_field, read_map_field, read_string_field, + read_int64_field, }; -use crate::make_entry; use crate::make_metadata; use crate::Result; +use crate::{make_entry, make_write_options}; /// # Safety /// @@ -127,25 +127,9 @@ fn intern_write( ) -> Result<()> { let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; + let write_opts = make_write_options(env, &options)?; - let append = read_bool_field(env, &options, "append")?; - let content_type = read_string_field(env, &options, "contentType")?; - let content_disposition = read_string_field(env, &options, "contentDisposition")?; - let cache_control = read_string_field(env, &options, "cacheControl")?; - let user_metadata = read_map_field(env, &options, "userMetadata")?; - - let _ = op.write_options( - &path, - content, - options::WriteOptions { - append, - content_type, - content_disposition, - cache_control, - user_metadata, - ..Default::default() - }, - )?; + let _ = op.write_options(&path, content, write_opts)?; Ok(()) } diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java index 537b25d154f6..16f18a6ac1fd 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java @@ -42,6 +42,37 @@ void testIfNotExists() { .is(OpenDALExceptionCondition.ofAsync(Code.ConditionNotMatch)); } + @Test + void testWriteWithChunk() { + assumeTrue(asyncOp().info.fullCapability.writeCanMulti); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(5 * 1024 * 1024); + + WriteOptions options = WriteOptions.builder().chunk(1024 * 1024L).build(); + + asyncOp().write(path, content, options).join(); + + byte[] result = asyncOp().read(path).join(); + assertThat(result).isEqualTo(content); + } + + @Test + void testWriteWithConcurrentChunk() { + assumeTrue(asyncOp().info.fullCapability.writeCanMulti); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(5 * 1024 * 1024); + + WriteOptions options = + WriteOptions.builder().chunk(1024 * 1024L).concurrent(4).build(); + + asyncOp().write(path, content, options).join(); + + byte[] result = asyncOp().read(path).join(); + assertThat(result).isEqualTo(content); + } + @Test void testWriteWithCacheControl() { assumeTrue(asyncOp().info.fullCapability.writeWithCacheControl); diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java index 973f63a0e361..b22521d8c930 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java @@ -89,4 +89,35 @@ void testWriteWithAppend() { assertThat(result.length).isEqualTo(contentOne.length + contentTwo.length); assertThat(result).isEqualTo("Test Data".getBytes()); } + + @Test + void testWriteWithChunk() { + assumeTrue(op().info.fullCapability.writeCanMulti); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(5 * 1024 * 1024); + + WriteOptions options = WriteOptions.builder().chunk(1024 * 1024L).build(); + + op().write(path, content, options); + + byte[] result = op().read(path); + assertThat(result).isEqualTo(content); + } + + @Test + void testWriteWithConcurrentChunk() { + assumeTrue(op().info.fullCapability.writeCanMulti); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(5 * 1024 * 1024); + + WriteOptions options = + WriteOptions.builder().chunk(1024 * 1024L).concurrent(4).build(); + + op().write(path, content, options); + + byte[] result = op().read(path); + assertThat(result).isEqualTo(content); + } }