Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions bindings/java/src/async_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -29,12 +30,15 @@ use jni::sys::jobject;
use jni::sys::jsize;
use jni::JNIEnv;
use opendal::layers::BlockingLayer;
use opendal::operator_futures::FutureWrite;
use opendal::raw::PresignedRequest;
use opendal::Operator;
use opendal::Scheme;
use opendal::{Metadata, Operator};

use crate::convert::jmap_to_hashmap;
use crate::convert::jstring_to_string;
use crate::convert::{
get_optional_map_from_object, get_optional_string_from_object, jmap_to_hashmap,
};
use crate::executor::executor_or_default;
use crate::executor::get_current_env;
use crate::executor::Executor;
Expand Down Expand Up @@ -110,8 +114,9 @@ pub unsafe extern "system" fn Java_org_apache_opendal_AsyncOperator_write(
executor: *const Executor,
path: JString,
content: JByteArray,
write_options: JObject,
) -> jlong {
intern_write(&mut env, op, executor, path, content).unwrap_or_else(|e| {
intern_write(&mut env, op, executor, path, content, write_options).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
Expand All @@ -123,25 +128,62 @@ fn intern_write(
executor: *const Executor,
path: JString,
content: JByteArray,
options: JObject,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
let content_type = get_optional_string_from_object(env, &options, "getContentType")?;
let content_disposition =
get_optional_string_from_object(env, &options, "getContentDisposition")?;
let content_encoding = get_optional_string_from_object(env, &options, "getContentEncoding")?;
let cache_control = get_optional_string_from_object(env, &options, "getCacheControl")?;
let if_match = get_optional_string_from_object(env, &options, "getIfMatch")?;
let if_none_match = get_optional_string_from_object(env, &options, "getIfNoneMatch")?;
let append = env.call_method(&options, "isAppend", "()Z", &[])?.z()?;
let if_not_exists = env
.call_method(&options, "isIfNotExists", "()Z", &[])?
.z()?;
let user_metadata = get_optional_map_from_object(env, &options, "getUserMetadata");

let mut write_op = op.write_with(&path, content);
if let Some(ct) = content_type {
write_op = write_op.content_type(&ct);
}
if let Some(cd) = content_disposition {
write_op = write_op.content_disposition(&cd);
}
if let Some(ce) = content_encoding {
write_op = write_op.content_encoding(&ce);
}
if let Some(cc) = cache_control {
write_op = write_op.cache_control(&cc);
}
if let Some(im) = if_match {
write_op = write_op.if_match(&im);
}
if let Some(inm) = if_none_match {
write_op = write_op.if_none_match(&inm);
}
if let Ok(Some(um)) = user_metadata {
write_op = write_op.user_metadata(um);
}
write_op = write_op.if_not_exists(if_not_exists);
write_op = write_op.append(append);

executor_or_default(env, executor)?.spawn(async move {
let result = do_write(op, path, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
let result = write_op
.await
.map(|_| JValueOwned::Void)
.map_err(|e| e.into());
complete_future(id, result)
});

Ok(id)
}

async fn do_write(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write(&path, content).await.map(|_| ())?)
}

/// # Safety
///
/// This function should not be called before the Operator is ready.
Expand Down
30 changes: 30 additions & 0 deletions bindings/java/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ pub(crate) fn string_to_jstring<'a>(
)
}

pub(crate) fn get_optional_string_from_object<'a>(
env: &mut JNIEnv<'a>,
obj: &JObject,
method: &str,
) -> crate::Result<Option<String>> {
let result = env
.call_method(obj, method, "()Ljava/lang/String;", &[])?
.l()?;
if result.is_null() {
Ok(None)
} else {
Ok(Some(jstring_to_string(env, &JString::from(result))?))
}
}

pub(crate) fn get_optional_map_from_object<'a>(
env: &mut JNIEnv<'a>,
obj: &JObject,
method: &str,
) -> crate::Result<Option<HashMap<String, String>>> {
let result = env
.call_method(obj, method, "()Ljava/util/Map;", &[])?
.l()?;
if result.is_null() {
Ok(None)
} else {
Ok(Some(jmap_to_hashmap(env, &result)?))
}
}

