diff --git a/bindings/java/src/async_operator.rs b/bindings/java/src/async_operator.rs index 239b97b89e51..5f4721742e5f 100644 --- a/bindings/java/src/async_operator.rs +++ b/bindings/java/src/async_operator.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; use std::str::FromStr; use std::time::Duration; @@ -29,12 +30,15 @@ use jni::sys::jobject; use jni::sys::jsize; use jni::JNIEnv; use opendal::layers::BlockingLayer; +use opendal::operator_futures::FutureWrite; use opendal::raw::PresignedRequest; -use opendal::Operator; use opendal::Scheme; +use opendal::{Metadata, Operator}; -use crate::convert::jmap_to_hashmap; use crate::convert::jstring_to_string; +use crate::convert::{ + get_optional_map_from_object, get_optional_string_from_object, jmap_to_hashmap, +}; use crate::executor::executor_or_default; use crate::executor::get_current_env; use crate::executor::Executor; @@ -110,8 +114,9 @@ pub unsafe extern "system" fn Java_org_apache_opendal_AsyncOperator_write( executor: *const Executor, path: JString, content: JByteArray, + write_options: JObject, ) -> jlong { - intern_write(&mut env, op, executor, path, content).unwrap_or_else(|e| { + intern_write(&mut env, op, executor, path, content, write_options).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -123,25 +128,62 @@ fn intern_write( executor: *const Executor, path: JString, content: JByteArray, + options: JObject, ) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; + let content_type = get_optional_string_from_object(env, &options, "getContentType")?; + let content_disposition = + get_optional_string_from_object(env, &options, "getContentDisposition")?; + let content_encoding = get_optional_string_from_object(env, &options, "getContentEncoding")?; + let cache_control = get_optional_string_from_object(env, &options, "getCacheControl")?; + let if_match = get_optional_string_from_object(env, &options, "getIfMatch")?; + let if_none_match = get_optional_string_from_object(env, &options, "getIfNoneMatch")?; + let append = env.call_method(&options, "isAppend", "()Z", &[])?.z()?; + let if_not_exists = env + .call_method(&options, "isIfNotExists", "()Z", &[])? + .z()?; + let user_metadata = get_optional_map_from_object(env, &options, "getUserMetadata"); + + let mut write_op = op.write_with(&path, content); + if let Some(ct) = content_type { + write_op = write_op.content_type(&ct); + } + if let Some(cd) = content_disposition { + write_op = write_op.content_disposition(&cd); + } + if let Some(ce) = content_encoding { + write_op = write_op.content_encoding(&ce); + } + if let Some(cc) = cache_control { + write_op = write_op.cache_control(&cc); + } + if let Some(im) = if_match { + write_op = write_op.if_match(&im); + } + if let Some(inm) = if_none_match { + write_op = write_op.if_none_match(&inm); + } + if let Ok(Some(um)) = user_metadata { + write_op = write_op.user_metadata(um); + } + write_op = write_op.if_not_exists(if_not_exists); + write_op = write_op.append(append); executor_or_default(env, executor)?.spawn(async move { - let result = do_write(op, path, content).await; - complete_future(id, result.map(|_| JValueOwned::Void)) + let result = write_op + .await + .map(|_| JValueOwned::Void) + .map_err(|e| e.into()); + complete_future(id, result) }); Ok(id) } -async fn do_write(op: &mut Operator, path: String, content: Vec) -> Result<()> { - Ok(op.write(&path, content).await.map(|_| ())?) -} - /// # Safety /// /// This function should not be called before the Operator is ready. diff --git a/bindings/java/src/convert.rs b/bindings/java/src/convert.rs index e347fffc8262..c0af00334a7d 100644 --- a/bindings/java/src/convert.rs +++ b/bindings/java/src/convert.rs @@ -69,6 +69,36 @@ pub(crate) fn string_to_jstring<'a>( ) } +pub(crate) fn get_optional_string_from_object<'a>( + env: &mut JNIEnv<'a>, + obj: &JObject, + method: &str, +) -> crate::Result> { + let result = env + .call_method(obj, method, "()Ljava/lang/String;", &[])? + .l()?; + if result.is_null() { + Ok(None) + } else { + Ok(Some(jstring_to_string(env, &JString::from(result))?)) + } +} + +pub(crate) fn get_optional_map_from_object<'a>( + env: &mut JNIEnv<'a>, + obj: &JObject, + method: &str, +) -> crate::Result>> { + let result = env + .call_method(obj, method, "()Ljava/util/Map;", &[])? + .l()?; + if result.is_null() { + Ok(None) + } else { + Ok(Some(jmap_to_hashmap(env, &result)?)) + } +} + /// # Safety /// /// The caller must guarantee that the Object passed in is an instance diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index 45a0554edb08..e67d7f479955 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -94,7 +94,7 @@ fn make_operator_info<'a>(env: &mut JNIEnv<'a>, info: OperatorInfo) -> Result(env: &mut JNIEnv<'a>, cap: Capability) -> Result> { let capability = env.new_object( "org/apache/opendal/Capability", - "(ZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V", + "(ZZZZZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V", &[ JValue::Bool(cap.stat as jboolean), JValue::Bool(cap.stat_with_if_match as jboolean), @@ -111,6 +111,10 @@ fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result write(String path, String content) { - return write(path, content.getBytes(StandardCharsets.UTF_8)); + return write( + path, + content.getBytes(StandardCharsets.UTF_8), + WriteOptions.builder().build()); } public CompletableFuture write(String path, byte[] content) { - final long requestId = write(nativeHandle, executorHandle, path, content); + return write(path, content, WriteOptions.builder().build()); + } + + public CompletableFuture write(String path, String content, WriteOptions options) { + return write(path, content.getBytes(StandardCharsets.UTF_8), options); + } + + public CompletableFuture write(String path, byte[] content, WriteOptions options) { + final long requestId = write(nativeHandle, executorHandle, path, content, options); return AsyncRegistry.take(requestId); } @@ -272,7 +283,8 @@ public CompletableFuture> list(String path) { private static native long read(long nativeHandle, long executorHandle, String path); - private static native long write(long nativeHandle, long executorHandle, String path, byte[] content); + private static native long write( + long nativeHandle, long executorHandle, String path, byte[] content, WriteOptions options); private static native long append(long nativeHandle, long executorHandle, String path, byte[] content); diff --git a/bindings/java/src/main/java/org/apache/opendal/Capability.java b/bindings/java/src/main/java/org/apache/opendal/Capability.java index 159a03d615ed..668156c89008 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Capability.java +++ b/bindings/java/src/main/java/org/apache/opendal/Capability.java @@ -98,6 +98,27 @@ public class Capability { */ public final boolean writeWithCacheControl; + /** + * If operator supports write with if match. + */ + public final boolean writeWithIfMatch; + + /** + * If operator supports write with if none match. + * + */ + public final boolean writeWithIfNoneMatch; + + /** + * If operator supports write with if not exists. + */ + public final boolean writeWithIfNotExists; + + /** + * If operator supports write with user metadata. + */ + public final boolean writeWithUserMetadata; + /** * write_multi_max_size is the max size that services support in write_multi. * For example, AWS S3 supports 5GiB as max in write_multi. @@ -196,6 +217,10 @@ public Capability( boolean writeWithContentType, boolean writeWithContentDisposition, boolean writeWithCacheControl, + boolean writeWithIfMatch, + boolean writeWithIfNoneMatch, + boolean writeWithIfNotExists, + boolean writeWithUserMetadata, long writeMultiMaxSize, long writeMultiMinSize, boolean createDir, @@ -227,6 +252,10 @@ public Capability( this.writeWithContentType = writeWithContentType; this.writeWithContentDisposition = writeWithContentDisposition; this.writeWithCacheControl = writeWithCacheControl; + this.writeWithIfMatch = writeWithIfMatch; + this.writeWithIfNoneMatch = writeWithIfNoneMatch; + this.writeWithIfNotExists = writeWithIfNotExists; + this.writeWithUserMetadata = writeWithUserMetadata; this.writeMultiMaxSize = writeMultiMaxSize; this.writeMultiMinSize = writeMultiMinSize; this.createDir = createDir; diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java index be496d2cb70f..9489509bad3a 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Operator.java +++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java @@ -76,7 +76,15 @@ public void write(String path, String content) { } public void write(String path, byte[] content) { - write(nativeHandle, path, content); + write(nativeHandle, path, content, WriteOptions.builder().build()); + } + + public void write(String path, String content, WriteOptions options) { + write(path, content.getBytes(StandardCharsets.UTF_8), options); + } + + public void write(String path, byte[] content, WriteOptions options) { + write(nativeHandle, path, content, options); } public OperatorOutputStream createOutputStream(String path) { @@ -128,7 +136,7 @@ public List list(String path) { private static native long duplicate(long op); - private static native void write(long op, String path, byte[] content); + private static native void write(long op, String path, byte[] content, WriteOptions options); private static native byte[] read(long op, String path); diff --git a/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java b/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java new file mode 100644 index 000000000000..82a7cea65a9e --- /dev/null +++ b/bindings/java/src/main/java/org/apache/opendal/WriteOptions.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class WriteOptions { + + /** + * Sets the Content-Type header for the object. + * Requires capability: writeWithContentType + */ + private String contentType; + + /** + * Sets the Content-Disposition header for the object + * Requires capability: writeWithContentDisposition + */ + private String contentDisposition; + + /** + * Sets the Cache-Control header for the object + * Requires capability: writeWithCacheControl + */ + private String cacheControl; + + /** + * Sets the Content-Encoding header for the object + */ + private String contentEncoding; + + /** + * Sets the If-Match header for conditional writes + * Requires capability: writeWithIfMatch + */ + private String ifMatch; + + /** + * Sets the If-None-Match header for conditional writes + * Requires capability: writeWithIfNoneMatch + */ + private String ifNoneMatch; + + /** + * Sets custom metadata for the file. + * Requires capability: writeWithUserMetadata + */ + private Map userMetadata; + + /** + * Enables append mode for writing. + * When true, data will be appended to the end of existing file. + * Requires capability: writeCanAppend + */ + private boolean append; + + /** + * Write only if the file does not exist. + * Operation will fail if the file at the designated path already exists. + * Requires capability: writeWithIfNotExists + */ + private boolean ifNotExists; +} diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 9e457feda862..db0280129790 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -27,7 +27,7 @@ use jni::sys::jsize; use jni::JNIEnv; use opendal::BlockingOperator; -use crate::convert::jstring_to_string; +use crate::convert::{get_optional_string_from_object, jstring_to_string}; use crate::make_entry; use crate::make_metadata; use crate::Result; @@ -90,8 +90,9 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write( op: *mut BlockingOperator, path: JString, content: JByteArray, + write_options: JObject, ) { - intern_write(&mut env, &mut *op, path, content).unwrap_or_else(|e| { + intern_write(&mut env, &mut *op, path, content, write_options).unwrap_or_else(|e| { e.throw(&mut env); }) } @@ -101,10 +102,31 @@ fn intern_write( op: &mut BlockingOperator, path: JString, content: JByteArray, + write_options: JObject, ) -> Result<()> { let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - Ok(op.write(&path, content).map(|_| ())?) + + let content_type = get_optional_string_from_object(env, &write_options, "getContentType")?; + let content_disposition = + get_optional_string_from_object(env, &write_options, "getContentDisposition")?; + let cache_control = get_optional_string_from_object(env, &write_options, "getCacheControl")?; + let append = env + .call_method(&write_options, "isAppend", "()Z", &[])? + .z()?; + + let mut write_op = op.write_with(&path, content); + if let Some(ct) = content_type { + write_op = write_op.content_type(&ct); + } + if let Some(cd) = content_disposition { + write_op = write_op.content_disposition(&cd); + } + if let Some(cc) = cache_control { + write_op = write_op.cache_control(&cc); + } + write_op = write_op.append(append); + Ok(write_op.call().map(|_| ())?) } /// # Safety diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java new file mode 100644 index 000000000000..537b25d154f6 --- /dev/null +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/AsyncWriteOptionsTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal.test.behavior; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assumptions.assumeTrue; +import java.util.UUID; +import org.apache.opendal.OpenDALException.Code; +import org.apache.opendal.WriteOptions; +import org.apache.opendal.test.condition.OpenDALExceptionCondition; +import org.junit.jupiter.api.Test; + +public class AsyncWriteOptionsTest extends BehaviorTestBase { + + @Test + void testIfNotExists() { + assumeTrue(asyncOp().info.fullCapability.writeWithIfNotExists); + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + WriteOptions options = WriteOptions.builder().ifNotExists(true).build(); + asyncOp().write(path, content, options).join(); + + assertThatThrownBy(() -> asyncOp().write(path, content, options).join()) + .is(OpenDALExceptionCondition.ofAsync(Code.ConditionNotMatch)); + } + + @Test + void testWriteWithCacheControl() { + assumeTrue(asyncOp().info.fullCapability.writeWithCacheControl); + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + final String cacheControl = "max-age=3600"; + + WriteOptions options = WriteOptions.builder().cacheControl(cacheControl).build(); + + asyncOp().write(path, content, options).join(); + + String actualCacheControl = asyncOp().stat(path).join().getCacheControl(); + assertThat(actualCacheControl).isEqualTo(cacheControl); + } + + @Test + void testWriteWithIfNoneMatch() { + assumeTrue(asyncOp().info.fullCapability.writeWithIfNoneMatch); + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + + asyncOp().write(path, content).join(); + String etag = asyncOp().stat(path).join().getEtag(); + + WriteOptions options = WriteOptions.builder().ifNoneMatch(etag).build(); + + assertThatThrownBy(() -> asyncOp().write(path, content, options).join()) + .is(OpenDALExceptionCondition.ofAsync(Code.ConditionNotMatch)); + } + + @Test + void testWriteWithIfMatch() { + assumeTrue(asyncOp().info.fullCapability.writeWithIfMatch); + + final String pathA = UUID.randomUUID().toString(); + final String pathB = UUID.randomUUID().toString(); + final byte[] contentA = generateBytes(); + final byte[] contentB = generateBytes(); + + asyncOp().write(pathA, contentA).join(); + asyncOp().write(pathB, contentB).join(); + + String etagA = asyncOp().stat(pathA).join().getEtag(); + String etagB = asyncOp().stat(pathB).join().getEtag(); + + WriteOptions optionsA = WriteOptions.builder().ifMatch(etagA).build(); + + asyncOp().write(pathA, contentA, optionsA).join(); + + WriteOptions optionsB = WriteOptions.builder().ifMatch(etagB).build(); + + assertThatThrownBy(() -> asyncOp().write(pathA, contentA, optionsB).join()) + .is(OpenDALExceptionCondition.ofAsync(Code.ConditionNotMatch)); + } + + @Test + void testWriteWithAppend() { + assumeTrue(asyncOp().info.fullCapability.writeCanAppend); + + final String path = UUID.randomUUID().toString(); + final byte[] contentOne = "Test".getBytes(); + final byte[] contentTwo = " Data".getBytes(); + + WriteOptions appendOptions = WriteOptions.builder().append(true).build(); + asyncOp().write(path, contentOne, appendOptions).join(); + asyncOp().write(path, contentTwo, appendOptions).join(); + + byte[] result = asyncOp().read(path).join(); + assertThat(result.length).isEqualTo(contentOne.length + contentTwo.length); + assertThat(result).isEqualTo("Test Data".getBytes()); + } +} diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java new file mode 100644 index 000000000000..973f63a0e361 --- /dev/null +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BlockingWriteOptionTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal.test.behavior; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; +import java.util.UUID; +import org.apache.opendal.WriteOptions; +import org.junit.jupiter.api.Test; + +public class BlockingWriteOptionTest extends BehaviorTestBase { + + @Test + void testWriteWithCacheControl() { + assumeTrue(op().info.fullCapability.writeWithCacheControl); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + final String cacheControl = "max-age=3600"; + + WriteOptions options = WriteOptions.builder().cacheControl(cacheControl).build(); + op().write(path, content, options); + + String actualCacheControl = op().stat(path).getCacheControl(); + assertThat(actualCacheControl).isEqualTo(cacheControl); + } + + @Test + void testWriteWithContentType() { + assumeTrue(op().info.fullCapability.writeWithContentType); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + final String contentType = "application/json"; + + WriteOptions options = WriteOptions.builder().contentType(contentType).build(); + op().write(path, content, options); + + String actualContentType = op().stat(path).getContentType(); + assertThat(actualContentType).isEqualTo(contentType); + } + + @Test + void testWriteWithContentDisposition() { + assumeTrue(op().info.fullCapability.writeWithContentDisposition); + + final String path = UUID.randomUUID().toString(); + final byte[] content = generateBytes(); + final String disposition = "attachment; filename=\"test.txt\""; + + WriteOptions options = + WriteOptions.builder().contentDisposition(disposition).build(); + op().write(path, content, options); + + String actualDisposition = op().stat(path).getContentDisposition(); + assertThat(actualDisposition).isEqualTo(disposition); + } + + @Test + void testWriteWithAppend() { + assumeTrue(op().info.fullCapability.writeCanAppend); + + final String path = UUID.randomUUID().toString(); + final byte[] contentOne = "Test".getBytes(); + final byte[] contentTwo = " Data".getBytes(); + WriteOptions appendOptions = WriteOptions.builder().append(true).build(); + + op().write(path, contentOne, appendOptions); + op().write(path, contentTwo, appendOptions); + + byte[] result = op().read(path); + assertThat(result.length).isEqualTo(contentOne.length + contentTwo.length); + assertThat(result).isEqualTo("Test Data".getBytes()); + } +}