Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ inline void object_id_to_jbyteArray(JNIEnv* env, jbyteArray a, plasma::ObjectID*
env->SetByteArrayRegion(a, 0, OBJECT_ID_SIZE, reinterpret_cast<jbyte*>(oid));
}

inline void throw_exception_if_not_OK(JNIEnv* env, const arrow::Status& status) {
if (!status.ok()) {
jclass Exception =
env->FindClass("org/apache/arrow/plasma/exceptions/PlasmaClientException");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we would raise different exceptions based on the status error category. I guess this is good enough for now.

env->ThrowNew(Exception, status.message().c_str());
}
}

class JByteArrayGetter {
private:
JNIEnv* _env;
Expand All @@ -67,7 +75,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_connect(
const char* m_name = env->GetStringUTFChars(manager_socket_name, nullptr);

plasma::PlasmaClient* client = new plasma::PlasmaClient();
ARROW_CHECK_OK(client->Connect(s_name, m_name, release_delay));
throw_exception_if_not_OK(env, client->Connect(s_name, m_name, release_delay));

env->ReleaseStringUTFChars(store_socket_name, s_name);
env->ReleaseStringUTFChars(manager_socket_name, m_name);
Expand All @@ -78,7 +86,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect(
JNIEnv* env, jclass cls, jlong conn) {
plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);

ARROW_CHECK_OK(client->Disconnect());
throw_exception_if_not_OK(env, client->Disconnect());
delete client;
return;
}
Expand Down Expand Up @@ -115,7 +123,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create(
env->ThrowNew(exceptionClass, "");
return nullptr;
}
ARROW_CHECK(s.ok());
throw_exception_if_not_OK(env, s);

return env->NewDirectByteBuffer(data->mutable_data(), size);
}
Expand Down Expand Up @@ -145,7 +153,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_seal(
plasma::ObjectID oid;
jbyteArray_to_object_id(env, object_id, &oid);

ARROW_CHECK_OK(client->Seal(oid));
throw_exception_if_not_OK(env, client->Seal(oid));
}

JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release(
Expand All @@ -154,7 +162,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release(
plasma::ObjectID oid;
jbyteArray_to_object_id(env, object_id, &oid);

ARROW_CHECK_OK(client->Release(oid));
throw_exception_if_not_OK(env, client->Release(oid));
}

JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_delete(
Expand All @@ -163,7 +171,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_delete(
plasma::ObjectID oid;
jbyteArray_to_object_id(env, object_id, &oid);

ARROW_CHECK_OK(client->Delete(oid));
throw_exception_if_not_OK(env, client->Delete(oid));
}

JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get(
Expand All @@ -179,7 +187,8 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get(
&oids[i]);
}
// TODO: may be blocked. consider to add the thread support
ARROW_CHECK_OK(client->Get(oids.data(), num_oids, timeout_ms, obufs.data()));
throw_exception_if_not_OK(env,
client->Get(oids.data(), num_oids, timeout_ms, obufs.data()));

jclass clsByteBuffer = env->FindClass("java/nio/ByteBuffer");
jclass clsByteBufferArray = env->FindClass("[Ljava/nio/ByteBuffer;");
Expand Down Expand Up @@ -217,7 +226,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains
jbyteArray_to_object_id(env, object_id, &oid);

bool has_object;
ARROW_CHECK_OK(client->Contains(oid, &has_object));
throw_exception_if_not_OK(env, client->Contains(oid, &has_object));

return has_object;
}
Expand All @@ -227,7 +236,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict(
plasma::PlasmaClient* client = reinterpret_cast<plasma::PlasmaClient*>(conn);

int64_t evicted_bytes;
ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));
throw_exception_if_not_OK(
env, client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));

return static_cast<jlong>(evicted_bytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.plasma.exceptions;

public class PlasmaClientException extends RuntimeException {

public PlasmaClientException(String message) {
super(message);
}

public PlasmaClientException(String message, Throwable t) {
super(message, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
import org.apache.arrow.plasma.exceptions.PlasmaClientException;
import org.junit.Assert;

public class PlasmaClientTest {
Expand Down Expand Up @@ -205,7 +206,32 @@ public void doTest() {
assert !pLink.contains(id6);
System.out.println("Plasma java client delete test success.");

cleanup();
// Test calling shuntdown while getting the object.
Thread thread = new Thread(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding..is there any reason to call this in a separate thread..can we just call using cleanup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call cleanup directly, the socket is closed normally. There will be no exception as expected. This exception happens while client is waiting the reply from the server and at this time the server is killed or crashed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, is this test is not susceptible on timing issues then? for e.g. if the read returns before the server cleanup command.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the read returns before the server cleanup command, this test will fail. However, I used a none-existent object id in the get function and the function timeout is 3 seconds to guarantee that this test will pass.

try {
TimeUnit.SECONDS.sleep(1);
cleanup();
} catch (InterruptedException e) {
throw new RuntimeException("Got InterruptedException when sleeping.", e);
}
});
thread.start();

try {
byte[] idNone = new byte[20];
Arrays.fill(idNone, (byte)987);
pLink.get(idNone, timeoutMs, false);
Assert.fail("Fail to throw PlasmaClientException when get an object " +
"when object store shutdown.");
} catch (PlasmaClientException e) {
System.out.println(String.format("Expected PlasmaClientException: %s", e));
}

try {
thread.join();
} catch (Exception e) {
System.out.println(String.format("Excpetion caught: %s", e));
}
System.out.println("All test success.");

}
Expand Down