diff --git a/.travis.yml b/.travis.yml index b94a6324563..eca26c8bc2d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -130,6 +130,8 @@ matrix: env: ARROW_TEST_GROUP=integration jdk: openjdk8 env: + - ARROW_TRAVIS_PLASMA=1 + - ARROW_TRAVIS_PLASMA_JAVA_CLIENT=1 - CC="clang-6.0" - CXX="clang++-6.0" before_script: @@ -141,6 +143,7 @@ matrix: - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh + - $TRAVIS_BUILD_DIR/ci/travis_script_plasma_java_client.sh # NodeJS - language: node_js os: linux diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 379007d5bf0..541d5fd937d 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -74,6 +74,10 @@ if [ $ARROW_TRAVIS_PLASMA == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA=ON" fi +if [ $ARROW_TRAVIS_PLASMA_JAVA_CLIENT == "1" ]; then + CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_PLASMA_JAVA_CLIENT=ON" +fi + if [ $ARROW_TRAVIS_ORC == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_ORC=ON" fi diff --git a/ci/travis_script_plasma_java_client.sh b/ci/travis_script_plasma_java_client.sh new file mode 100755 index 00000000000..628796dac25 --- /dev/null +++ b/ci/travis_script_plasma_java_client.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +PLASMA_JAVA_DIR=${TRAVIS_BUILD_DIR}/java/plasma + +pushd $PLASMA_JAVA_DIR + +mvn clean install +export PLASMA_STORE=${TRAVIS_BUILD_DIR}/cpp-install/bin/plasma_store +java -cp target/test-classes:target/classes -Djava.library.path=${TRAVIS_BUILD_DIR}/cpp-build/debug/ org.apache.arrow.plasma.PlasmaClientTest + +popd diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 521e6850b61..0fd100789f6 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -199,6 +199,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") "Build the plasma object store along with Arrow" OFF) + option(ARROW_PLASMA_JAVA_CLIENT + "Build the plasma object store java client" + OFF) + option(ARROW_USE_SSE "Build with SSE4 optimizations" OFF) diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index ed425f6b81a..fd6c7ac3be1 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -160,6 +160,37 @@ install( FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") +if(ARROW_PLASMA_JAVA_CLIENT) + # JNI bridge library (plasma_java) for the Plasma Java client. + # REQUIRED aborts configuration when JNI is missing, so no JNI_FOUND check + # is needed afterwards. + find_package(JNI REQUIRED) + message(STATUS "JNI_INCLUDE_DIRS = ${JNI_INCLUDE_DIRS}") + message(STATUS "JNI_LIBRARIES = ${JNI_LIBRARIES}") + + # NOTE(review): lib/*.cc also matches any source placed directly under lib/; + # confirm that is intended. + file(GLOB PLASMA_LIBRARY_EXT_java_SRC + lib/java/*.cc lib/*.cc) + add_library(plasma_java SHARED + ${PLASMA_LIBRARY_EXT_java_SRC}) + + 
# Scope the JNI headers to this target only. FindJNI provides + # JAVA_INCLUDE_PATH (jni.h) and JAVA_INCLUDE_PATH2 (platform jni_md.h), so + # no $ENV{JAVA_HOME}-based -I flags are needed. + target_include_directories(plasma_java PRIVATE + "${JAVA_INCLUDE_PATH}" + "${JAVA_INCLUDE_PATH2}" + "${CMAKE_CURRENT_LIST_DIR}/lib/java") + + # Link against plasma_static; the whole Flatbuffers archive is forced in via + # -force_load on macOS and --whole-archive elsewhere. + if(APPLE) + target_link_libraries(plasma_java PRIVATE plasma_static ${PLASMA_LINK_LIBS} "-undefined dynamic_lookup" -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} ${FLATBUFFERS_STATIC_LIB} ${PTHREAD_LIBRARY}) + else() + target_link_libraries(plasma_java PRIVATE plasma_static ${PLASMA_LINK_LIBS} -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive ${FLATBUFFERS_STATIC_LIB} ${PTHREAD_LIBRARY}) + endif() +endif() ####################################### # Unit tests ####################################### diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc new file mode 100644 index 00000000000..8ddfeebe698 --- /dev/null +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -0,0 +1,295 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "plasma/client.h" + +constexpr jsize OBJECT_ID_SIZE = sizeof(plasma::ObjectID) / sizeof(jbyte); + +inline void jbyteArray_to_object_id(JNIEnv* env, jbyteArray a, plasma::ObjectID* oid) { + env->GetByteArrayRegion(a, 0, OBJECT_ID_SIZE, reinterpret_cast(oid)); +} + +inline void object_id_to_jbyteArray(JNIEnv* env, jbyteArray a, plasma::ObjectID* oid) { + env->SetByteArrayRegion(a, 0, OBJECT_ID_SIZE, reinterpret_cast(oid)); +} + +class JByteArrayGetter { + private: + JNIEnv* _env; + jbyteArray _a; + jbyte* bp; + + public: + JByteArrayGetter(JNIEnv* env, jbyteArray a, jbyte** out) { + _env = env; + _a = a; + + bp = _env->GetByteArrayElements(_a, nullptr); + *out = bp; + } + + ~JByteArrayGetter() { _env->ReleaseByteArrayElements(_a, bp, 0); } +}; + +JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_connect( + JNIEnv* env, jclass cls, jstring store_socket_name, jstring manager_socket_name, + jint release_delay) { + const char* s_name = env->GetStringUTFChars(store_socket_name, nullptr); + const char* m_name = env->GetStringUTFChars(manager_socket_name, nullptr); + + plasma::PlasmaClient* client = new plasma::PlasmaClient(); + ARROW_CHECK_OK(client->Connect(s_name, m_name, release_delay)); + + env->ReleaseStringUTFChars(store_socket_name, s_name); + env->ReleaseStringUTFChars(manager_socket_name, m_name); + return reinterpret_cast(client); +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect( + JNIEnv* env, jclass cls, jlong conn) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + + ARROW_CHECK_OK(client->Disconnect()); + delete client; + return; +} + +JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( + JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id, jint size, + jbyteArray metadata) { + 
plasma::PlasmaClient* client = reinterpret_cast(conn); + plasma::ObjectID oid; + jbyteArray_to_object_id(env, object_id, &oid); + + // prepare metadata buffer + uint8_t* md = nullptr; + jsize md_size = 0; + std::unique_ptr md_getter; + if (metadata != nullptr) { + md_size = env->GetArrayLength(metadata); + } + if (md_size > 0) { + md_getter.reset(new JByteArrayGetter(env, metadata, reinterpret_cast(&md))); + } + + std::shared_ptr data; + Status s = client->Create(oid, size, md, md_size, &data); + if (s.IsPlasmaObjectExists()) { + jclass Exception = env->FindClass("java/lang/Exception"); + env->ThrowNew(Exception, + "An object with this ID already exists in the plasma store."); + return nullptr; + } + if (s.IsPlasmaStoreFull()) { + jclass Exception = env->FindClass("java/lang/Exception"); + env->ThrowNew(Exception, + "The plasma store ran out of memory and could not create this object."); + return nullptr; + } + ARROW_CHECK(s.ok()); + + return env->NewDirectByteBuffer(data->mutable_data(), size); +} + +JNIEXPORT jbyteArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_hash( + JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + plasma::ObjectID oid; + jbyteArray_to_object_id(env, object_id, &oid); + + unsigned char digest[plasma::kDigestSize]; + bool success = client->Hash(oid, digest).ok(); + + if (success) { + jbyteArray ret = env->NewByteArray(plasma::kDigestSize); + env->SetByteArrayRegion(ret, 0, plasma::kDigestSize, + reinterpret_cast(digest)); + return ret; + } else { + return nullptr; + } +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_seal( + JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + plasma::ObjectID oid; + jbyteArray_to_object_id(env, object_id, &oid); + + ARROW_CHECK_OK(client->Seal(oid)); +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release( + JNIEnv* 
env, jclass cls, jlong conn, jbyteArray object_id) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + plasma::ObjectID oid; + jbyteArray_to_object_id(env, object_id, &oid); + + ARROW_CHECK_OK(client->Release(oid)); +} + +JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get( + JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + + jsize num_oids = env->GetArrayLength(object_ids); + std::vector oids(num_oids); + std::vector obufs(num_oids); + for (int i = 0; i < num_oids; ++i) { + jbyteArray_to_object_id( + env, reinterpret_cast(env->GetObjectArrayElement(object_ids, i)), + &oids[i]); + } + // TODO: may be blocked. consider to add the thread support + ARROW_CHECK_OK(client->Get(oids.data(), num_oids, timeout_ms, obufs.data())); + + jclass clsByteBuffer = env->FindClass("java/nio/ByteBuffer"); + jclass clsByteBufferArray = env->FindClass("[Ljava/nio/ByteBuffer;"); + + jobjectArray ret = env->NewObjectArray(num_oids, clsByteBufferArray, nullptr); + jobjectArray o = nullptr; + jobject dataBuf, metadataBuf; + for (int i = 0; i < num_oids; ++i) { + o = env->NewObjectArray(2, clsByteBuffer, nullptr); + if (obufs[i].data && obufs[i].data->size() != -1) { + dataBuf = env->NewDirectByteBuffer(const_cast(obufs[i].data->data()), + obufs[i].data->size()); + if (obufs[i].metadata && obufs[i].metadata->size() > 0) { + metadataBuf = env->NewDirectByteBuffer( + const_cast(obufs[i].metadata->data()), obufs[i].metadata->size()); + } else { + metadataBuf = nullptr; + } + } else { + dataBuf = nullptr; + metadataBuf = nullptr; + } + + env->SetObjectArrayElement(o, 0, dataBuf); + env->SetObjectArrayElement(o, 1, metadataBuf); + env->SetObjectArrayElement(ret, i, o); + } + return ret; +} + +JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains( + JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id) { + plasma::PlasmaClient* client = 
reinterpret_cast(conn); + plasma::ObjectID oid; + jbyteArray_to_object_id(env, object_id, &oid); + + bool has_object; + ARROW_CHECK_OK(client->Contains(oid, &has_object)); + + return has_object; +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_fetch( + JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + jsize num_oids = env->GetArrayLength(object_ids); + + std::vector oids(num_oids); + for (int i = 0; i < num_oids; ++i) { + jbyteArray_to_object_id( + env, reinterpret_cast(env->GetObjectArrayElement(object_ids, i)), + &oids[i]); + } + + ARROW_CHECK_OK(client->Fetch(static_cast(num_oids), oids.data())); + + return; +} + +JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait( + JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms, + jint num_returns) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + jsize num_oids = env->GetArrayLength(object_ids); + + if (num_returns < 0) { + jclass Exception = env->FindClass("java/lang/RuntimeException"); + env->ThrowNew(Exception, "The argument num_returns cannot be less than zero."); + return nullptr; + } + if (num_returns > num_oids) { + jclass Exception = env->FindClass("java/lang/RuntimeException"); + env->ThrowNew(Exception, + "The argument num_returns cannot be greater than len(object_ids)."); + return nullptr; + } + + std::vector oreqs(num_oids); + + for (int i = 0; i < num_oids; ++i) { + jbyteArray_to_object_id( + env, reinterpret_cast(env->GetObjectArrayElement(object_ids, i)), + &oreqs[i].object_id); + oreqs[i].type = plasma::PLASMA_QUERY_ANYWHERE; + } + + int num_return_objects; + // TODO: may be blocked. 
consider to add the thread support + ARROW_CHECK_OK(client->Wait(static_cast(num_oids), oreqs.data(), num_returns, + static_cast(timeout_ms), &num_return_objects)); + + int num_to_return = std::min(num_return_objects, num_returns); + jclass clsByteArray = env->FindClass("[B"); + jobjectArray ret = env->NewObjectArray(num_to_return, clsByteArray, nullptr); + + int num_returned = 0; + jbyteArray oid = nullptr; + for (int i = 0; i < num_oids; ++i) { + if (num_returned >= num_to_return) { + break; + } + + if (oreqs[i].status == plasma::ObjectStatusLocal || + oreqs[i].status == plasma::ObjectStatusRemote) { + oid = env->NewByteArray(OBJECT_ID_SIZE); + object_id_to_jbyteArray(env, oid, &oreqs[i].object_id); + env->SetObjectArrayElement(ret, num_returned, oid); + num_returned++; + } + } + ARROW_CHECK(num_returned == num_to_return); + + return ret; +} + +JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict( + JNIEnv* env, jclass cls, jlong conn, jlong num_bytes) { + plasma::PlasmaClient* client = reinterpret_cast(conn); + + int64_t evicted_bytes; + ARROW_CHECK_OK(client->Evict(static_cast(num_bytes), evicted_bytes)); + + return static_cast(evicted_bytes); +} diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h new file mode 100644 index 00000000000..697a960484c --- /dev/null +++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class org_apache_arrow_plasma_PlasmaClientJNI */ + +#ifndef _Included_org_apache_arrow_plasma_PlasmaClientJNI +#define _Included_org_apache_arrow_plasma_PlasmaClientJNI +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: connect + * Signature: (Ljava/lang/String;Ljava/lang/String;I)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_connect( + JNIEnv*, jclass, jstring, jstring, jint); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: disconnect + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect(JNIEnv*, + jclass, + jlong); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: create + * Signature: (J[BI[B)Ljava/nio/ByteBuffer; + */ +JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( + JNIEnv*, jclass, jlong, jbyteArray, jint, jbyteArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: hash + * Signature: (J[B)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_org_apache_arrow_plasma_PlasmaClientJNI_hash(JNIEnv*, jclass, jlong, jbyteArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: seal + * Signature: (J[B)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_seal(JNIEnv*, jclass, + jlong, + jbyteArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: release + * Signature: (J[B)V + */ 
+JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release(JNIEnv*, + jclass, jlong, + jbyteArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: get + * Signature: (J[[BI)[[Ljava/nio/ByteBuffer; + */ +JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get( + JNIEnv*, jclass, jlong, jobjectArray, jint); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: contains + * Signature: (J[B)Z + */ +JNIEXPORT jboolean JNICALL +Java_org_apache_arrow_plasma_PlasmaClientJNI_contains(JNIEnv*, jclass, jlong, jbyteArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: fetch + * Signature: (J[[B)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_fetch(JNIEnv*, jclass, + jlong, + jobjectArray); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: wait + * Signature: (J[[BII)[[B + */ +JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait( + JNIEnv*, jclass, jlong, jobjectArray, jint, jint); + +/* + * Class: org_apache_arrow_plasma_PlasmaClientJNI + * Method: evict + * Signature: (JJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict(JNIEnv*, + jclass, jlong, + jlong); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/java/plasma/README.md b/java/plasma/README.md new file mode 100644 index 00000000000..0dcb4e21f86 --- /dev/null +++ b/java/plasma/README.md @@ -0,0 +1,39 @@ + + +# Java Plasma Client + +## Setup Build Environment + +Install: + - Java 8 or later + - Maven 3.3 or later + - the same requirements as for building [Arrow C++](https://github.com/apache/arrow/tree/master/cpp) + +## Build the jar of plasma client + +``` +cd .. 
+mvn clean install -pl plasma -am -Dmaven.test.skip +``` + +## Building and running tests +``` +./test.sh +``` diff --git a/java/plasma/pom.xml b/java/plasma/pom.xml new file mode 100644 index 00000000000..6b8a91c38ec --- /dev/null +++ b/java/plasma/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 0.10.0-SNAPSHOT + + arrow-plasma + Arrow Plasma Client + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java new file mode 100644 index 00000000000..e8442730264 --- /dev/null +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.util.List; + +/** + * Object store interface, which provides the capabilities to put and get raw byte arrays. + */ +public interface ObjectStoreLink { + + /** + * Put value in the local plasma store with object ID objectId. + * + * @param objectId The object ID of the value to be put. + * @param value The value to put in the object store. + * @param metadata encodes whatever metadata the user wishes to encode. + */ + void put(byte[] objectId, byte[] value, byte[] metadata); + + /** + * Get the data or metadata of an object in the PlasmaStore based on the objectId. + * + * @param objectId The object ID used to identify the object. + * @param timeoutMs The number of milliseconds that the get call should block before timing out + * and returning. Pass -1 if the call should block and 0 if the call should return immediately. + * @param isMetadata false if get data, otherwise get metadata. + * @return A byte array holding the object's data or metadata. + */ + default byte[] get(byte[] objectId, int timeoutMs, boolean isMetadata) { + byte[][] objectIds = {objectId}; + return get(objectIds, timeoutMs, isMetadata).get(0); + } + + /** + * Get the data or metadata of several objects in the PlasmaStore based on objectIds. + * + * @param objectIds List of object IDs used to identify some objects. + * @param timeoutMs The number of milliseconds that the get call should block before timing out + * and returning. Pass -1 if the call should block and 0 if the call should return immediately. + * @param isMetadata false if get data, otherwise get metadata. + * @return List of byte arrays holding the objects' data or metadata. 
+ */ + List get(byte[][] objectIds, int timeoutMs, boolean isMetadata); + + /** + * Wait until numReturns objects in objectIds are ready. + * + * @param objectIds List of object IDs to wait for. + * @param timeoutMs Return to the caller after timeoutMs milliseconds. + * @param numReturns We are waiting for this number of objects to be ready. + * @return List of object IDs that are ready + */ + List wait(byte[][] objectIds, int timeoutMs, int numReturns); + + /** + * Compute the hash of an object in the object store. + * + * @param objectId The object ID used to identify the object. + * @return A digest byte array contains object's SHA256 hash. null means that the object + * isn't in the object store. + */ + byte[] hash(byte[] objectId); + + /** + * Fetch the object with the given ID from other plasma manager instances. + * + * @param objectId The object ID used to identify the object. + */ + default void fetch(byte[] objectId) { + byte[][] objectIds = {objectId}; + fetch(objectIds); + } + + /** + * Fetch the objects with the given IDs from other plasma manager instances. + * + * @param objectIds List of object IDs used to identify the objects. + */ + void fetch(byte[][] objectIds); + + /** + * Evict some objects to recover given count of bytes. + * + * @param numBytes The number of bytes to attempt to recover. + * @return The number of bytes that have been evicted. + */ + long evict(long numBytes); + + /** + * Release the reference of the object. + * + * @param objectId The object ID used to release the reference of the object. + */ + void release(byte[] objectId); + + /** + * Check if the object is present and has been sealed in the PlasmaStore. + * + * @param objectId used to identify an object. 
+ */ + boolean contains(byte[] objectId); +} diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java new file mode 100644 index 00000000000..33e4e7d9b63 --- /dev/null +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.plasma; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + + +/** + * The PlasmaClient is used to interface with a plasma store and manager. + * + * The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a buffer, and get a + * buffer. Buffers are referred to by object IDs. + */ +public class PlasmaClient implements ObjectStoreLink { + + private final long conn; + + protected void finalize() { + PlasmaClientJNI.disconnect(this.conn); + } + + // use plasma client to initialize the underlying jni system as well via config and config-overwrites + public PlasmaClient(String storeSocketName, String managerSocketName, int releaseDelay) { + this.conn = PlasmaClientJNI.connect(storeSocketName, managerSocketName, releaseDelay); + } + + // interface methods -------------------- + + @Override + public void put(byte[] objectId, byte[] value, byte[] metadata) { + ByteBuffer buf = null; + try { + buf = PlasmaClientJNI.create(conn, objectId, value.length, metadata); + } catch (Exception e) { + System.err.println("ObjectId " + objectId + " error at PlasmaClient put"); + e.printStackTrace(); + } + if (buf == null) { + return; + } + + buf.put(value); + PlasmaClientJNI.seal(conn, objectId); + PlasmaClientJNI.release(conn, objectId); + } + + @Override + public List get(byte[][] objectIds, int timeoutMs, boolean isMetadata) { + ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs); + assert bufs.length == objectIds.length; + + List ret = new ArrayList<>(); + for (int i = 0; i < bufs.length; i++) { + ByteBuffer buf = bufs[i][isMetadata ? 
1 : 0]; + if (buf == null) { + ret.add(null); + } else { + byte[] bb = new byte[buf.remaining()]; + buf.get(bb); + ret.add(bb); + } + } + return ret; + } + + @Override + public List wait(byte[][] objectIds, int timeoutMs, int numReturns) { + byte[][] readys = PlasmaClientJNI.wait(conn, objectIds, timeoutMs, numReturns); + + List ret = new ArrayList<>(); + for (byte[] ready : readys) { + for (byte[] id : objectIds) { + if (Arrays.equals(ready, id)) { + ret.add(id); + break; + } + } + } + + assert (ret.size() == readys.length); + return ret; + } + + @Override + public byte[] hash(byte[] objectId) { + return PlasmaClientJNI.hash(conn, objectId); + } + + @Override + public void fetch(byte[][] objectIds) { + PlasmaClientJNI.fetch(conn, objectIds); + } + + @Override + public long evict(long numBytes) { + return PlasmaClientJNI.evict(conn, numBytes); + } + + // wrapper methods -------------------- + + /** + * Seal the buffer in the PlasmaStore for a particular object ID. + * Once a buffer has been sealed, the buffer is immutable and can only be accessed through get. + * + * @param objectId used to identify an object. + */ + public void seal(byte[] objectId) { + PlasmaClientJNI.seal(conn, objectId); + } + + /** + * Notify Plasma that the object is no longer needed. + * + * @param objectId used to identify an object. + */ + public void release(byte[] objectId) { + PlasmaClientJNI.release(conn, objectId); + } + + /** + * Check if the object is present and has been sealed in the PlasmaStore. + * + * @param objectId used to identify an object. 
+ */ + @Override + public boolean contains(byte[] objectId) { + return PlasmaClientJNI.contains(conn, objectId); + } +} diff --git a/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java new file mode 100644 index 00000000000..f0cf385a576 --- /dev/null +++ b/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.nio.ByteBuffer; + +/** + * JNI static methods for PlasmaClient + */ +public class PlasmaClientJNI { + + native public static long connect(String store_socket_name, String manager_socket_name, int release_delay); + + native public static void disconnect(long conn); + + native public static ByteBuffer create(long conn, byte[] object_id, int size, byte[] metadata); + + native public static byte[] hash(long conn, byte[] object_id); + + native public static void seal(long conn, byte[] object_id); + + native public static void release(long conn, byte[] object_id); + + native public static ByteBuffer[][] get(long conn, byte[][] object_ids, int timeout_ms); + + native public static boolean contains(long conn, byte[] object_id); + + native public static void fetch(long conn, byte[][] object_ids); + + native public static byte[][] wait(long conn, byte[][] object_ids, int timeout_ms, + int num_returns); + + native public static long evict(long conn, long num_bytes); + +} diff --git a/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java new file mode 100644 index 00000000000..6b67fc8c930 --- /dev/null +++ b/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class PlasmaClientTest { + + private String storeSuffix = "/tmp/store"; + + private Process storeProcess; + + private int storePort; + + private ObjectStoreLink pLink; + + + public PlasmaClientTest() throws Exception{ + try { + String plasmaStorePath = System.getenv("PLASMA_STORE"); + if(plasmaStorePath == null) { + throw new Exception("Please set plasma store path in env PLASMA_STORE"); + } + + this.startObjectStore(plasmaStorePath); + System.loadLibrary("plasma_java"); + pLink = new PlasmaClient(this.getStoreAddress(), "", 0); + } + catch (Throwable t) { + cleanup(); + throw t; + } + + } + + private Process startProcess(String[] cmd) { + ProcessBuilder builder; + List newCmd = Arrays.stream(cmd).filter(s -> s.length() > 0).collect(Collectors.toList()); + builder = new ProcessBuilder(newCmd); + Process p = null; + try { + p = builder.start(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + System.out.println("Start process " + p.hashCode() + " OK, cmd = " + Arrays.toString(cmd).replace(',', ' ')); + return p; + } + + private void startObjectStore(String plasmaStorePath) { + int occupiedMemoryMB = 10; + long memoryBytes = occupiedMemoryMB * 1000000; + int numRetries = 10; + Process p = null; + while (numRetries-- > 0) { + int currentPort = java.util.concurrent.ThreadLocalRandom.current().nextInt(0, 100000); + String name = storeSuffix + currentPort; + String cmd = plasmaStorePath + " -s " + name + " -m " + 
memoryBytes; + + p = startProcess(cmd.split(" ")); + + if (p != null && p.isAlive()) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (p.isAlive()) { + storePort = currentPort; + break; + } + } + } + + + if (p == null || !p.isAlive()) { + throw new RuntimeException("Start object store failed ..."); + } else { + storeProcess = p; + System.out.println("Start object store success"); + } + } + + private void cleanup() { + if (storeProcess != null && killProcess(storeProcess)) { + System.out.println("Kill plasma store process forcely"); + } + } + + private static boolean killProcess(Process p) { + if (p.isAlive()) { + p.destroyForcibly(); + return true; + } else { + return false; + } + } + + public void doTest() { + System.out.println("Start test."); + int timeoutMs = 3000; + byte[] id1 = new byte[20]; + Arrays.fill(id1, (byte)1); + byte[] value1 = new byte[20]; + Arrays.fill(value1, (byte)11); + pLink.put(id1, value1, null); + + byte[] id2 = new byte[20]; + Arrays.fill(id2, (byte)2); + byte[] value2 = new byte[20]; + Arrays.fill(value2, (byte)12); + pLink.put(id2, value2, null); + System.out.println("Plasma java client put test success."); + byte[] getValue1 = pLink.get(id1, timeoutMs, false); + assert Arrays.equals(value1, getValue1); + + byte[] getValue2 = pLink.get(id2, timeoutMs, false); + assert Arrays.equals(value2, getValue2); + System.out.println("Plasma java client get single object test success."); + byte[][] ids = {id1, id2}; + List values = pLink.get(ids, timeoutMs, false); + assert Arrays.equals(values.get(0), value1); + assert Arrays.equals(values.get(1), value2); + System.out.println("Plasma java client get multi-object test success."); + pLink.put(id1, value1, null); + System.out.println("Plasma java client put same object twice exception test success."); + byte[] id1Hash = pLink.hash(id1); + assert id1Hash != null; + System.out.println("Plasma java client hash test success."); + boolean 
exsit = pLink.contains(id2); + assert exsit; + byte[] id3 = new byte[20]; + Arrays.fill(id3, (byte)3); + boolean notExsit = pLink.contains(id3); + assert !notExsit; + System.out.println("Plasma java client contains test success."); + cleanup(); + System.out.println("All test success."); + + } + + public String getStoreAddress() { + return storeSuffix+storePort; + } + public static void main(String[] args) throws Exception { + + PlasmaClientTest plasmaClientTest = new PlasmaClientTest(); + plasmaClientTest.doTest(); + + } + +} diff --git a/java/plasma/test.sh b/java/plasma/test.sh new file mode 100755 index 00000000000..ef0f28be101 --- /dev/null +++ b/java/plasma/test.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) +unamestr="$(uname)" +if [[ "$unamestr" == "Linux" ]]; then + PARALLEL=$(nproc) +elif [[ "$unamestr" == "Darwin" ]]; then + PARALLEL=$(sysctl -n hw.ncpu) +else + echo "Unrecognized platform." + exit 1 +fi +pushd ../../cpp + if [ ! 
-d "release" ]; then + mkdir release + fi + pushd release + cmake -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_C_FLAGS="-g -O3" \ + -DCMAKE_CXX_FLAGS="-g -O3" \ + -DARROW_BUILD_TESTS=off \ + -DARROW_HDFS=on \ + -DARROW_BOOST_USE_SHARED=on \ + -DARROW_PYTHON=on \ + -DARROW_PLASMA=on \ + -DPLASMA_PYTHON=on \ + -DARROW_JEMALLOC=off \ + -DARROW_WITH_BROTLI=off \ + -DARROW_WITH_LZ4=off \ + -DARROW_WITH_ZLIB=off \ + -DARROW_WITH_ZSTD=off \ + -DARROW_PLASMA_JAVA_CLIENT=on \ + .. + make VERBOSE=1 -j$PARALLEL + popd +popd + +mvn clean install +export PLASMA_STORE=$ROOT_DIR/../../cpp/release/release/plasma_store +java -cp target/test-classes:target/classes -Djava.library.path=$ROOT_DIR/../../cpp/release/release/ org.apache.arrow.plasma.PlasmaClientTest diff --git a/java/pom.xml b/java/pom.xml index ae9fc86a09b..ce7c550ddcc 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -623,6 +623,7 @@ memory vector tools + plasma