/// # Safety
///
/// The caller must guarantee that the Object passed in is an instance
Expand Down
6 changes: 5 additions & 1 deletion bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn make_operator_info<'a>(env: &mut JNIEnv<'a>, info: OperatorInfo) -> Result<JO
fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<'a>> {
let capability = env.new_object(
"org/apache/opendal/Capability",
"(ZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V",
"(ZZZZZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V",
&[
JValue::Bool(cap.stat as jboolean),
JValue::Bool(cap.stat_with_if_match as jboolean),
Expand All @@ -111,6 +111,10 @@ fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<
JValue::Bool(cap.write_with_content_type as jboolean),
JValue::Bool(cap.write_with_content_disposition as jboolean),
JValue::Bool(cap.write_with_cache_control as jboolean),
JValue::Bool(cap.write_with_if_match as jboolean),
JValue::Bool(cap.write_with_if_none_match as jboolean),
JValue::Bool(cap.write_with_if_not_exists as jboolean),
JValue::Bool(cap.write_with_user_metadata as jboolean),
JValue::Long(convert::usize_to_jlong(cap.write_multi_max_size)),
JValue::Long(convert::usize_to_jlong(cap.write_multi_min_size)),
JValue::Bool(cap.create_dir as jboolean),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,22 @@ public Operator blocking() {
}

public CompletableFuture<Void> write(String path, String content) {
return write(path, content.getBytes(StandardCharsets.UTF_8));
return write(
path,
content.getBytes(StandardCharsets.UTF_8),
WriteOptions.builder().build());
}

public CompletableFuture<Void> write(String path, byte[] content) {
final long requestId = write(nativeHandle, executorHandle, path, content);
return write(path, content, WriteOptions.builder().build());
}

public CompletableFuture<Void> write(String path, String content, WriteOptions options) {
return write(path, content.getBytes(StandardCharsets.UTF_8), options);
}

public CompletableFuture<Void> write(String path, byte[] content, WriteOptions options) {
final long requestId = write(nativeHandle, executorHandle, path, content, options);
return AsyncRegistry.take(requestId);
}

Expand Down Expand Up @@ -272,7 +283,8 @@ public CompletableFuture<List<Entry>> list(String path) {

private static native long read(long nativeHandle, long executorHandle, String path);

private static native long write(long nativeHandle, long executorHandle, String path, byte[] content);
private static native long write(
long nativeHandle, long executorHandle, String path, byte[] content, WriteOptions options);

private static native long append(long nativeHandle, long executorHandle, String path, byte[] content);

Expand Down
29 changes: 29 additions & 0 deletions bindings/java/src/main/java/org/apache/opendal/Capability.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ public class Capability {
*/
public final boolean writeWithCacheControl;

/**
* If operator supports write with if match.
*/
public final boolean writeWithIfMatch;

/**
* If operator supports write with if none match.
*
*/
public final boolean writeWithIfNoneMatch;

/**
* If operator supports write with if not exists.
*/
public final boolean writeWithIfNotExists;

/**
* If operator supports write with user metadata.
*/
public final boolean writeWithUserMetadata;

/**
* write_multi_max_size is the max size that services support in write_multi.
* For example, AWS S3 supports 5GiB as max in write_multi.
Expand Down Expand Up @@ -196,6 +217,10 @@ public Capability(
boolean writeWithContentType,
boolean writeWithContentDisposition,
boolean writeWithCacheControl,
boolean writeWithIfMatch,
boolean writeWithIfNoneMatch,
boolean writeWithIfNotExists,
boolean writeWithUserMetadata,
long writeMultiMaxSize,
long writeMultiMinSize,
boolean createDir,
Expand Down Expand Up @@ -227,6 +252,10 @@ public Capability(
this.writeWithContentType = writeWithContentType;
this.writeWithContentDisposition = writeWithContentDisposition;
this.writeWithCacheControl = writeWithCacheControl;
this.writeWithIfMatch = writeWithIfMatch;
this.writeWithIfNoneMatch = writeWithIfNoneMatch;
this.writeWithIfNotExists = writeWithIfNotExists;
this.writeWithUserMetadata = writeWithUserMetadata;
this.writeMultiMaxSize = writeMultiMaxSize;
this.writeMultiMinSize = writeMultiMinSize;
this.createDir = createDir;
Expand Down
12 changes: 10 additions & 2 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ public void write(String path, String content) {
}

public void write(String path, byte[] content) {
write(nativeHandle, path, content);
write(nativeHandle, path, content, WriteOptions.builder().build());
}

public void write(String path, String content, WriteOptions options) {
write(path, content.getBytes(StandardCharsets.UTF_8), options);
}

public void write(String path, byte[] content, WriteOptions options) {
write(nativeHandle, path, content, options);
}

public OperatorOutputStream createOutputStream(String path) {
Expand Down Expand Up @@ -128,7 +136,7 @@ public List<Entry> list(String path) {

private static native long duplicate(long op);

private static native void write(long op, String path, byte[] content);
private static native void write(long op, String path, byte[] content, WriteOptions options);

private static native byte[] read(long op, String path);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class WriteOptions {

/**
* Sets the Content-Type header for the object.
* Requires capability: writeWithContentType
*/
private String contentType;

/**
* Sets the Content-Disposition header for the object
* Requires capability: writeWithContentDisposition
*/
private String contentDisposition;

/**
* Sets the Cache-Control header for the object
* Requires capability: writeWithCacheControl
*/
private String cacheControl;

/**
* Sets the Content-Encoding header for the object
*/
private String contentEncoding;

/**
* Sets the If-Match header for conditional writes
* Requires capability: writeWithIfMatch
*/
private String ifMatch;

/**
* Sets the If-None-Match header for conditional writes
* Requires capability: writeWithIfNoneMatch
*/
private String ifNoneMatch;

/**
* Sets custom metadata for the file.
* Requires capability: writeWithUserMetadata
*/
private Map<String, String> userMetadata;

/**
* Enables append mode for writing.
* When true, data will be appended to the end of existing file.
* Requires capability: writeCanAppend
*/
private boolean append;

/**
* Write only if the file does not exist.
* Operation will fail if the file at the designated path already exists.
* Requires capability: writeWithIfNotExists
*/
private boolean ifNotExists;
}
Loading