diff --git a/.github/workflows/java_jni.yml b/.github/workflows/java_jni.yml index 46dae212a0d..79ba50ef6e8 100644 --- a/.github/workflows/java_jni.yml +++ b/.github/workflows/java_jni.yml @@ -45,7 +45,7 @@ env: jobs: docker: - name: AMD64 Debian 9 Java JNI (Gandiva, Plasma, ORC) + name: AMD64 Debian 9 Java JNI (Gandiva, Plasma, ORC, Dataset) runs-on: ubuntu-latest if: ${{ !contains(github.event.pull_request.title, 'WIP') }} strategy: diff --git a/ci/docker/linux-apt-jni.dockerfile b/ci/docker/linux-apt-jni.dockerfile index 6aaa90c65eb..1abbf05af3b 100644 --- a/ci/docker/linux-apt-jni.dockerfile +++ b/ci/docker/linux-apt-jni.dockerfile @@ -68,13 +68,14 @@ RUN wget -nv -O - https://github.com/Kitware/CMake/releases/download/v${cmake}/c ENV PATH=/opt/cmake-${cmake}-Linux-x86_64/bin:$PATH ENV ARROW_BUILD_TESTS=OFF \ + ARROW_DATASET=ON \ ARROW_FLIGHT=OFF \ ARROW_GANDIVA_JAVA=ON \ ARROW_GANDIVA=ON \ ARROW_HOME=/usr/local \ ARROW_JNI=ON \ ARROW_ORC=ON \ - ARROW_PARQUET=OFF \ + ARROW_PARQUET=ON \ ARROW_PLASMA_JAVA_CLIENT=ON \ ARROW_PLASMA=ON \ ARROW_USE_CCACHE=ON \ diff --git a/ci/scripts/java_test.sh b/ci/scripts/java_test.sh index 8daed8a0ae2..da9e45280ec 100755 --- a/ci/scripts/java_test.sh +++ b/ci/scripts/java_test.sh @@ -34,8 +34,8 @@ pushd ${source_dir} ${mvn} test -if [ "${ARROW_GANDIVA_JAVA}" = "ON" ]; then - ${mvn} test -Parrow-jni -pl adapter/orc,gandiva -Darrow.cpp.build.dir=${cpp_build_dir} +if [ "${ARROW_JNI}" = "ON" ]; then + ${mvn} test -Parrow-jni -pl adapter/orc,gandiva,dataset -Darrow.cpp.build.dir=${cpp_build_dir} fi if [ "${ARROW_PLASMA}" = "ON" ]; then diff --git a/cpp/.gitignore b/cpp/.gitignore index e0a9429a9b6..a52014264f1 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -25,6 +25,9 @@ build/ *-build/ Testing/ +# Build directories created by Clion +cmake-build-*/ + ######################################### # Editor temporary/working/backup files # .#* diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 
079cc6d4065..29480c7323d 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -208,6 +208,17 @@ Result> FileSystemDatasetFactory::Make( std::move(options)); } +Result> FileSystemDatasetFactory::Make( + std::string uri, std::shared_ptr format, + FileSystemFactoryOptions options) { + std::string internal_path; + ARROW_ASSIGN_OR_RAISE(std::shared_ptr filesystem, + arrow::fs::FileSystemFromUri(uri, &internal_path)) + ARROW_ASSIGN_OR_RAISE(fs::FileInfo file_info, filesystem->GetFileInfo(internal_path)) + return std::shared_ptr(new FileSystemDatasetFactory( + {file_info}, std::move(filesystem), std::move(format), std::move(options))); +} + Result>> FileSystemDatasetFactory::InspectSchemas( InspectOptions options) { std::vector> schemas; diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index b7786cd305f..ca3274cc086 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -216,6 +216,16 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { std::shared_ptr filesystem, fs::FileSelector selector, std::shared_ptr format, FileSystemFactoryOptions options); + /// \brief Build a FileSystemDatasetFactory from a URI including filesystem + /// information. + /// + /// \param[in] uri passed to FileSystemDataset + /// \param[in] format passed to FileSystemDataset + /// \param[in] options see FileSystemFactoryOptions for more information. 
+ static Result> Make(std::string uri, + std::shared_ptr format, + FileSystemFactoryOptions options); + Result>> InspectSchemas( InspectOptions options) override; diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt index 3872d671934..3a5cc7fca80 100644 --- a/cpp/src/jni/CMakeLists.txt +++ b/cpp/src/jni/CMakeLists.txt @@ -18,7 +18,10 @@ # # arrow_jni # - if(ARROW_ORC) add_subdirectory(orc) endif() + +if(ARROW_DATASET) + add_subdirectory(dataset) +endif() diff --git a/cpp/src/jni/dataset/CMakeLists.txt b/cpp/src/jni/dataset/CMakeLists.txt new file mode 100644 index 00000000000..f3e309b614a --- /dev/null +++ b/cpp/src/jni/dataset/CMakeLists.txt @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# +# arrow_dataset_jni +# + +project(arrow_dataset_jni) + +cmake_minimum_required(VERSION 3.11) + +find_package(JNI REQUIRED) + +add_custom_target(arrow_dataset_jni) + +set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") + +add_subdirectory(../../../../java/dataset ./java) + +set(ARROW_BUILD_STATIC OFF) + +set(ARROW_DATASET_JNI_LIBS arrow_dataset_static) + +set(ARROW_DATASET_JNI_SOURCES jni_wrapper.cc jni_util.cc) + +add_arrow_lib(arrow_dataset_jni + BUILD_SHARED + SOURCES + ${ARROW_DATASET_JNI_SOURCES} + OUTPUTS + ARROW_DATASET_JNI_LIBRARIES + SHARED_PRIVATE_LINK_LIBS + ${ARROW_DATASET_JNI_LIBS} + STATIC_LINK_LIBS + ${ARROW_DATASET_JNI_LIBS} + EXTRA_INCLUDES + ${JNI_HEADERS_DIR} + PRIVATE_INCLUDES + ${JNI_INCLUDE_DIRS} + DEPENDENCIES + arrow_static + arrow_dataset_java) + +add_dependencies(arrow_dataset_jni ${ARROW_DATASET_JNI_LIBRARIES}) + +add_arrow_test(dataset_jni_test + SOURCES + jni_util_test.cc + jni_util.cc + EXTRA_INCLUDES + ${JNI_INCLUDE_DIRS}) diff --git a/cpp/src/jni/dataset/jni_util.cc b/cpp/src/jni/dataset/jni_util.cc new file mode 100644 index 00000000000..113669a4cf6 --- /dev/null +++ b/cpp/src/jni/dataset/jni_util.cc @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "jni/dataset/jni_util.h" + +#include "arrow/util/logging.h" + +#include + +namespace arrow { +namespace dataset { +namespace jni { + +class ReservationListenableMemoryPool::Impl { + public: + explicit Impl(arrow::MemoryPool* pool, std::shared_ptr listener, + int64_t block_size) + : pool_(pool), + listener_(listener), + block_size_(block_size), + blocks_reserved_(0), + bytes_reserved_(0) {} + + arrow::Status Allocate(int64_t size, uint8_t** out) { + RETURN_NOT_OK(UpdateReservation(size)); + arrow::Status error = pool_->Allocate(size, out); + if (!error.ok()) { + RETURN_NOT_OK(UpdateReservation(-size)); + return error; + } + return arrow::Status::OK(); + } + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { + bool reserved = false; + int64_t diff = new_size - old_size; + if (new_size >= old_size) { + // new_size >= old_size, pre-reserve bytes from listener before allocating + // from underlying pool + RETURN_NOT_OK(UpdateReservation(diff)); + reserved = true; + } + arrow::Status error = pool_->Reallocate(old_size, new_size, ptr); + if (!error.ok()) { + if (reserved) { + // roll back reservations on error + RETURN_NOT_OK(UpdateReservation(-diff)); + } + return error; + } + if (!reserved) { + // otherwise (e.g. 
new_size < old_size), make updates after calling underlying pool + RETURN_NOT_OK(UpdateReservation(diff)); + } + return arrow::Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size) { + pool_->Free(buffer, size); + // FIXME: See ARROW-11143, currently method ::Free doesn't allow Status return + arrow::Status s = UpdateReservation(-size); + if (!s.ok()) { + ARROW_LOG(FATAL) << "Failed to update reservation while freeing bytes: " + << s.message(); + return; + } + } + + arrow::Status UpdateReservation(int64_t diff) { + int64_t granted = Reserve(diff); + if (granted == 0) { + return arrow::Status::OK(); + } + if (granted < 0) { + RETURN_NOT_OK(listener_->OnRelease(-granted)); + return arrow::Status::OK(); + } + RETURN_NOT_OK(listener_->OnReservation(granted)); + return arrow::Status::OK(); + } + + int64_t Reserve(int64_t diff) { + std::lock_guard lock(mutex_); + bytes_reserved_ += diff; + int64_t new_block_count; + if (bytes_reserved_ == 0) { + new_block_count = 0; + } else { + // ceil to get the required block number + new_block_count = (bytes_reserved_ - 1) / block_size_ + 1; + } + int64_t bytes_granted = (new_block_count - blocks_reserved_) * block_size_; + blocks_reserved_ = new_block_count; + return bytes_granted; + } + + int64_t bytes_allocated() { return pool_->bytes_allocated(); } + + int64_t max_memory() { return pool_->max_memory(); } + + std::string backend_name() { return pool_->backend_name(); } + + std::shared_ptr get_listener() { return listener_; } + + private: + arrow::MemoryPool* pool_; + std::shared_ptr listener_; + int64_t block_size_; + int64_t blocks_reserved_; + int64_t bytes_reserved_; + std::mutex mutex_; +}; + +ReservationListenableMemoryPool::ReservationListenableMemoryPool( + MemoryPool* pool, std::shared_ptr listener, int64_t block_size) { + impl_.reset(new Impl(pool, listener, block_size)); +} + +arrow::Status ReservationListenableMemoryPool::Allocate(int64_t size, uint8_t** out) { + return impl_->Allocate(size, out); +} + 
+arrow::Status ReservationListenableMemoryPool::Reallocate(int64_t old_size, + int64_t new_size, + uint8_t** ptr) { + return impl_->Reallocate(old_size, new_size, ptr); +} + +void ReservationListenableMemoryPool::Free(uint8_t* buffer, int64_t size) { + return impl_->Free(buffer, size); +} + +int64_t ReservationListenableMemoryPool::bytes_allocated() const { + return impl_->bytes_allocated(); +} + +int64_t ReservationListenableMemoryPool::max_memory() const { + return impl_->max_memory(); +} + +std::string ReservationListenableMemoryPool::backend_name() const { + return impl_->backend_name(); +} + +std::shared_ptr ReservationListenableMemoryPool::get_listener() { + return impl_->get_listener(); +} + +ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {} + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { + jclass local_class = env->FindClass(class_name); + jclass global_class = (jclass)env->NewGlobalRef(local_class); + env->DeleteLocalRef(local_class); + return global_class; +} + +arrow::Result GetMethodID(JNIEnv* env, jclass this_class, const char* name, + const char* sig) { + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + return arrow::Status::Invalid(error_message); + } + return ret; +} + +arrow::Result GetStaticMethodID(JNIEnv* env, jclass this_class, + const char* name, const char* sig) { + jmethodID ret = env->GetStaticMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find static method " + std::string(name) + + " within signature" + std::string(sig); + return arrow::Status::Invalid(error_message); + } + return ret; +} + +std::string JStringToCString(JNIEnv* env, jstring string) { + if (string == nullptr) { + return std::string(); + } + const char* chars = env->GetStringUTFChars(string, nullptr); + std::string 
ret(chars); + env->ReleaseStringUTFChars(string, chars); + return ret; +} + +std::vector ToStringVector(JNIEnv* env, jobjectArray& str_array) { + int length = env->GetArrayLength(str_array); + std::vector vector; + for (int i = 0; i < length; i++) { + auto string = reinterpret_cast(env->GetObjectArrayElement(str_array, i)); + vector.push_back(JStringToCString(env, string)); + } + return vector; +} + +arrow::Result ToSchemaByteArray(JNIEnv* env, + std::shared_ptr schema) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr buffer, + arrow::ipc::SerializeSchema(*schema, arrow::default_memory_pool())) + + jbyteArray out = env->NewByteArray(buffer->size()); + auto src = reinterpret_cast(buffer->data()); + env->SetByteArrayRegion(out, 0, buffer->size(), src); + return out; +} + +arrow::Result> FromSchemaByteArray( + JNIEnv* env, jbyteArray schemaBytes) { + arrow::ipc::DictionaryMemo in_memo; + int schemaBytes_len = env->GetArrayLength(schemaBytes); + jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, nullptr); + auto serialized_schema = std::make_shared( + reinterpret_cast(schemaBytes_data), schemaBytes_len); + arrow::io::BufferReader buf_reader(serialized_schema); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr schema, + arrow::ipc::ReadSchema(&buf_reader, &in_memo)) + env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT); + return schema; +} + +} // namespace jni +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/jni/dataset/jni_util.h b/cpp/src/jni/dataset/jni_util.h new file mode 100644 index 00000000000..c76033ae633 --- /dev/null +++ b/cpp/src/jni/dataset/jni_util.h @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/array.h" +#include "arrow/io/api.h" +#include "arrow/ipc/api.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/type.h" + +#include + +namespace arrow { +namespace dataset { +namespace jni { + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name); + +arrow::Result GetMethodID(JNIEnv* env, jclass this_class, const char* name, + const char* sig); + +arrow::Result GetStaticMethodID(JNIEnv* env, jclass this_class, + const char* name, const char* sig); + +std::string JStringToCString(JNIEnv* env, jstring string); + +std::vector ToStringVector(JNIEnv* env, jobjectArray& str_array); + +arrow::Result ToSchemaByteArray(JNIEnv* env, + std::shared_ptr schema); + +arrow::Result> FromSchemaByteArray(JNIEnv* env, + jbyteArray schemaBytes); + +/// \brief Create a new shared_ptr on heap from shared_ptr t to prevent +/// the managed object from being garbage-collected. +/// +/// \return address of the newly created shared pointer +template +jlong CreateNativeRef(std::shared_ptr t) { + std::shared_ptr* retained_ptr = new std::shared_ptr(t); + return reinterpret_cast(retained_ptr); +} + +/// \brief Get the shared_ptr that was derived via function CreateNativeRef. 
+/// +/// \param[in] ref address of the shared_ptr +/// \return the shared_ptr object +template +std::shared_ptr RetrieveNativeInstance(jlong ref) { + std::shared_ptr* retrieved_ptr = reinterpret_cast*>(ref); + return *retrieved_ptr; +} + +/// \brief Destroy a shared_ptr using its memory address. +/// +/// \param[in] ref address of the shared_ptr +template +void ReleaseNativeRef(jlong ref) { + std::shared_ptr* retrieved_ptr = reinterpret_cast*>(ref); + delete retrieved_ptr; +} + +/// Listener to act on reservations/unreservations from ReservationListenableMemoryPool. +/// +/// Note the memory pool will call this listener only on block-level memory +/// reservation/unreservation is granted. So the invocation parameter "size" is always +/// multiple of block size (by default, 512k) specified in memory pool. +class ReservationListener { + public: + virtual ~ReservationListener() = default; + + virtual arrow::Status OnReservation(int64_t size) = 0; + virtual arrow::Status OnRelease(int64_t size) = 0; + + protected: + ReservationListener() = default; +}; + +/// A memory pool implementation for pre-reserving memory blocks from a +/// customizable listener. This will typically be used when memory allocations +/// have to be subject to another "virtual" resource manager, which just tracks or +/// limits number of bytes of application's overall memory usage. The underlying +/// memory pool will still be responsible for actual malloc/free operations. +class ReservationListenableMemoryPool : public arrow::MemoryPool { + public: + /// \brief Constructor. + /// + /// \param[in] pool the underlying memory pool + /// \param[in] listener a listener for block-level reservations/releases. 
+ /// \param[in] block_size size of each block to reserve from the listener + explicit ReservationListenableMemoryPool(MemoryPool* pool, + std::shared_ptr listener, + int64_t block_size = 512 * 1024); + + ~ReservationListenableMemoryPool(); + + arrow::Status Allocate(int64_t size, uint8_t** out) override; + + arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; + + void Free(uint8_t* buffer, int64_t size) override; + + int64_t bytes_allocated() const override; + + int64_t max_memory() const override; + + std::string backend_name() const override; + + std::shared_ptr get_listener(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace jni +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/jni/dataset/jni_util_test.cc b/cpp/src/jni/dataset/jni_util_test.cc new file mode 100644 index 00000000000..589f00b1cc7 --- /dev/null +++ b/cpp/src/jni/dataset/jni_util_test.cc @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "arrow/memory_pool.h" +#include "arrow/testing/gtest_util.h" +#include "jni/dataset/jni_util.h" + +namespace arrow { +namespace dataset { +namespace jni { + +class MyListener : public ReservationListener { + public: + Status OnReservation(int64_t size) override { + bytes_reserved_ += size; + reservation_count_++; + return arrow::Status::OK(); + } + + Status OnRelease(int64_t size) override { + bytes_reserved_ -= size; + release_count_++; + return arrow::Status::OK(); + } + + int64_t bytes_reserved() { return bytes_reserved_; } + + int32_t reservation_count() const { return reservation_count_; } + + int32_t release_count() const { return release_count_; } + + private: + int64_t bytes_reserved_; + int32_t reservation_count_; + int32_t release_count_; +}; + +TEST(ReservationListenableMemoryPool, Basic) { + auto pool = MemoryPool::CreateDefault(); + auto listener = std::make_shared(); + ReservationListenableMemoryPool rlp(pool.get(), listener); + + uint8_t* data; + ASSERT_OK(rlp.Allocate(100, &data)); + + uint8_t* data2; + ASSERT_OK(rlp.Allocate(100, &data2)); + + rlp.Free(data, 100); + rlp.Free(data2, 100); + + ASSERT_EQ(200, rlp.max_memory()); + ASSERT_EQ(200, pool->max_memory()); +} + +TEST(ReservationListenableMemoryPool, Listener) { + auto pool = MemoryPool::CreateDefault(); + auto listener = std::make_shared(); + ReservationListenableMemoryPool rlp(pool.get(), listener); + + uint8_t* data; + ASSERT_OK(rlp.Allocate(100, &data)); + + uint8_t* data2; + ASSERT_OK(rlp.Allocate(100, &data2)); + + ASSERT_EQ(200, rlp.bytes_allocated()); + ASSERT_EQ(512 * 1024, listener->bytes_reserved()); + + rlp.Free(data, 100); + rlp.Free(data2, 100); + + ASSERT_EQ(0, rlp.bytes_allocated()); + ASSERT_EQ(0, listener->bytes_reserved()); + ASSERT_EQ(1, listener->reservation_count()); + ASSERT_EQ(1, listener->release_count()); +} + +TEST(ReservationListenableMemoryPool, BlockSize) { + auto pool = MemoryPool::CreateDefault(); + auto listener = std::make_shared(); + 
ReservationListenableMemoryPool rlp(pool.get(), listener, 100); + + uint8_t* data; + ASSERT_OK(rlp.Allocate(100, &data)); + + ASSERT_EQ(100, rlp.bytes_allocated()); + ASSERT_EQ(100, listener->bytes_reserved()); + + rlp.Free(data, 100); + + ASSERT_EQ(0, rlp.bytes_allocated()); + ASSERT_EQ(0, listener->bytes_reserved()); +} + +TEST(ReservationListenableMemoryPool, BlockSize2) { + auto pool = MemoryPool::CreateDefault(); + auto listener = std::make_shared(); + ReservationListenableMemoryPool rlp(pool.get(), listener, 99); + + uint8_t* data; + ASSERT_OK(rlp.Allocate(100, &data)); + + ASSERT_EQ(100, rlp.bytes_allocated()); + ASSERT_EQ(198, listener->bytes_reserved()); + + rlp.Free(data, 100); + + ASSERT_EQ(0, rlp.bytes_allocated()); + ASSERT_EQ(0, listener->bytes_reserved()); + + ASSERT_EQ(1, listener->reservation_count()); + ASSERT_EQ(1, listener->release_count()); +} + +} // namespace jni +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc new file mode 100644 index 00000000000..a1e0f503340 --- /dev/null +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -0,0 +1,571 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "arrow/array.h" +#include "arrow/dataset/api.h" +#include "arrow/dataset/file_base.h" +#include "arrow/filesystem/localfs.h" +#include "arrow/ipc/api.h" +#include "arrow/util/iterator.h" + +#include "jni/dataset/jni_util.h" + +#include "org_apache_arrow_dataset_file_JniWrapper.h" +#include "org_apache_arrow_dataset_jni_JniWrapper.h" +#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h" + +namespace { + +jclass illegal_access_exception_class; +jclass illegal_argument_exception_class; +jclass runtime_exception_class; + +jclass record_batch_handle_class; +jclass record_batch_handle_field_class; +jclass record_batch_handle_buffer_class; +jclass java_reservation_listener_class; + +jmethodID record_batch_handle_constructor; +jmethodID record_batch_handle_field_constructor; +jmethodID record_batch_handle_buffer_constructor; +jmethodID reserve_memory_method; +jmethodID unreserve_memory_method; + +jlong default_memory_pool_id = -1L; + +jint JNI_VERSION = JNI_VERSION_1_6; + +class JniPendingException : public std::runtime_error { + public: + explicit JniPendingException(const std::string& arg) : runtime_error(arg) {} +}; + +void ThrowPendingException(const std::string& message) { + throw JniPendingException(message); +} + +template +T JniGetOrThrow(arrow::Result result) { + if (!result.status().ok()) { + ThrowPendingException(result.status().message()); + } + return std::move(result).ValueOrDie(); +} + +void JniAssertOkOrThrow(arrow::Status status) { + if (!status.ok()) { + ThrowPendingException(status.message()); + } +} + +void JniThrow(std::string message) { ThrowPendingException(message); } + +arrow::Result> GetFileFormat( + jint file_format_id) { + switch (file_format_id) { + case 0: + return std::make_shared(); + default: + std::string error_message = + "illegal file format id: " + std::to_string(file_format_id); + return arrow::Status::Invalid(error_message); + } +} + +class ReserveFromJava : public arrow::dataset::jni::ReservationListener 
{ + public: + ReserveFromJava(JavaVM* vm, jobject java_reservation_listener) + : vm_(vm), java_reservation_listener_(java_reservation_listener) {} + + arrow::Status OnReservation(int64_t size) override { + JNIEnv* env; + if (vm_->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return arrow::Status::Invalid("JNIEnv was not attached to current thread"); + } + env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return arrow::Status::Invalid("Error calling Java side reservation listener"); + } + return arrow::Status::OK(); + } + + arrow::Status OnRelease(int64_t size) override { + JNIEnv* env; + if (vm_->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return arrow::Status::Invalid("JNIEnv was not attached to current thread"); + } + env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size); + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return arrow::Status::Invalid("Error calling Java side reservation listener"); + } + return arrow::Status::OK(); + } + + jobject GetJavaReservationListener() { return java_reservation_listener_; } + + private: + JavaVM* vm_; + jobject java_reservation_listener_; +}; + +/// \class DisposableScannerAdaptor +/// \brief An adaptor that iterates over a Scanner instance then returns RecordBatches +/// directly. +/// +/// This lessens the complexity of the JNI bridge to make it easier to +/// maintain. On Java-side, NativeScanner can only produce a single NativeScanTask +/// instance during its whole lifecycle. Each task stands for a DisposableScannerAdaptor +/// instance through JNI bridge. 
+/// +class DisposableScannerAdaptor { + public: + DisposableScannerAdaptor(std::shared_ptr scanner, + arrow::dataset::ScanTaskIterator task_itr) { + this->scanner_ = std::move(scanner); + this->task_itr_ = std::move(task_itr); + } + + static arrow::Result> Create( + std::shared_ptr scanner) { + ARROW_ASSIGN_OR_RAISE(arrow::dataset::ScanTaskIterator task_itr, scanner->Scan()) + return std::make_shared(scanner, std::move(task_itr)); + } + + arrow::Result> Next() { + do { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr batch, NextBatch()) + if (batch != nullptr) { + return batch; + } + // batch is null, current task is fully consumed + ARROW_ASSIGN_OR_RAISE(bool has_next_task, NextTask()) + if (!has_next_task) { + // no more tasks + return nullptr; + } + // new task appended, read again + } while (true); + } + + const std::shared_ptr& GetScanner() const { return scanner_; } + + private: + arrow::dataset::ScanTaskIterator task_itr_; + std::shared_ptr scanner_; + std::shared_ptr current_task_ = nullptr; + arrow::RecordBatchIterator current_batch_itr_ = + arrow::MakeEmptyIterator>(); + + arrow::Result NextTask() { + ARROW_ASSIGN_OR_RAISE(current_task_, task_itr_.Next()) + if (current_task_ == nullptr) { + return false; + } + ARROW_ASSIGN_OR_RAISE(current_batch_itr_, current_task_->Execute()) + return true; + } + + arrow::Result> NextBatch() { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr batch, + current_batch_itr_.Next()) + return batch; + } +}; + +} // namespace + +using arrow::dataset::jni::CreateGlobalClassReference; +using arrow::dataset::jni::CreateNativeRef; +using arrow::dataset::jni::FromSchemaByteArray; +using arrow::dataset::jni::GetMethodID; +using arrow::dataset::jni::JStringToCString; +using arrow::dataset::jni::ReleaseNativeRef; +using arrow::dataset::jni::RetrieveNativeInstance; +using arrow::dataset::jni::ToSchemaByteArray; +using arrow::dataset::jni::ToStringVector; + +using arrow::dataset::jni::ReservationListenableMemoryPool; +using 
arrow::dataset::jni::ReservationListener; + +#define JNI_METHOD_START try { +// macro ended + +#define JNI_METHOD_END(fallback_expr) \ + } \ + catch (JniPendingException & e) { \ + env->ThrowNew(runtime_exception_class, e.what()); \ + return fallback_expr; \ + } +// macro ended + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return JNI_ERR; + } + JNI_METHOD_START + illegal_access_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + runtime_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); + + record_batch_handle_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/NativeRecordBatchHandle;"); + record_batch_handle_field_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/NativeRecordBatchHandle$Field;"); + record_batch_handle_buffer_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/NativeRecordBatchHandle$Buffer;"); + java_reservation_listener_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/ReservationListener;"); + + record_batch_handle_constructor = + JniGetOrThrow(GetMethodID(env, record_batch_handle_class, "", + "(J[Lorg/apache/arrow/dataset/" + "jni/NativeRecordBatchHandle$Field;" + "[Lorg/apache/arrow/dataset/" + "jni/NativeRecordBatchHandle$Buffer;)V")); + record_batch_handle_field_constructor = + JniGetOrThrow(GetMethodID(env, record_batch_handle_field_class, "", "(JJ)V")); + record_batch_handle_buffer_constructor = JniGetOrThrow( + GetMethodID(env, record_batch_handle_buffer_class, "", "(JJJJ)V")); + reserve_memory_method = + JniGetOrThrow(GetMethodID(env, java_reservation_listener_class, "reserve", "(J)V")); + unreserve_memory_method = JniGetOrThrow( + GetMethodID(env, 
java_reservation_listener_class, "unreserve", "(J)V")); + + default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); + + return JNI_VERSION; + JNI_METHOD_END(JNI_ERR) +} + +void JNI_OnUnload(JavaVM* vm, void* reserved) { + JNIEnv* env; + vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); + env->DeleteGlobalRef(illegal_access_exception_class); + env->DeleteGlobalRef(illegal_argument_exception_class); + env->DeleteGlobalRef(runtime_exception_class); + env->DeleteGlobalRef(record_batch_handle_class); + env->DeleteGlobalRef(record_batch_handle_field_class); + env->DeleteGlobalRef(record_batch_handle_buffer_class); + env->DeleteGlobalRef(java_reservation_listener_class); + + default_memory_pool_id = -1L; +} + +/* + * Class: org_apache_arrow_dataset_jni_NativeMemoryPool + * Method: getDefaultMemoryPool + * Signature: ()J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_jni_NativeMemoryPool_getDefaultMemoryPool(JNIEnv* env, + jclass) { + JNI_METHOD_START + return default_memory_pool_id; + JNI_METHOD_END(-1L) +} + +/* + * Class: org_apache_arrow_dataset_jni_NativeMemoryPool + * Method: createListenableMemoryPool + * Signature: (Lorg/apache/arrow/memory/ReservationListener;)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_jni_NativeMemoryPool_createListenableMemoryPool( + JNIEnv* env, jclass, jobject jlistener) { + JNI_METHOD_START + jobject jlistener_ref = env->NewGlobalRef(jlistener); + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + JniThrow("Unable to get JavaVM instance"); + } + std::shared_ptr listener = + std::make_shared(vm, jlistener_ref); + auto memory_pool = + new ReservationListenableMemoryPool(arrow::default_memory_pool(), listener); + return reinterpret_cast(memory_pool); + JNI_METHOD_END(-1L) +} + +/* + * Class: org_apache_arrow_dataset_jni_NativeMemoryPool + * Method: releaseMemoryPool + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_jni_NativeMemoryPool_releaseMemoryPool( + JNIEnv* 
env, jclass, jlong memory_pool_id) { + JNI_METHOD_START + if (memory_pool_id == default_memory_pool_id) { + return; + } + ReservationListenableMemoryPool* pool = + reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + return; + } + std::shared_ptr rm = + std::dynamic_pointer_cast(pool->get_listener()); + if (rm == nullptr) { + delete pool; + return; + } + delete pool; + env->DeleteGlobalRef(rm->GetJavaReservationListener()); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_jni_NativeMemoryPool + * Method: bytesAllocated + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_NativeMemoryPool_bytesAllocated( + JNIEnv* env, jclass, jlong memory_pool_id) { + JNI_METHOD_START + arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + JniThrow("Memory pool instance not found. It may not exist nor has been closed"); + } + return pool->bytes_allocated(); + JNI_METHOD_END(-1L) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeDatasetFactory + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDatasetFactory( + JNIEnv* env, jobject, jlong id) { + JNI_METHOD_START + ReleaseNativeRef(id); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: inspectSchema + * Signature: (J)[B + */ +JNIEXPORT jbyteArray JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_inspectSchema( + JNIEnv* env, jobject, jlong dataset_factor_id) { + JNI_METHOD_START + std::shared_ptr d = + RetrieveNativeInstance(dataset_factor_id); + std::shared_ptr schema = JniGetOrThrow(d->Inspect()); + return JniGetOrThrow(ToSchemaByteArray(env, schema)); + JNI_METHOD_END(nullptr) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: createDataset + * Signature: (J[B)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createDataset( + JNIEnv* env, jobject, jlong dataset_factory_id, 
jbyteArray schema_bytes) { + JNI_METHOD_START + std::shared_ptr d = + RetrieveNativeInstance(dataset_factory_id); + std::shared_ptr schema; + schema = JniGetOrThrow(FromSchemaByteArray(env, schema_bytes)); + std::shared_ptr dataset = JniGetOrThrow(d->Finish(schema)); + return CreateNativeRef(dataset); + JNI_METHOD_END(-1L) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeDataset + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset( + JNIEnv* env, jobject, jlong id) { + JNI_METHOD_START + ReleaseNativeRef(id); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: createScanner + * Signature: (J[Ljava/lang/String;JJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( + JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jlong batch_size, + jlong memory_pool_id) { + JNI_METHOD_START + arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + JniThrow("Memory pool does not exist or has been closed"); + } + std::shared_ptr context = + std::make_shared(); + context->pool = pool; + std::shared_ptr dataset = + RetrieveNativeInstance(dataset_id); + std::shared_ptr scanner_builder = + JniGetOrThrow(dataset->NewScan(context)); + + std::vector column_vector = ToStringVector(env, columns); + JniAssertOkOrThrow(scanner_builder->Project(column_vector)); + JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); + + auto scanner = JniGetOrThrow(scanner_builder->Finish()); + std::shared_ptr scanner_adaptor = + JniGetOrThrow(DisposableScannerAdaptor::Create(scanner)); + jlong id = CreateNativeRef(scanner_adaptor); + return id; + JNI_METHOD_END(-1L) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeScanner + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeScanner( + JNIEnv* env, jobject, jlong scanner_id) { + 
JNI_METHOD_START + ReleaseNativeRef(scanner_id); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: getSchemaFromScanner + * Signature: (J)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, jobject, + jlong scanner_id) { + JNI_METHOD_START + std::shared_ptr schema = + RetrieveNativeInstance(scanner_id) + ->GetScanner() + ->schema(); + return JniGetOrThrow(ToSchemaByteArray(env, schema)); + JNI_METHOD_END(nullptr) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: nextRecordBatch + * Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle; + */ +JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch( + JNIEnv* env, jobject, jlong scanner_id) { + JNI_METHOD_START + std::shared_ptr scanner_adaptor = + RetrieveNativeInstance(scanner_id); + + std::shared_ptr record_batch = + JniGetOrThrow(scanner_adaptor->Next()); + if (record_batch == nullptr) { + return nullptr; // stream ended + } + std::shared_ptr schema = record_batch->schema(); + jobjectArray field_array = + env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr); + + std::vector> buffers; + for (int i = 0; i < schema->num_fields(); ++i) { + auto column = record_batch->column(i); + auto dataArray = column->data(); + jobject field = env->NewObject(record_batch_handle_field_class, + record_batch_handle_field_constructor, + column->length(), column->null_count()); + env->SetObjectArrayElement(field_array, i, field); + + for (auto& buffer : dataArray->buffers) { + buffers.push_back(buffer); + } + } + + jobjectArray buffer_array = + env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr); + + for (size_t j = 0; j < buffers.size(); ++j) { + auto buffer = buffers[j]; + uint8_t* data = nullptr; + int64_t size = 0; + int64_t capacity = 0; + if (buffer != nullptr) { + data = (uint8_t*)buffer->data(); + 
size = buffer->size(); + capacity = buffer->capacity(); + } + jobject buffer_handle = env->NewObject(record_batch_handle_buffer_class, + record_batch_handle_buffer_constructor, + CreateNativeRef(buffer), data, size, capacity); + env->SetObjectArrayElement(buffer_array, j, buffer_handle); + } + + jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor, + record_batch->num_rows(), field_array, buffer_array); + return ret; + JNI_METHOD_END(nullptr) +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: releaseBuffer + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffer( + JNIEnv* env, jobject, jlong id) { + JNI_METHOD_START + ReleaseNativeRef(id); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: (Ljava/lang/String;II)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( + JNIEnv* env, jobject, jstring uri, jint file_format_id) { + JNI_METHOD_START + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + JStringToCString(env, uri), file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt new file mode 100644 index 00000000000..2743e4a6041 --- /dev/null +++ b/java/dataset/CMakeLists.txt @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_dataset_java +# + +# Headers: top level + +project(arrow_dataset_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar( + arrow_dataset_java + src/main/java/org/apache/arrow/dataset/jni/JniLoader.java + src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java + src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java + src/main/java/org/apache/arrow/dataset/file/JniWrapper.java + src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java + src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java + GENERATE_NATIVE_HEADERS arrow_dataset_java-native + DESTINATION ${JNI_HEADERS_DIR} +) diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml new file mode 100644 index 00000000000..c4246a89090 --- /dev/null +++ b/java/dataset/pom.xml @@ -0,0 +1,134 @@ + + + + + arrow-java-root + org.apache.arrow + 4.0.0-SNAPSHOT + + 4.0.0 + + arrow-dataset + Arrow Java Dataset + Java implementation of Arrow Dataset API/Framework + jar + + ../../../cpp/release-build/ + 2.5.0 + 1.11.0 + 1.8.2 + + + + + org.apache.arrow + arrow-vector + ${project.version} + compile + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-memory-core + ${project.version} + compile + + + org.apache.arrow + arrow-memory-netty + ${project.version} + test + + + org.apache.parquet + parquet-avro + ${parquet.version} + test + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + test + + + org.apache.hadoop + hadoop-common + ${dep.hadoop.version} + test + 
+ + commons-logging + commons-logging + + + javax.servlet + servlet-api + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.avro + avro + ${avro.version} + test + + + com.google.guava + guava + ${dep.guava.version} + test + + + + + + ${arrow.cpp.build.dir} + + **/libarrow_dataset_jni.* + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + ../../cpp/src/jni/dataset/proto + + + + + compile + test-compile + + + + + + + + diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java new file mode 100644 index 00000000000..e341d46beac --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +/** + * File format definitions. 
+ */ +public enum FileFormat { + PARQUET(0), + NONE(-1); + + private int id; + + FileFormat(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java new file mode 100644 index 00000000000..1268d11fe10 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeDatasetFactory; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.memory.BufferAllocator; + +/** + * Java binding of the C++ FileSystemDatasetFactory. 
+ */ +public class FileSystemDatasetFactory extends NativeDatasetFactory { + + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String uri) { + super(allocator, memoryPool, createNative(format, uri)); + } + + private static long createNative(FileFormat format, String uri) { + return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); + } + +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java new file mode 100644 index 00000000000..1af307aac38 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.JniLoader; + +/** + * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. 
+ */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a + * intermediate shared_ptr of the factory instance. + * @param uri file uri to read + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java new file mode 100644 index 00000000000..72a1cadcf69 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/DirectReservationListener.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.util.VisibleForTesting; + +/** + * Reserving Java direct memory bytes from java.nio.Bits. Used by Java Dataset API's C++ memory + * pool implementation. This makes memory allocated by the pool to be controlled by JVM option + * "-XX:MaxDirectMemorySize". + */ +public class DirectReservationListener implements ReservationListener { + private final Method methodReserve; + private final Method methodUnreserve; + + private DirectReservationListener() { + try { + final Class classBits = Class.forName("java.nio.Bits"); + methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class); + methodReserve.setAccessible(true); + methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class); + methodUnreserve.setAccessible(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final DirectReservationListener INSTANCE = new DirectReservationListener(); + + public static DirectReservationListener instance() { + return INSTANCE; + } + + /** + * Reserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#reserveMemory. + */ + @Override + public void reserve(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + methodReserve.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory. 
+ */ + @Override + public void unreserve(long size) { + try { + if (size > Integer.MAX_VALUE) { + throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)"); + } + methodUnreserve.invoke(null, (int) size, (int) size); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Get current reservation of jVM direct memory. Visible for testing. + */ + @VisibleForTesting + public long getCurrentDirectMemReservation() { + try { + final Class classBits = Class.forName("java.nio.Bits"); + final Field f = classBits.getDeclaredField("reservedMemory"); + f.setAccessible(true); + return ((AtomicLong) f.get(null)).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java new file mode 100644 index 00000000000..15ce5448b86 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * The JniLoader for Dataset API's native implementation. + */ +public final class JniLoader { + + private static final JniLoader INSTANCE = new JniLoader(Collections.singletonList("arrow_dataset_jni")); + + public static JniLoader get() { + return INSTANCE; + } + + private final Set librariesToLoad; + + private JniLoader(List libraryNames) { + librariesToLoad = new HashSet<>(libraryNames); + } + + private boolean finished() { + return librariesToLoad.isEmpty(); + } + + /** + * If required JNI libraries are not loaded, then load them. + */ + public void ensureLoaded() { + if (finished()) { + return; + } + loadRemaining(); + } + + private synchronized void loadRemaining() { + // The method is protected by a mutex via synchronized, if more than one thread race to call + // loadRemaining, at same time only one will do the actual loading and the others will wait for + // the mutex to be acquired then check on the remaining list: if there are libraries that were not + // successfully loaded then the mutex owner will try to load them again. 
+ if (finished()) { + return; + } + List libs = new ArrayList<>(librariesToLoad); + for (String lib : libs) { + load(lib); + librariesToLoad.remove(lib); + } + } + + private void load(String name) { + final String libraryToLoad = System.mapLibraryName(name); + try { + File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir"))); + try (final InputStream is + = JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + System.load(temp.getAbsolutePath()); + } + } catch (IOException e) { + throw new IllegalStateException("error loading native libraries: " + e); + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java new file mode 100644 index 00000000000..8460841eeee --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +/** + * JNI wrapper for Dataset API's native implementation. + */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + /** + * Release the DatasetFactory by destroying its reference held by JNI wrapper. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + */ + public native void closeDatasetFactory(long datasetFactoryId); + + /** + * Get a serialized schema from native instance of a DatasetFactory. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + * @return the serialized schema + * @see org.apache.arrow.vector.types.pojo.Schema + */ + public native byte[] inspectSchema(long datasetFactoryId); + + /** + * Create Dataset from a DatasetFactory and get the native pointer of the Dataset. + * + * @param datasetFactoryId the native pointer of the arrow::dataset::DatasetFactory instance. + * @param schema the predefined schema of the resulting Dataset. + * @return the native pointer of the arrow::dataset::Dataset instance. + */ + public native long createDataset(long datasetFactoryId, byte[] schema); + + /** + * Release the Dataset by destroying its reference held by JNI wrapper. + * + * @param datasetId the native pointer of the arrow::dataset::Dataset instance. + */ + public native void closeDataset(long datasetId); + + /** + * Create Scanner from a Dataset and get the native pointer of the Dataset. + * @param datasetId the native pointer of the arrow::dataset::Dataset instance. + * @param columns desired column names. Columns not in this list will not be emitted when performing scan operation. + * @param batchSize batch size of scanned record batches. + * @param memoryPool identifier of memory pool used in the native scanner. 
+ * @return the native pointer of the arrow::dataset::Scanner instance. + */ + public native long createScanner(long datasetId, String[] columns, long batchSize, long memoryPool); + + /** + * Get a serialized schema from native instance of a Scanner. + * + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + * @return the serialized schema + * @see org.apache.arrow.vector.types.pojo.Schema + */ + public native byte[] getSchemaFromScanner(long scannerId); + + /** + * Release the Scanner by destroying its reference held by JNI wrapper. + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + */ + public native void closeScanner(long scannerId); + + /** + * Read next record batch from the specified scanner. + * @param scannerId the native pointer of the arrow::dataset::Scanner instance. + * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch. + */ + public native NativeRecordBatchHandle nextRecordBatch(long scannerId); + + /** + * Release the Buffer by destroying its reference held by JNI wrapper. + * @param bufferId the native pointer of the arrow::Buffer instance. + */ + public native void releaseBuffer(long bufferId); + +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java new file mode 100644 index 00000000000..7f6dfbc028f --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.memory.BufferAllocator; + +/** + * Context for relevant classes of NativeDataset. + */ +public class NativeContext { + private final BufferAllocator allocator; + private final NativeMemoryPool memoryPool; + + /** + * Constructor. + * + * @param allocator The allocator in use. + * @param memoryPool Native memory pool. + */ + public NativeContext(BufferAllocator allocator, NativeMemoryPool memoryPool) { + this.allocator = allocator; + this.memoryPool = memoryPool; + } + + /** + * Returns the allocator which is in use. + */ + public BufferAllocator getAllocator() { + return allocator; + } + + /** + * Returns the native memory pool. + */ + public NativeMemoryPool getMemoryPool() { + return memoryPool; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java new file mode 100644 index 00000000000..537886f3221 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.source.Dataset; + +/** + * Native implementation of {@link Dataset}. + */ +public class NativeDataset implements Dataset { + + private final NativeContext context; + private final long datasetId; + + private boolean closed = false; + + public NativeDataset(NativeContext context, long datasetId) { + this.context = context; + this.datasetId = datasetId; + } + + @Override + public synchronized NativeScanner newScan(ScanOptions options) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns(), options.getBatchSize(), + context.getMemoryPool().getNativeInstanceId()); + return new NativeScanner(context, scannerId); + } + + @Override + public synchronized void close() { + if (closed) { + return; + } + closed = true; + JniWrapper.get().closeDataset(datasetId); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java new file mode 100644 index 00000000000..993d44fa26f --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; + +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.SchemaUtility; + +/** + * Native implementation of {@link DatasetFactory}. + */ +public class NativeDatasetFactory implements DatasetFactory { + private final long datasetFactoryId; + private final NativeMemoryPool memoryPool; + private final BufferAllocator allocator; + + private boolean closed = false; + + /** + * Constructor. + * + * @param allocator a context allocator associated with this factory. Any buffer that will be created natively will + * be then bound to this allocator. + * @param memoryPool the native memory pool associated with this factory. Any buffer created natively should request + * for memory spaces from this memory pool. This is a mapped instance of c++ arrow::MemoryPool. + * @param datasetFactoryId an ID, at the same time the native pointer of the underlying native instance of this + * factory. Make sure in c++ side the pointer is pointing to the shared pointer wrapping + * the actual instance so we could successfully decrease the reference count once + * {@link #close} is called. 
+ * @see #close() + */ + public NativeDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, long datasetFactoryId) { + this.allocator = allocator; + this.memoryPool = memoryPool; + this.datasetFactoryId = datasetFactoryId; + } + + @Override + public Schema inspect() { + final byte[] buffer; + synchronized (this) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + buffer = JniWrapper.get().inspectSchema(datasetFactoryId); + } + try { + return SchemaUtility.deserialize(buffer, allocator); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public NativeDataset finish() { + return finish(inspect()); + } + + @Override + public NativeDataset finish(Schema schema) { + try { + byte[] serialized = SchemaUtility.serialize(schema); + synchronized (this) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + return new NativeDataset(new NativeContext(allocator, memoryPool), + JniWrapper.get().createDataset(datasetFactoryId, serialized)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Close this factory by release the pointer of the native instance. + */ + @Override + public synchronized void close() { + if (closed) { + return; + } + closed = true; + JniWrapper.get().closeDatasetFactory(datasetFactoryId); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java new file mode 100644 index 00000000000..3231ca23abe --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeInstanceReleasedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Raised when an operation is attempted against a native instance whose underlying
 * resources were already released via {@code close()}.
 */
public class NativeInstanceReleasedException extends RuntimeException {

  private static final String DEFAULT_MESSAGE = "Native instance has been released";

  /** Constructs the exception with the default message. */
  public NativeInstanceReleasedException() {
    this(DEFAULT_MESSAGE);
  }

  /** Constructs the exception with a caller-provided message. */
  public NativeInstanceReleasedException(String message) {
    super(message);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +/** + * C++ memory pool(arrow::MemoryPool)'s Java mapped instance. + */ +public class NativeMemoryPool implements AutoCloseable { + private final long nativeInstanceId; + + static { + JniLoader.get().ensureLoaded(); + } + + private NativeMemoryPool(long nativeInstanceId) { + this.nativeInstanceId = nativeInstanceId; + } + + /** + * Get the default memory pool. This will return arrow::default_memory_pool() directly. + */ + public static NativeMemoryPool getDefault() { + return new NativeMemoryPool(getDefaultMemoryPool()); + } + + /** + * Create a listenable memory pool (see also: arrow::ReservationListenableMemoryPool) with + * a specific listener. All buffers created from the memory pool should take enough reservation + * from the listener in advance. + */ + public static NativeMemoryPool createListenable(ReservationListener listener) { + return new NativeMemoryPool(createListenableMemoryPool(listener)); + } + + /** + * Return native instance ID of this memory pool. + */ + public long getNativeInstanceId() { + return nativeInstanceId; + } + + /** + * Get current allocated bytes. 
/**
 * Holds pointers and metadata describing an Arrow C++ RecordBatch that was handed
 * across JNI: the row count, per-field node metadata, and the retained native buffers.
 */
public class NativeRecordBatchHandle {

  private final long rowCount;
  private final List<Field> fieldMetadata;
  private final List<Buffer> retainedBuffers;

  /**
   * Constructor.
   *
   * @param numRows Total row number of the associated RecordBatch
   * @param fields Metadata of fields
   * @param buffers Retained Arrow buffers
   */
  public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) {
    this.rowCount = numRows;
    this.fieldMetadata = Arrays.asList(fields);
    this.retainedBuffers = Arrays.asList(buffers);
  }

  /** Returns the total row number of the associated RecordBatch. */
  public long getNumRows() {
    return rowCount;
  }

  /** Returns the metadata of fields, one entry per field. */
  public List<Field> getFields() {
    return fieldMetadata;
  }

  /** Returns the retained Arrow buffers. */
  public List<Buffer> getBuffers() {
    return retainedBuffers;
  }

  /**
   * Field metadata: value count and null count of a single field.
   */
  public static class Field {
    public final long length;
    public final long nullCount;

    public Field(long length, long nullCount) {
      this.length = length;
      this.nullCount = nullCount;
    }
  }

  /**
   * Pointers and metadata of the targeted Arrow buffer.
   */
  public static class Buffer {
    public final long nativeInstanceId;
    public final long memoryAddress;
    public final long size;
    public final long capacity;

    /**
     * Constructor.
     *
     * @param nativeInstanceId Native instance's id
     * @param memoryAddress Memory address of the first byte
     * @param size Size (in bytes)
     * @param capacity Capacity (in bytes)
     */
    public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) {
      this.nativeInstanceId = nativeInstanceId;
      this.memoryAddress = memoryAddress;
      this.size = size;
      this.capacity = capacity;
    }
  }
}
+ */ +public class NativeScanTask implements ScanTask { + private final NativeScanner scanner; + + /** + * Constructor. + */ + public NativeScanTask(NativeScanner scanner) { + this.scanner = scanner; + } + + @Override + public BatchIterator execute() { + return scanner.execute(); + } + + @Override + public void close() { + scanner.close(); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java new file mode 100644 index 00000000000..24c298067af --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Native implementation of {@link Scanner}. Note that it currently emits only a single scan task of type
 * {@link NativeScanTask}, which is internally a combination of all scan task instances returned by the
 * native scanner.
 *
 * <p>Thread-safety: native calls that read from the scanner take {@code readLock}; {@link #close()} takes
 * {@code writeLock} so no JNI call can race with releasing the native instance.
 */
public class NativeScanner implements Scanner {

  // Guards against executing the scanner more than once (the native iteration is stateful).
  private final AtomicBoolean executed = new AtomicBoolean(false);
  private final NativeContext context;
  // ID (native pointer) of the underlying C++ scanner instance.
  private final long scannerId;

  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final Lock writeLock = lock.writeLock();
  private final Lock readLock = lock.readLock();
  // Written only under writeLock; read under readLock (or unlocked as a fast-fail check).
  private boolean closed = false;

  public NativeScanner(NativeContext context, long scannerId) {
    this.context = context;
    this.scannerId = scannerId;
  }

  /**
   * Returns an iterator over the record batches produced by the native scanner.
   * May be called at most once; a second call throws {@link UnsupportedOperationException}.
   */
  ScanTask.BatchIterator execute() {
    if (closed) {
      throw new NativeInstanceReleasedException();
    }
    if (!executed.compareAndSet(false, true)) {
      throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " +
          "new scanner instead");
    }
    return new ScanTask.BatchIterator() {
      // One-element lookahead: hasNext() materializes the next batch here, next() hands it out.
      private ArrowRecordBatch peek = null;

      @Override
      public void close() {
        NativeScanner.this.close();
      }

      @Override
      public boolean hasNext() {
        if (peek != null) {
          return true;
        }
        final NativeRecordBatchHandle handle;
        // Hold the read lock across the JNI call so close() cannot release the scanner mid-call.
        readLock.lock();
        try {
          if (closed) {
            throw new NativeInstanceReleasedException();
          }
          handle = JniWrapper.get().nextRecordBatch(scannerId);
        } finally {
          readLock.unlock();
        }
        // A null handle signals end-of-stream from the native side.
        if (handle == null) {
          return false;
        }
        // Wrap each native buffer in an ArrowBuf whose lifetime is tracked by the Java allocator.
        final ArrayList<ArrowBuf> buffers = new ArrayList<>();
        for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) {
          final BufferAllocator allocator = context.getAllocator();
          final int size = LargeMemoryUtil.checkedCastToInt(buffer.size);
          final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
              size, buffer.nativeInstanceId, buffer.memoryAddress);
          BufferLedger ledger = am.associate(allocator);
          ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress);
          buffers.add(buf);
        }

        try {
          final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows());
          peek = new ArrowRecordBatch(numRows, handle.getFields().stream()
              .map(field -> new ArrowFieldNode(field.length, field.nullCount))
              .collect(Collectors.toList()), buffers);
          return true;
        } finally {
          // NOTE(review): this drops the local references unconditionally — presumably the
          // ArrowRecordBatch constructor retains the buffers it is given, so the batch keeps
          // them alive afterwards; confirm against ArrowRecordBatch's reference-counting contract.
          buffers.forEach(buffer -> buffer.getReferenceManager().release());
        }
      }

      @Override
      public ArrowRecordBatch next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        // Hand out the peeked batch and clear the lookahead slot.
        try {
          return peek;
        } finally {
          peek = null;
        }
      }
    };
  }

  /**
   * Returns a single {@link NativeScanTask} wrapping this scanner (see class note).
   */
  @Override
  public Iterable<? extends NativeScanTask> scan() {
    if (closed) {
      throw new NativeInstanceReleasedException();
    }
    return Collections.singletonList(new NativeScanTask(this));
  }

  /**
   * Fetches and deserializes the scanner's output schema from the native side.
   */
  @Override
  public Schema schema() {
    readLock.lock();
    try {
      if (closed) {
        throw new NativeInstanceReleasedException();
      }
      return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator());
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Releases the native scanner. Takes the write lock, so it waits for in-flight reads.
   * Safe to call more than once.
   */
  @Override
  public void close() {
    writeLock.lock();
    try {
      if (closed) {
        return;
      }
      closed = true;
      JniWrapper.get().closeScanner(scannerId);
    } finally {
      writeLock.unlock();
    }
  }
}
/**
 * Options used during scanning: the projected columns and the maximum batch size.
 */
public class ScanOptions {
  // Names of the columns to project; an empty array means "scan all columns".
  private final String[] projectedColumns;
  // Upper bound on the row count of each emitted ArrowRecordBatch.
  private final long maxRowsPerBatch;

  /**
   * Constructor.
   *
   * @param columns Projected columns. Empty for scanning all columns.
   * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}
   */
  public ScanOptions(String[] columns, long batchSize) {
    this.projectedColumns = columns;
    this.maxRowsPerBatch = batchSize;
  }

  /** Returns the projected column names (empty array = all columns). */
  public String[] getColumns() {
    return projectedColumns;
  }

  /** Returns the maximum row count per emitted record batch. */
  public long getBatchSize() {
    return maxRowsPerBatch;
  }
}
/**
 * A high level interface for scanning data over dataset.
 */
public interface Scanner extends AutoCloseable {

  /**
   * Perform the scan operation.
   *
   * @return an iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed
   *         to execute the tasks concurrently to gain better performance.
   */
  Iterable<? extends ScanTask> scan();

  /**
   * Get the schema of this Scanner.
   *
   * @return the schema instance
   */
  Schema schema();
}
+ * + * @param options options used during creating Scanner + * @return the Scanner instance + */ + Scanner newScan(ScanOptions options); +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java new file mode 100644 index 00000000000..46b8545d662 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.source; + +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * DatasetFactory provides a way to inspect a Dataset potential + * schema before materializing it. Thus, the user can peek the schema for + * data sources and decide on a unified schema. + */ +public interface DatasetFactory extends AutoCloseable { + + /** + * Get unified schema for the resulting Dataset. + * + * @return the schema object inspected + */ + Schema inspect(); + + /** + * Create a Dataset with auto-inferred schema. Which means, the schema of the resulting Dataset will be + * the same with calling {@link #inspect()} manually. 
/**
 * AllocationManager implementation for native allocated memory. It does not allocate
 * anything itself: it wraps an already-allocated native buffer (identified by
 * {@code nativeInstanceId}) and books its size against a Java accounting allocator.
 */
public class NativeUnderlyingMemory extends AllocationManager {

  private final int size;
  private final long nativeInstanceId;
  private final long address;

  /**
   * Constructor.
   *
   * @param accountingAllocator The accounting allocator instance
   * @param size Size of underlying memory (in bytes)
   * @param nativeInstanceId ID of the native instance
   * @param address memory address of the first byte of the native buffer
   */
  NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) {
    super(accountingAllocator);
    this.size = size;
    this.nativeInstanceId = nativeInstanceId;
    this.address = address;
    // pre-allocate bytes on accounting allocator
    // Mirrors a regular allocation's bookkeeping: notify the listener, then reserve the
    // bytes so the accounting allocator's totals include this native buffer.
    final AllocationListener listener = accountingAllocator.getListener();
    try (final AllocationReservation reservation = accountingAllocator.newReservation()) {
      listener.onPreAllocation(size);
      reservation.reserve(size);
      listener.onAllocation(size);
    } catch (Exception e) {
      // Reservation failed: release the native buffer so it does not leak, then rethrow.
      release0();
      throw e;
    }
  }

  /**
   * Alias to constructor.
   */
  public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId,
      long address) {
    return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address);
  }

  // Widens the protected AllocationManager#associate to public so callers (e.g. the JNI
  // scanner) can obtain a ledger for wrapping this memory in an ArrowBuf.
  public BufferLedger associate(BufferAllocator allocator) {
    return super.associate(allocator);
  }

  /** Releases the wrapped native buffer via JNI when the last ledger goes away. */
  @Override
  protected void release0() {
    JniWrapper.get().releaseBuffer(nativeInstanceId);
  }

  @Override
  public long getSize() {
    return size;
  }

  @Override
  protected long memoryAddress() {
    return address;
  }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.util.Preconditions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; + +/** + * Utility class for writing Parquet files using Avro based tools. 
+ */ +public class ParquetWriteSupport implements AutoCloseable { + + private final String path; + private final String uri; + private final ParquetWriter writer; + private final Schema avroSchema; + private final List writtenRecords = new ArrayList<>(); + private final GenericRecordListBuilder recordListBuilder = new GenericRecordListBuilder(); + + + public ParquetWriteSupport(String schemaName, File outputFolder) throws Exception { + avroSchema = readSchemaFromFile(schemaName); + path = outputFolder.getPath() + File.separator + "generated.parquet"; + uri = "file://" + path; + writer = AvroParquetWriter.builder(new org.apache.hadoop.fs.Path(path)) + .withSchema(avroSchema) + .build(); + } + + private static Schema readSchemaFromFile(String schemaName) throws Exception { + Path schemaPath = Paths.get(ParquetWriteSupport.class.getResource("/").getPath(), + "avroschema", schemaName); + return new org.apache.avro.Schema.Parser().parse(schemaPath.toFile()); + } + + public static ParquetWriteSupport writeTempFile(String schemaName, File outputFolder, + Object... values) throws Exception { + try (final ParquetWriteSupport writeSupport = new ParquetWriteSupport(schemaName, outputFolder)) { + writeSupport.writeRecords(values); + return writeSupport; + } + } + + public void writeRecords(Object... 
values) throws Exception { + final List valueList = getRecordListBuilder().createRecordList(values); + writeRecords(valueList); + } + + public void writeRecords(List records) throws Exception { + for (GenericRecord record : records) { + writeRecord(record); + } + } + + public void writeRecord(GenericRecord record) throws Exception { + writtenRecords.add(record); + writer.write(record); + } + + public String getOutputURI() { + return uri; + } + + public Schema getAvroSchema() { + return avroSchema; + } + + public GenericRecordListBuilder getRecordListBuilder() { + return recordListBuilder; + } + + public List getWrittenRecords() { + return Collections.unmodifiableList(writtenRecords); + } + + @Override + public void close() throws Exception { + writer.close(); + } + + public class GenericRecordListBuilder { + public final List createRecordList(Object... values) { + final int fieldCount = avroSchema.getFields().size(); + Preconditions.checkArgument(values.length % fieldCount == 0, + "arg count of values should be divide by field number"); + final List recordList = new ArrayList<>(); + for (int i = 0; i < values.length / fieldCount; i++) { + final GenericRecord record = new GenericData.Record(avroSchema); + for (int j = 0; j < fieldCount; j++) { + record.put(j, values[i * fieldCount + j]); + } + recordList.add(record); + } + return Collections.unmodifiableList(recordList); + } + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java new file mode 100644 index 00000000000..51dac15e561 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset; + +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Before; + +public abstract class TestDataset { + private RootAllocator allocator = null; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + protected RootAllocator rootAllocator() { + return allocator; + } + + protected List collectResultFromFactory(DatasetFactory factory, ScanOptions options) { + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + final List ret = stream(scanner.scan()) + .flatMap(t -> stream(t.execute())) + .collect(Collectors.toList()); + try { + AutoCloseables.close(scanner, dataset); + } catch (Exception e) 
{ + throw new RuntimeException(e); + } + return ret; + } + + protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) { + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + final Schema schema = scanner.schema(); + try { + AutoCloseables.close(scanner, dataset); + } catch (Exception e) { + throw new RuntimeException(e); + } + return schema; + } + + protected Stream stream(Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false); + } + + protected List collect(Iterable iterable) { + return stream(iterable).collect(Collectors.toList()); + } + + protected Stream stream(Iterator iterator) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); + } + + protected List collect(Iterator iterator) { + return stream(iterator).collect(Collectors.toList()); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java new file mode 100644 index 00000000000..063f0955925 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import org.apache.arrow.dataset.ParquetWriteSupport; +import org.apache.arrow.dataset.jni.NativeDataset; +import org.apache.arrow.dataset.jni.NativeInstanceReleasedException; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.jni.NativeScanTask; +import org.apache.arrow.dataset.jni.NativeScanner; +import org.apache.arrow.dataset.jni.TestNativeDataset; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +public class TestFileSystemDataset extends TestNativeDataset { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + public static final String AVRO_SCHEMA_USER = "user.avsc"; + + @Test + public void testParquetRead() throws Exception { + 
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + ScanOptions options = new ScanOptions(new String[0], 100); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertSingleTaskProduced(factory, options); + assertEquals(1, datum.size()); + assertEquals(2, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals("name", schema.getFields().get(1).getName()); + assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType()); + assertEquals(Types.MinorType.VARCHAR.getType(), schema.getFields().get(1).getType()); + checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); + + AutoCloseables.close(datum); + } + + @Test + public void testParquetProjector() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + ScanOptions options = new ScanOptions(new String[]{"id"}, 100); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1); + + assertSingleTaskProduced(factory, options); + assertEquals(1, schema.getFields().size()); + assertEquals("id", schema.getFields().get(0).getName()); + assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType()); + assertEquals(1, datum.size()); + checkParquetReadResult(schema, + Collections.singletonList( + new GenericRecordBuilder( + 
expectedSchema) + .set("id", 1) + .build()), datum); + + AutoCloseables.close(datum); + } + + @Test + public void testParquetBatchSize() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a", 2, "b", 3, "c"); + + ScanOptions options = new ScanOptions(new String[0], 1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertSingleTaskProduced(factory, options); + assertEquals(3, datum.size()); + datum.forEach(batch -> assertEquals(1, batch.getLength())); + checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); + + AutoCloseables.close(datum); + } + + @Test + public void testCloseAgain() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + + assertDoesNotThrow(() -> { + NativeDataset dataset = factory.finish(); + dataset.close(); + dataset.close(); + }); + } + + @Test + public void testScanAgain() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + NativeDataset dataset = factory.finish(); + ScanOptions options = new ScanOptions(new String[0], 100); + NativeScanner scanner = dataset.newScan(options); + List taskList1 = collect(scanner.scan()); + List taskList2 = collect(scanner.scan()); + NativeScanTask task1 = taskList1.get(0); + 
NativeScanTask task2 = taskList2.get(0); + List datum = collect(task1.execute()); + + UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute); + Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead", + uoe.getMessage()); + + AutoCloseables.close(datum); + AutoCloseables.close(taskList1); + AutoCloseables.close(taskList2); + AutoCloseables.close(scanner, dataset, factory); + } + + @Test + public void testScanInOtherThread() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + NativeDataset dataset = factory.finish(); + ScanOptions options = new ScanOptions(new String[0], 100); + NativeScanner scanner = dataset.newScan(options); + List taskList = collect(scanner.scan()); + NativeScanTask task = taskList.get(0); + List datum = executor.submit(() -> collect(task.execute())).get(); + + AutoCloseables.close(datum); + AutoCloseables.close(taskList); + AutoCloseables.close(scanner, dataset, factory); + } + + @Test + public void testScanAfterClose1() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + NativeDataset dataset = factory.finish(); + ScanOptions options = new ScanOptions(new String[0], 100); + NativeScanner scanner = dataset.newScan(options); + scanner.close(); + assertThrows(NativeInstanceReleasedException.class, scanner::scan); + } + + @Test + public void testScanAfterClose2() throws Exception { + 
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + NativeDataset dataset = factory.finish(); + ScanOptions options = new ScanOptions(new String[0], 100); + NativeScanner scanner = dataset.newScan(options); + List tasks = collect(scanner.scan()); + NativeScanTask task = tasks.get(0); + task.close(); + assertThrows(NativeInstanceReleasedException.class, task::execute); + } + + @Test + public void testScanAfterClose3() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + NativeDataset dataset = factory.finish(); + ScanOptions options = new ScanOptions(new String[0], 100); + NativeScanner scanner = dataset.newScan(options); + List tasks = collect(scanner.scan()); + NativeScanTask task = tasks.get(0); + ScanTask.BatchIterator iterator = task.execute(); + task.close(); + assertThrows(NativeInstanceReleasedException.class, iterator::hasNext); + } + + @Test + public void testMemoryAllocation() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, writeSupport.getOutputURI()); + ScanOptions options = new ScanOptions(new String[0], 100); + long initReservation = rootAllocator().getAllocatedMemory(); + List datum = collectResultFromFactory(factory, options); + final long expected_diff = datum.stream() + .flatMapToLong(batch -> batch.getBuffers() + .stream() + .mapToLong(buf -> 
buf.getReferenceManager().getAccountedSize())).sum(); + long reservation = rootAllocator().getAllocatedMemory(); + AutoCloseables.close(datum); + long finalReservation = rootAllocator().getAllocatedMemory(); + Assert.assertEquals(expected_diff, reservation - initReservation); + Assert.assertEquals(-expected_diff, finalReservation - reservation); + } + + private void checkParquetReadResult(Schema schema, List expected, List actual) { + assertEquals(expected.size(), actual.stream() + .mapToInt(ArrowRecordBatch::getLength) + .sum()); + final int fieldCount = schema.getFields().size(); + LinkedList expectedRemovable = new LinkedList<>(expected); + try (VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, rootAllocator())) { + VectorLoader loader = new VectorLoader(vsr); + for (ArrowRecordBatch batch : actual) { + try { + assertEquals(fieldCount, batch.getNodes().size()); + loader.load(batch); + int batchRowCount = vsr.getRowCount(); + for (int i = 0; i < fieldCount; i++) { + FieldVector vector = vsr.getVector(i); + for (int j = 0; j < batchRowCount; j++) { + Object object = vector.getObject(j); + Object expectedObject = expectedRemovable.get(j).get(i); + assertEquals(Objects.toString(expectedObject), + Objects.toString(object)); + } + } + for (int i = 0; i < batchRowCount; i++) { + expectedRemovable.poll(); + } + } finally { + batch.close(); + } + } + assertTrue(expectedRemovable.isEmpty()); + } + } + + private org.apache.avro.Schema truncateAvroSchema(org.apache.avro.Schema schema, int from, int to) { + List fields = schema.getFields().subList(from, to); + return org.apache.avro.Schema.createRecord( + fields.stream() + .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())) + .collect(Collectors.toList())); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java new file mode 
100644 index 00000000000..bddf96b5ec9 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDatasetFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.memory.RootAllocator; +import org.junit.Test; + +public class TestFileSystemDatasetFactory { + + @Test + public void testErrorHandling() { + RuntimeException e = assertThrows(RuntimeException.class, () -> { + new FileSystemDatasetFactory(new RootAllocator(Long.MAX_VALUE), NativeMemoryPool.getDefault(), + FileFormat.NONE, "file:///NON_EXIST_FILE"); + }); + assertEquals("illegal file format id: -1", e.getMessage()); + } + + @Test + public void testCloseAgain() { + assertDoesNotThrow(() -> { + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(new RootAllocator(Long.MAX_VALUE), + NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///NON_EXIST_FILE"); + factory.close(); + 
factory.close(); + }); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java new file mode 100644 index 00000000000..2a86a256883 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.junit.Assert; + +public abstract class TestNativeDataset extends TestDataset { + protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) { + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + Assert.assertEquals(1L, stream(scanner.scan()).count()); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java new file mode 100644 index 00000000000..a9d950590a2 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.dataset.ParquetWriteSupport; +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.file.FileFormat; +import org.apache.arrow.dataset.file.FileSystemDatasetFactory; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestReservationListener extends TestDataset { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + public static final String AVRO_SCHEMA_USER = "user.avsc"; + + /** + * The default block size of C++ ReservationListenableMemoryPool. + */ + public static final long DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE = 512 * 1024; + + @Test + public void testDirectReservationListener() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + NativeMemoryPool pool = NativeMemoryPool.createListenable(DirectReservationListener.instance()); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), + pool, FileFormat.PARQUET, + writeSupport.getOutputURI()); + ScanOptions options = new ScanOptions(new String[0], 100); + long initReservation = DirectReservationListener.instance().getCurrentDirectMemReservation(); + List datum = collectResultFromFactory(factory, options); + long reservation = DirectReservationListener.instance().getCurrentDirectMemReservation(); + AutoCloseables.close(datum); + AutoCloseables.close(pool); + long finalReservation = DirectReservationListener.instance().getCurrentDirectMemReservation(); + final long expected_diff = DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE; + Assert.assertEquals(expected_diff, reservation 
- initReservation); + Assert.assertEquals(-expected_diff, finalReservation - reservation); + } + + @Test + public void testCustomReservationListener() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + final AtomicLong reserved = new AtomicLong(0L); + ReservationListener listener = new ReservationListener() { + @Override + public void reserve(long size) { + reserved.getAndAdd(size); + } + + @Override + public void unreserve(long size) { + reserved.getAndAdd(-size); + } + }; + NativeMemoryPool pool = NativeMemoryPool.createListenable(listener); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), + pool, FileFormat.PARQUET, writeSupport.getOutputURI()); + ScanOptions options = new ScanOptions(new String[0], 100); + long initReservation = reserved.get(); + List datum = collectResultFromFactory(factory, options); + long reservation = reserved.get(); + AutoCloseables.close(datum); + AutoCloseables.close(pool); + long finalReservation = reserved.get(); + final long expected_diff = DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE; + Assert.assertEquals(expected_diff, reservation - initReservation); + Assert.assertEquals(-expected_diff, finalReservation - reservation); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java new file mode 100644 index 00000000000..c81868e42b2 --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNativeUnderlyingMemory { + + private RootAllocator allocator = null; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + protected RootAllocator rootAllocator() { + return allocator; + } + + @Test + public void testReservation() { + final RootAllocator root = rootAllocator(); + + final int size = 512; + final AllocationManager am = new MockUnderlyingMemory(root, size); + final BufferLedger ledger = am.associate(root); + + assertEquals(size, root.getAllocatedMemory()); + + ledger.release(); + } + + @Test + public void testBufferTransfer() { + final RootAllocator root = rootAllocator(); + + ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE); + ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE); + assertEquals(0, allocator1.getAllocatedMemory()); + assertEquals(0, allocator2.getAllocatedMemory()); + + final int size = 512; + final AllocationManager am = new MockUnderlyingMemory(allocator1, size); + + final BufferLedger owningLedger = am.associate(allocator1); + assertEquals(size, owningLedger.getAccountedSize()); + 
assertEquals(size, owningLedger.getSize()); + assertEquals(size, allocator1.getAllocatedMemory()); + + final BufferLedger transferredLedger = am.associate(allocator2); + owningLedger.release(); // release previous owner + assertEquals(0, owningLedger.getAccountedSize()); + assertEquals(size, owningLedger.getSize()); + assertEquals(size, transferredLedger.getAccountedSize()); + assertEquals(size, transferredLedger.getSize()); + assertEquals(0, allocator1.getAllocatedMemory()); + assertEquals(size, allocator2.getAllocatedMemory()); + + transferredLedger.release(); + allocator1.close(); + allocator2.close(); + } + + /** + * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations. + */ + private static class MockUnderlyingMemory extends NativeUnderlyingMemory { + + /** + * Constructor. + */ + MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) { + super(accountingAllocator, size, -1L, -1L); + } + + @Override + protected void release0() { + System.out.println("Underlying memory released. Size: " + getSize()); + } + + @Override + protected long memoryAddress() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/java/dataset/src/test/resources/avroschema/user.avsc b/java/dataset/src/test/resources/avroschema/user.avsc new file mode 100644 index 00000000000..072b643912b --- /dev/null +++ b/java/dataset/src/test/resources/avroschema/user.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
+ You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.dataset", + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": ["int", "null"]}, + {"name": "name", "type": ["string", "null"]} + ] +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java index 9c7cfa9d90d..5f8ab12446a 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -191,12 +191,12 @@ void release(final BufferLedger ledger) { public abstract long getSize(); /** - * Return the absolute memory address pointing to the fist byte of underling memory chunk. + * Return the absolute memory address pointing to the first byte of underlying memory chunk. 
*/ protected abstract void release0(); diff --git a/java/pom.xml b/java/pom.xml index c8f79e77743..e4f0c783233 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -697,6 +697,7 @@ adapter/orc gandiva + dataset diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/SchemaUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/util/SchemaUtility.java new file mode 100644 index 00000000000..f8167604c21 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/SchemaUtility.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageResult; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Schema utility class including serialization and deserialization. 
+ */ +public class SchemaUtility { + private SchemaUtility() {} + + /** + * Deserialize Arrow schema from byte array. + */ + public static Schema deserialize(byte[] bytes, BufferAllocator allocator) throws IOException { + try (MessageChannelReader schemaReader = + new MessageChannelReader( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel(bytes)), allocator)) { + + MessageResult result = schemaReader.readNext(); + if (result == null) { + throw new IOException("Unexpected end of input. Missing schema."); + } + return MessageSerializer.deserializeSchema(result.getMessage()); + } + } + + /** + * Serialize Arrow schema into byte array. + */ + public static byte[] serialize(Schema schema) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema); + return out.toByteArray(); + } +} diff --git a/java/vector/src/test/java/org/apache/arrow/util/TestSchemaUtil.java b/java/vector/src/test/java/org/apache/arrow/util/TestSchemaUtil.java new file mode 100644 index 00000000000..cefff838232 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/util/TestSchemaUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.util; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.SchemaUtility; +import org.junit.Test; + +public class TestSchemaUtil { + + private static Field field(String name, boolean nullable, ArrowType type, Field... children) { + return new Field(name, new FieldType(nullable, type, null, null), asList(children)); + } + + @Test + public void testSerializationAndDeserialization() throws IOException { + Schema schema = new Schema(asList( + field("a", false, new ArrowType.Null()), + field("b", true, new ArrowType.Utf8()), + field("c", true, new ArrowType.Binary())) + ); + + byte[] serialized = SchemaUtility.serialize(schema); + Schema deserialized = SchemaUtility.deserialize(serialized, new RootAllocator(Long.MAX_VALUE)); + assertEquals(schema, deserialized); + } +}