From 52c52bdd3ddd8e425827d9abcb35623271b14903 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Mon, 18 Feb 2019 14:51:30 +0800 Subject: [PATCH 1/2] Avoid Crash in Plasma Java Client --- ...org_apache_arrow_plasma_PlasmaClientJNI.cc | 28 ++++++++++++------ .../exceptions/PlasmaClientException.java | 29 +++++++++++++++++++ 2 files changed, 48 insertions(+), 9 deletions(-) create mode 100644 java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index 1988742af9b..248c268c071 100644 --- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -42,6 +42,14 @@ inline void object_id_to_jbyteArray(JNIEnv* env, jbyteArray a, plasma::ObjectID* env->SetByteArrayRegion(a, 0, OBJECT_ID_SIZE, reinterpret_cast(oid)); } +inline void throw_exception_if_not_OK(JNIEnv* env, const arrow::Status& status) { + if (!status.ok()) { + jclass Exception = + env->FindClass("org/apache/arrow/plasma/exceptions/PlasmaClientException"); + env->ThrowNew(Exception, status.message().c_str()); + } +} + class JByteArrayGetter { private: JNIEnv* _env; @@ -67,7 +75,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_connect( const char* m_name = env->GetStringUTFChars(manager_socket_name, nullptr); plasma::PlasmaClient* client = new plasma::PlasmaClient(); - ARROW_CHECK_OK(client->Connect(s_name, m_name, release_delay)); + throw_exception_if_not_OK(env, client->Connect(s_name, m_name, release_delay)); env->ReleaseStringUTFChars(store_socket_name, s_name); env->ReleaseStringUTFChars(manager_socket_name, m_name); @@ -78,7 +86,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect( JNIEnv* env, jclass cls, jlong conn) { plasma::PlasmaClient* client = reinterpret_cast(conn); - ARROW_CHECK_OK(client->Disconnect()); + throw_exception_if_not_OK(env, client->Disconnect()); delete client; return; } @@ -115,7 +123,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( env->ThrowNew(exceptionClass, ""); return nullptr; } - ARROW_CHECK(s.ok()); + throw_exception_if_not_OK(env, s); return env->NewDirectByteBuffer(data->mutable_data(), size); } @@ -145,7 +153,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_seal( plasma::ObjectID oid; jbyteArray_to_object_id(env, object_id, &oid); - ARROW_CHECK_OK(client->Seal(oid)); + throw_exception_if_not_OK(env, client->Seal(oid)); } JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release( @@ -154,7 +162,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release( plasma::ObjectID oid; jbyteArray_to_object_id(env, object_id, &oid); - ARROW_CHECK_OK(client->Release(oid)); + throw_exception_if_not_OK(env, client->Release(oid)); } JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_delete( @@ -163,7 +171,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_delete( plasma::ObjectID oid; jbyteArray_to_object_id(env, object_id, &oid); - ARROW_CHECK_OK(client->Delete(oid)); + throw_exception_if_not_OK(env, client->Delete(oid)); } JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get( @@ -179,7 +187,8 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get( &oids[i]); } // TODO: may be blocked. consider to add the thread support - ARROW_CHECK_OK(client->Get(oids.data(), num_oids, timeout_ms, obufs.data())); + throw_exception_if_not_OK(env, + client->Get(oids.data(), num_oids, timeout_ms, obufs.data())); jclass clsByteBuffer = env->FindClass("java/nio/ByteBuffer"); jclass clsByteBufferArray = env->FindClass("[Ljava/nio/ByteBuffer;"); @@ -217,7 +226,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains jbyteArray_to_object_id(env, object_id, &oid); bool has_object; - ARROW_CHECK_OK(client->Contains(oid, &has_object)); + throw_exception_if_not_OK(env, client->Contains(oid, &has_object)); return has_object; } @@ -227,7 +236,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict( plasma::PlasmaClient* client = reinterpret_cast(conn); int64_t evicted_bytes; - ARROW_CHECK_OK(client->Evict(static_cast(num_bytes), evicted_bytes)); + throw_exception_if_not_OK( + env, client->Evict(static_cast(num_bytes), evicted_bytes)); return static_cast(evicted_bytes); } diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java b/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java new file mode 100644 index 00000000000..0608ad7178d --- /dev/null +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma.exceptions; + +public class PlasmaClientException extends RuntimeException { + + public PlasmaClientException(String message) { + super(message); + } + + public PlasmaClientException(String message, Throwable t) { + super(message, t); + } +} From 3cbb4e3d9570522f5f553257d0300452cb9c6678 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Mon, 18 Feb 2019 22:27:41 +0800 Subject: [PATCH 2/2] Add Plasma Client Java test --- .../apache/arrow/plasma/PlasmaClientTest.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java index 3f326d30d83..e978a246037 100644 --- a/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java +++ b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaClientException; import org.junit.Assert; public class PlasmaClientTest { @@ -205,7 +206,32 @@ public void doTest() { assert !pLink.contains(id6); System.out.println("Plasma java client delete test success."); - cleanup(); + // Test calling shuntdown while getting the object. + Thread thread = new Thread(() -> { + try { + TimeUnit.SECONDS.sleep(1); + cleanup(); + } catch (InterruptedException e) { + throw new RuntimeException("Got InterruptedException when sleeping.", e); + } + }); + thread.start(); + + try { + byte[] idNone = new byte[20]; + Arrays.fill(idNone, (byte)987); + pLink.get(idNone, timeoutMs, false); + Assert.fail("Fail to throw PlasmaClientException when get an object " + + "when object store shutdown."); + } catch (PlasmaClientException e) { + System.out.println(String.format("Expected PlasmaClientException: %s", e)); + } + + try { + thread.join(); + } catch (Exception e) { + System.out.println(String.format("Excpetion caught: %s", e)); + } System.out.println("All test success."); }