diff --git a/.travis.yml b/.travis.yml index cdf787c831b..9cc2b86c05c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -120,6 +120,27 @@ matrix: - $TRAVIS_BUILD_DIR/ci/travis_before_script_c_glib.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_c_glib.sh + - compiler: gcc + language: cpp + os: linux + group: deprecated + before_script: + - export CC="gcc-4.9" + - export CXX="g++-4.9" + - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh + script: + - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh + - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh + - compiler: clang + osx_image: xcode6.4 + os: osx + cache: + addons: + before_script: + - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh + script: + - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh + - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh before_install: - ulimit -c unlimited -S diff --git a/ci/travis_script_manylinux.sh b/ci/travis_script_manylinux.sh index 4e6be62bd3e..844d5f719f1 100755 --- a/ci/travis_script_manylinux.sh +++ b/ci/travis_script_manylinux.sh @@ -18,4 +18,4 @@ set -ex pushd python/manylinux1 git clone ../../ arrow docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 . -docker run --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh +docker run --shm-size=2g --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh diff --git a/ci/travis_script_plasma.sh b/ci/travis_script_plasma.sh new file mode 100755 index 00000000000..fa384ade89c --- /dev/null +++ b/ci/travis_script_plasma.sh @@ -0,0 +1,97 @@ +#!/usr/bin/env bash + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. + +set -e + +source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh + +export ARROW_HOME=$ARROW_CPP_INSTALL +export PYARROW_WITH_PLASMA=1 + +pushd $ARROW_PYTHON_DIR + +function build_arrow_libraries() { + CPP_BUILD_DIR=$1 + CPP_DIR=$TRAVIS_BUILD_DIR/cpp + + mkdir $CPP_BUILD_DIR + pushd $CPP_BUILD_DIR + + cmake -DARROW_BUILD_TESTS=off \ + -DARROW_PYTHON=on \ + -DARROW_PLASMA=on \ + -DCMAKE_INSTALL_PREFIX=$2 \ + $CPP_DIR + + make -j4 + make install + + popd +} + +python_version_tests() { + PYTHON_VERSION=$1 + CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION + + export ARROW_HOME=$TRAVIS_BUILD_DIR/arrow-install-$PYTHON_VERSION + export LD_LIBRARY_PATH=$ARROW_HOME/lib:$PARQUET_HOME/lib + + conda create -y -q -p $CONDA_ENV_DIR python=$PYTHON_VERSION cmake curl + source activate $CONDA_ENV_DIR + + python --version + which python + + # faster builds, please + conda install -y -q nomkl + + # Expensive dependencies install from Continuum package repo + conda install -y -q pip numpy pandas cython + + # Build C++ libraries + build_arrow_libraries arrow-build-$PYTHON_VERSION $ARROW_HOME + + # Other stuff pip install + pip install -r requirements.txt + + python setup.py build_ext --inplace + + python -m pytest -vv -r sxX pyarrow + + # Build documentation once + if [[ "$PYTHON_VERSION" == "3.6" ]] + then + conda install -y -q --file=doc/requirements.txt + python setup.py build_sphinx -s doc/source + fi + + # Build and install pyarrow + pushd $TRAVIS_BUILD_DIR/python + python setup.py install + popd + + # Run Plasma tests + pushd $TRAVIS_BUILD_DIR/python + python -m pytest pyarrow/tests/test_plasma.py + if [ $TRAVIS_OS_NAME == "linux" ]; then + PLASMA_VALGRIND=1 python -m pytest pyarrow/tests/test_plasma.py + fi + popd +} + +# run tests for python 2.7 and 3.6 +python_version_tests 2.7 +python_version_tests 3.6 + +popd diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index ac64c548d82..fdb5ad6a62c 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -17,6 +17,7 @@ set -e source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh export ARROW_HOME=$ARROW_CPP_INSTALL +export PYARROW_WITH_PLASMA=1 pushd $ARROW_PYTHON_DIR export PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env @@ -71,9 +72,8 @@ function build_arrow_libraries() { pushd $CPP_BUILD_DIR cmake -DARROW_BUILD_TESTS=off \ - -DARROW_PYTHON=on \ - -DPLASMA_PYTHON=on \ -DARROW_PLASMA=on \ + -DARROW_PYTHON=on \ -DCMAKE_INSTALL_PREFIX=$2 \ $CPP_DIR diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index b6181219dba..0edaa9dfc37 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -113,8 +113,10 @@ class CerrLog { template CerrLog& operator<<(const T& t) { - has_logged_ = true; - std::cerr << t; + if (severity_ != ARROW_DEBUG) { + has_logged_ = true; + std::cerr << t; + } return *this; } diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 4ff3beba779..8bb7e71fdf1 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -19,16 +19,13 @@ cmake_minimum_required(VERSION 2.8) project(plasma) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../python/cmake_modules") + find_package(PythonLibsNew REQUIRED) find_package(Threads) -option(PLASMA_PYTHON - "Build the Plasma Python extensions" - OFF) - -if(APPLE) - SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") -endif(APPLE) +set(PLASMA_SO_VERSION "0") +set(PLASMA_ABI_VERSION "${PLASMA_SO_VERSION}.0.0") include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS}) include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../") @@ -40,7 +37,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion") # Compile flatbuffers set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs") -set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/) +set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/) set(PLASMA_FBS_OUTPUT_FILES "${OUTPUT_DIR}/common_generated.h" @@ -69,8 +66,6 @@ endif() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") -set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing) - set(PLASMA_SRCS client.cc common.cc @@ -97,17 +92,33 @@ set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3") add_executable(plasma_store store.cc) target_link_libraries(plasma_store plasma_static) +# Headers: top level +install(FILES + common.h + common_generated.h + client.h + events.h + plasma.h + plasma_generated.h + protocol.h + DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma") + +# Plasma store +install(TARGETS plasma_store DESTINATION ${CMAKE_INSTALL_BINDIR}) + +# pkg-config support +configure_file(plasma.pc.in + "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc" + @ONLY) +install( + FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc" + DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") + +####################################### +# Unit tests +####################################### + ADD_ARROW_TEST(test/serialization_tests) ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static) ADD_ARROW_TEST(test/client_tests) ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static) - -if(PLASMA_PYTHON) - add_library(plasma_extension SHARED extension.cc) - - if(APPLE) - target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup") - else(APPLE) - target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive) - endif(APPLE) -endif() diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index dcb78e7ec52..62bfbec21c4 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -51,11 +51,31 @@ #define XXH64_DEFAULT_SEED 0 +namespace plasma { + // Number of threads used for memcopy and hash computations. constexpr int64_t kThreadPoolSize = 8; constexpr int64_t kBytesInMB = 1 << 20; static std::vector threadpool_(kThreadPoolSize); +struct ObjectInUseEntry { + /// A count of the number of times this client has called PlasmaClient::Create + /// or + /// PlasmaClient::Get on this object ID minus the number of calls to + /// PlasmaClient::Release. + /// When this count reaches zero, we remove the entry from the ObjectsInUse + /// and decrement a count in the relevant ClientMmapTableEntry. + int count; + /// Cached information to read the object. + PlasmaObject object; + /// A flag representing whether the object has been sealed. + bool is_sealed; +}; + +PlasmaClient::PlasmaClient() {} + +PlasmaClient::~PlasmaClient() {} + // If the file descriptor fd has been mmapped in this client process before, // return the pointer that was returned by mmap, otherwise mmap it and store the // pointer in a hash table. @@ -300,6 +320,10 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) { } Status PlasmaClient::Release(const ObjectID& object_id) { + // If the client is already disconnected, ignore release requests. + if (store_conn_ < 0) { + return Status::OK(); + } // Add the new object to the release history. release_history_.push_front(object_id); // If there are too many bytes in use by the client or if there are too many @@ -386,22 +410,6 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) { return XXH64_digest(&hash_state); } -bool plasma_compute_object_hash( - PlasmaClient* conn, ObjectID object_id, unsigned char* digest) { - // Get the plasma object data. We pass in a timeout of 0 to indicate that - // the operation should timeout immediately. - ObjectBuffer object_buffer; - ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer)); - // If the object was not retrieved, return false. - if (object_buffer.data_size == -1) { return false; } - // Compute the hash. - uint64_t hash = compute_object_hash(object_buffer); - memcpy(digest, &hash, sizeof(hash)); - // Release the plasma object. - ARROW_CHECK_OK(conn->Release(object_id)); - return true; -} - Status PlasmaClient::Seal(const ObjectID& object_id) { // Make sure this client has a reference to the object before sending the // request to Plasma. @@ -413,7 +421,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) { object_entry->second->is_sealed = true; /// Send the seal request to Plasma. static unsigned char digest[kDigestSize]; - ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0])); + RETURN_NOT_OK(Hash(object_id, &digest[0])); RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0])); // We call PlasmaClient::Release to decrement the number of instances of this // object @@ -439,6 +447,22 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { return ReadEvictReply(buffer.data(), num_bytes_evicted); } +Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { + // Get the plasma object data. We pass in a timeout of 0 to indicate that + // the operation should timeout immediately. + ObjectBuffer object_buffer; + RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer)); + // If the object was not retrieved, return false. + if (object_buffer.data_size == -1) { + return Status::PlasmaObjectNonexistent("Object not found"); + } + // Compute the hash. + uint64_t hash = compute_object_hash(object_buffer); + memcpy(digest, &hash, sizeof(hash)); + // Release the plasma object. + return Release(object_id); +} + Status PlasmaClient::Subscribe(int* fd) { int sock[2]; // Create a non-blocking socket pair. This will only be used to send @@ -459,6 +483,26 @@ Status PlasmaClient::Subscribe(int* fd) { return Status::OK(); } +Status PlasmaClient::GetNotification( + int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { + uint8_t* notification = read_message_async(fd); + if (notification == NULL) { + return Status::IOError("Failed to read object notification from Plasma socket"); + } + auto object_info = flatbuffers::GetRoot(notification); + ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); + memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID)); + if (object_info->is_deletion()) { + *data_size = -1; + *metadata_size = -1; + } else { + *data_size = object_info->data_size(); + *metadata_size = object_info->metadata_size(); + } + delete[] notification; + return Status::OK(); +} + Status PlasmaClient::Connect(const std::string& store_socket_name, const std::string& manager_socket_name, int release_delay) { store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1); @@ -485,7 +529,11 @@ Status PlasmaClient::Disconnect() { // Close the connections to Plasma. The Plasma store will release the objects // that were in use by us when handling the SIGPIPE. close(store_conn_); - if (manager_conn_ >= 0) { close(manager_conn_); } + store_conn_ = -1; + if (manager_conn_ >= 0) { + close(manager_conn_); + manager_conn_ = -1; + } return Status::OK(); } @@ -555,3 +603,5 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req } return Status::OK(); } + +} // namespace plasma diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index fb3a161795d..d9ed9f7c266 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -22,12 +22,18 @@ #include #include +#include #include +#include -#include "plasma/plasma.h" +#include "arrow/status.h" +#include "arrow/util/visibility.h" +#include "plasma/common.h" using arrow::Status; +namespace plasma { + #define PLASMA_DEFAULT_RELEASE_DELAY 64 // Use 100MB as an overestimate of the L3 cache size. @@ -63,22 +69,16 @@ struct ClientMmapTableEntry { int count; }; -struct ObjectInUseEntry { - /// A count of the number of times this client has called PlasmaClient::Create - /// or - /// PlasmaClient::Get on this object ID minus the number of calls to - /// PlasmaClient::Release. - /// When this count reaches zero, we remove the entry from the ObjectsInUse - /// and decrement a count in the relevant ClientMmapTableEntry. - int count; - /// Cached information to read the object. - PlasmaObject object; - /// A flag representing whether the object has been sealed. - bool is_sealed; -}; +struct ObjectInUseEntry; +struct ObjectRequest; +struct PlasmaObject; -class PlasmaClient { +class ARROW_EXPORT PlasmaClient { public: + PlasmaClient(); + + ~PlasmaClient(); + /// Connect to the local plasma store and plasma manager. Return /// the resulting connection. /// @@ -177,10 +177,18 @@ class PlasmaClient { /// @return The return status. Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted); + /// Compute the hash of an object in the object store. + /// + /// @param conn The object containing the connection state. + /// @param object_id The ID of the object we want to hash. + /// @param digest A pointer at which to return the hash digest of the object. + /// The pointer must have at least kDigestSize bytes allocated. + /// @return The return status. + Status Hash(const ObjectID& object_id, uint8_t* digest); + /// Subscribe to notifications when objects are sealed in the object store. /// Whenever an object is sealed, a message will be written to the client - /// socket - /// that is returned by this method. + /// socket that is returned by this method. /// /// @param fd Out parameter for the file descriptor the client should use to /// read notifications @@ -188,6 +196,16 @@ class PlasmaClient { /// @return The return status. Status Subscribe(int* fd); + /// Receive next object notification for this client if Subscribe has been called. + /// + /// @param fd The file descriptor we are reading the notification from. + /// @param object_id Out parameter, the object_id of the object that was sealed. + /// @param data_size Out parameter, the data size of the object that was sealed. + /// @param metadata_size Out parameter, the metadata size of the object that was sealed. + /// @return The return status. + Status GetNotification( + int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + /// Disconnect from the local plasma instance, including the local store and /// manager. /// @@ -330,14 +348,6 @@ class PlasmaClient { int64_t store_capacity_; }; -/// Compute the hash of an object in the object store. -/// -/// @param conn The object containing the connection state. -/// @param object_id The ID of the object we want to hash. -/// @param digest A pointer at which to return the hash digest of the object. -/// The pointer must have at least DIGEST_SIZE bytes allocated. -/// @return A boolean representing whether the hash operation succeeded. -bool plasma_compute_object_hash( - PlasmaClient* conn, ObjectID object_id, unsigned char* digest); +} // namespace plasma #endif // PLASMA_CLIENT_H diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index a09a963fa47..a5f530e202f 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -19,7 +19,9 @@ #include -#include "format/plasma_generated.h" +#include "plasma/plasma_generated.h" + +namespace plasma { using arrow::Status; @@ -81,3 +83,8 @@ Status plasma_error_status(int plasma_error) { } return Status::OK(); } + +ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local; +ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote; + +} // namespace plasma diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 85dc74bf86e..6f2d4dd841b 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -29,9 +29,11 @@ #include "arrow/status.h" #include "arrow/util/logging.h" +namespace plasma { + constexpr int64_t kUniqueIDSize = 20; -class UniqueID { +class ARROW_EXPORT UniqueID { public: static UniqueID from_random(); static UniqueID from_binary(const std::string& binary); @@ -60,4 +62,39 @@ typedef UniqueID ObjectID; arrow::Status plasma_error_status(int plasma_error); +/// Size of object hash digests. +constexpr int64_t kDigestSize = sizeof(uint64_t); + +/// Object request data structure. Used for Wait. +struct ObjectRequest { + /// The ID of the requested object. If ID_NIL request any object. + ObjectID object_id; + /// Request associated to the object. It can take one of the following values: + /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the + /// local Plasma Store. + /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in + /// the system (i.e., either in the local or a remote Plasma Store). + int type; + /// Object status. Same as the status returned by plasma_status() function + /// call. This is filled in by plasma_wait_for_objects1(): + /// - ObjectStatus_Local: object is ready at the local Plasma Store. + /// - ObjectStatus_Remote: object is ready at a remote Plasma Store. + /// - ObjectStatus_Nonexistent: object does not exist in the system. + /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled + /// for being transferred or it is transferring. + int status; +}; + +enum ObjectRequestType { + /// Query for object in the local plasma store. + PLASMA_QUERY_LOCAL = 1, + /// Query for object in the local plasma store or in a remote plasma store. + PLASMA_QUERY_ANYWHERE +}; + +extern int ObjectStatusLocal; +extern int ObjectStatusRemote; + +} // namespace plasma + #endif // PLASMA_COMMON_H diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc index a9f7356e1f6..675424d5c2f 100644 --- a/cpp/src/plasma/events.cc +++ b/cpp/src/plasma/events.cc @@ -19,6 +19,8 @@ #include +namespace plasma { + void EventLoop::file_event_callback( aeEventLoop* loop, int fd, void* context, int events) { FileCallback* callback = reinterpret_cast(context); @@ -79,3 +81,5 @@ int EventLoop::remove_timer(int64_t timer_id) { timer_callbacks_.erase(timer_id); return err; } + +} // namespace plasma diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h index bd93d6bb2a6..b989b7fac24 100644 --- a/cpp/src/plasma/events.h +++ b/cpp/src/plasma/events.h @@ -26,6 +26,8 @@ extern "C" { #include "ae/ae.h" } +namespace plasma { + /// Constant specifying that the timer is done and it will be removed. constexpr int kEventLoopTimerDone = AE_NOMORE; @@ -96,4 +98,6 @@ class EventLoop { std::unordered_map> timer_callbacks_; }; +} // namespace plasma + #endif // PLASMA_EVENTS diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index 4ae6384d425..ef18e333729 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -19,6 +19,8 @@ #include +namespace plasma { + void LRUCache::add(const ObjectID& key, int64_t size) { auto it = item_map_.find(key); ARROW_CHECK(it == item_map_.end()); @@ -105,3 +107,5 @@ void EvictionPolicy::end_object_access( /* Add the object to the LRU cache.*/ cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } + +} // namespace plasma diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index 3815fc6652f..c4f21832831 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -26,6 +26,8 @@ #include "plasma/common.h" #include "plasma/plasma.h" +namespace plasma { + // ==== The eviction policy ==== // // This file contains declaration for all functions and data structures that @@ -131,4 +133,6 @@ class EvictionPolicy { LRUCache cache_; }; +} // namespace plasma + #endif // PLASMA_EVICTION_POLICY_H diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc deleted file mode 100644 index 5d61e337c10..00000000000 --- a/cpp/src/plasma/extension.cc +++ /dev/null @@ -1,456 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "plasma/extension.h" - -#include -#include - -#include "plasma/client.h" -#include "plasma/common.h" -#include "plasma/io.h" -#include "plasma/protocol.h" - -PyObject* PlasmaOutOfMemoryError; -PyObject* PlasmaObjectExistsError; - -PyObject* PyPlasma_connect(PyObject* self, PyObject* args) { - const char* store_socket_name; - const char* manager_socket_name; - int release_delay; - if (!PyArg_ParseTuple( - args, "ssi", &store_socket_name, &manager_socket_name, &release_delay)) { - return NULL; - } - PlasmaClient* client = new PlasmaClient(); - ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay)); - - return PyCapsule_New(client, "plasma", NULL); -} - -PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) { - PyObject* client_capsule; - if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; } - PlasmaClient* client; - ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client)); - ARROW_CHECK_OK(client->Disconnect()); - /* We use the context of the connection capsule to indicate if the connection - * is still active (if the context is NULL) or if it is closed (if the context - * is (void*) 0x1). This is neccessary because the primary pointer of the - * capsule cannot be NULL. */ - PyCapsule_SetContext(client_capsule, reinterpret_cast(0x1)); - Py_RETURN_NONE; -} - -PyObject* PyPlasma_create(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - Py_ssize_t size; - PyObject* metadata; - if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client, - PyStringToUniqueID, &object_id, &size, &metadata)) { - return NULL; - } - if (!PyByteArray_Check(metadata)) { - PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray"); - return NULL; - } - uint8_t* data; - Status s = client->Create(object_id, size, - reinterpret_cast(PyByteArray_AsString(metadata)), - PyByteArray_Size(metadata), &data); - if (s.IsPlasmaObjectExists()) { - PyErr_SetString(PlasmaObjectExistsError, - "An object with this ID already exists in the plasma " - "store."); - return NULL; - } - if (s.IsPlasmaStoreFull()) { - PyErr_SetString(PlasmaOutOfMemoryError, - "The plasma store ran out of memory and could not create " - "this object."); - return NULL; - } - ARROW_CHECK(s.ok()); - -#if PY_MAJOR_VERSION >= 3 - return PyMemoryView_FromMemory(reinterpret_cast(data), size, PyBUF_WRITE); -#else - return PyBuffer_FromReadWriteMemory(reinterpret_cast(data), size); -#endif -} - -PyObject* PyPlasma_hash(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, - &object_id)) { - return NULL; - } - unsigned char digest[kDigestSize]; - bool success = plasma_compute_object_hash(client, object_id, digest); - if (success) { - PyObject* digest_string = - PyBytes_FromStringAndSize(reinterpret_cast(digest), kDigestSize); - return digest_string; - } else { - Py_RETURN_NONE; - } -} - -PyObject* PyPlasma_seal(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, - &object_id)) { - return NULL; - } - ARROW_CHECK_OK(client->Seal(object_id)); - Py_RETURN_NONE; -} - -PyObject* PyPlasma_release(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, - &object_id)) { - return NULL; - } - ARROW_CHECK_OK(client->Release(object_id)); - Py_RETURN_NONE; -} - -PyObject* PyPlasma_get(PyObject* self, PyObject* args) { - PlasmaClient* client; - PyObject* object_id_list; - Py_ssize_t timeout_ms; - if (!PyArg_ParseTuple( - args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) { - return NULL; - } - - Py_ssize_t num_object_ids = PyList_Size(object_id_list); - std::vector object_ids(num_object_ids); - std::vector object_buffers(num_object_ids); - - for (int i = 0; i < num_object_ids; ++i) { - PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); - } - - Py_BEGIN_ALLOW_THREADS; - ARROW_CHECK_OK( - client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data())); - Py_END_ALLOW_THREADS; - - PyObject* returns = PyList_New(num_object_ids); - for (int i = 0; i < num_object_ids; ++i) { - if (object_buffers[i].data_size != -1) { - /* The object was retrieved, so return the object. */ - PyObject* t = PyTuple_New(2); - Py_ssize_t data_size = static_cast(object_buffers[i].data_size); - Py_ssize_t metadata_size = static_cast(object_buffers[i].metadata_size); -#if PY_MAJOR_VERSION >= 3 - char* data = reinterpret_cast(object_buffers[i].data); - char* metadata = reinterpret_cast(object_buffers[i].metadata); - PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ)); - PyTuple_SET_ITEM( - t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ)); -#else - void* data = reinterpret_cast(object_buffers[i].data); - void* metadata = reinterpret_cast(object_buffers[i].metadata); - PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size)); - PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size)); -#endif - ARROW_CHECK(PyList_SetItem(returns, i, t) == 0); - } else { - /* The object was not retrieved, so just add None to the list of return - * values. */ - Py_INCREF(Py_None); - ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0); - } - } - return returns; -} - -PyObject* PyPlasma_contains(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, - &object_id)) { - return NULL; - } - bool has_object; - ARROW_CHECK_OK(client->Contains(object_id, &has_object)); - - if (has_object) { - Py_RETURN_TRUE; - } else { - Py_RETURN_FALSE; - } -} - -PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) { - PlasmaClient* client; - PyObject* object_id_list; - if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) { - return NULL; - } - if (client->get_manager_fd() == -1) { - PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); - return NULL; - } - Py_ssize_t n = PyList_Size(object_id_list); - ObjectID* object_ids = new ObjectID[n]; - for (int i = 0; i < n; ++i) { - PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); - } - ARROW_CHECK_OK(client->Fetch(static_cast(n), object_ids)); - delete[] object_ids; - Py_RETURN_NONE; -} - -PyObject* PyPlasma_wait(PyObject* self, PyObject* args) { - PlasmaClient* client; - PyObject* object_id_list; - Py_ssize_t timeout; - int num_returns; - if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list, - &timeout, &num_returns)) { - return NULL; - } - Py_ssize_t n = PyList_Size(object_id_list); - - if (client->get_manager_fd() == -1) { - PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); - return NULL; - } - if (num_returns < 0) { - PyErr_SetString( - PyExc_RuntimeError, "The argument num_returns cannot be less than zero."); - return NULL; - } - if (num_returns > n) { - PyErr_SetString(PyExc_RuntimeError, - "The argument num_returns cannot be greater than len(object_ids)"); - return NULL; - } - int64_t threshold = 1 << 30; - if (timeout > threshold) { - PyErr_SetString( - PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30."); - return NULL; - } - - std::vector object_requests(n); - for (int i = 0; i < n; ++i) { - ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i), - &object_requests[i].object_id) == 1); - object_requests[i].type = PLASMA_QUERY_ANYWHERE; - } - /* Drop the global interpreter lock while we are waiting, so other threads can - * run. */ - int num_return_objects; - Py_BEGIN_ALLOW_THREADS; - ARROW_CHECK_OK( - client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects)); - Py_END_ALLOW_THREADS; - - int num_to_return = std::min(num_return_objects, num_returns); - PyObject* ready_ids = PyList_New(num_to_return); - PyObject* waiting_ids = PySet_New(object_id_list); - int num_returned = 0; - for (int i = 0; i < n; ++i) { - if (num_returned == num_to_return) { break; } - if (object_requests[i].status == ObjectStatus_Local || - object_requests[i].status == ObjectStatus_Remote) { - PyObject* ready = PyBytes_FromStringAndSize( - reinterpret_cast(&object_requests[i].object_id), - sizeof(object_requests[i].object_id)); - PyList_SetItem(ready_ids, num_returned, ready); - PySet_Discard(waiting_ids, ready); - num_returned += 1; - } else { - ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent); - } - } - ARROW_CHECK(num_returned == num_to_return); - /* Return both the ready IDs and the remaining IDs. */ - PyObject* t = PyTuple_New(2); - PyTuple_SetItem(t, 0, ready_ids); - PyTuple_SetItem(t, 1, waiting_ids); - return t; -} - -PyObject* PyPlasma_evict(PyObject* self, PyObject* args) { - PlasmaClient* client; - Py_ssize_t num_bytes; - if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes)) { - return NULL; - } - int64_t evicted_bytes; - ARROW_CHECK_OK(client->Evict(static_cast(num_bytes), evicted_bytes)); - return PyLong_FromSsize_t(static_cast(evicted_bytes)); -} - -PyObject* PyPlasma_delete(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, - &object_id)) { - return NULL; - } - ARROW_CHECK_OK(client->Delete(object_id)); - Py_RETURN_NONE; -} - -PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) { - PlasmaClient* client; - ObjectID object_id; - const char* addr; - int port; - if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client, - PyStringToUniqueID, &object_id, &addr, &port)) { - return NULL; - } - - if (client->get_manager_fd() == -1) { - PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); - return NULL; - } - - ARROW_CHECK_OK(client->Transfer(addr, port, object_id)); - Py_RETURN_NONE; -} - -PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) { - PlasmaClient* client; - if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return NULL; } - - int sock; - ARROW_CHECK_OK(client->Subscribe(&sock)); - return PyLong_FromLong(sock); -} - -PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) { - int plasma_sock; - - if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; } - /* Receive object notification from the plasma connection socket. If the - * object was added, return a tuple of its fields: ObjectID, data_size, - * metadata_size. If the object was deleted, data_size and metadata_size will - * be set to -1. */ - uint8_t* notification = read_message_async(plasma_sock); - if (notification == NULL) { - PyErr_SetString( - PyExc_RuntimeError, "Failed to read object notification from Plasma socket"); - return NULL; - } - auto object_info = flatbuffers::GetRoot(notification); - /* Construct a tuple from object_info and return. */ - PyObject* t = PyTuple_New(3); - PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(), - object_info->object_id()->size())); - if (object_info->is_deletion()) { - PyTuple_SetItem(t, 1, PyLong_FromLong(-1)); - PyTuple_SetItem(t, 2, PyLong_FromLong(-1)); - } else { - PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size())); - PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size())); - } - - delete[] notification; - return t; -} - -static PyMethodDef plasma_methods[] = { - {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."}, - {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."}, - {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."}, - {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."}, - {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."}, - {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."}, - {"contains", PyPlasma_contains, METH_VARARGS, - "Does the plasma store contain this plasma object?"}, - {"fetch", PyPlasma_fetch, METH_VARARGS, - "Fetch the object from another plasma manager instance."}, - {"wait", PyPlasma_wait, METH_VARARGS, - "Wait until num_returns objects in object_ids are ready."}, - {"evict", PyPlasma_evict, METH_VARARGS, - "Evict some objects until we recover some number of bytes."}, - {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."}, - {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."}, - {"transfer", PyPlasma_transfer, METH_VARARGS, - "Transfer object to another plasma manager."}, - {"subscribe", PyPlasma_subscribe, METH_VARARGS, - "Subscribe to the plasma notification socket."}, - {"receive_notification", PyPlasma_receive_notification, METH_VARARGS, - "Receive next notification from plasma notification socket."}, - {NULL} /* Sentinel */ -}; - -#if PY_MAJOR_VERSION >= 3 -static struct PyModuleDef moduledef = { - PyModuleDef_HEAD_INIT, "libplasma", /* m_name */ - "A Python client library for plasma.", /* m_doc */ - 0, /* m_size */ - plasma_methods, /* m_methods */ - NULL, /* m_reload */ - NULL, /* m_traverse */ - NULL, /* m_clear */ - NULL, /* m_free */ -}; -#endif - -#if PY_MAJOR_VERSION >= 3 -#define INITERROR return NULL -#else -#define INITERROR return -#endif - -#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ -#define PyMODINIT_FUNC void -#endif - -#if PY_MAJOR_VERSION >= 3 -#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) -#else -#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) -#endif - -MOD_INIT(libplasma) { -#if PY_MAJOR_VERSION >= 3 - PyObject* m = PyModule_Create(&moduledef); -#else - PyObject* m = - Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma."); -#endif - - /* Create a custom exception for when an object ID is reused. */ - char plasma_object_exists_error[] = "plasma_object_exists.error"; - PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL); - Py_INCREF(PlasmaObjectExistsError); - PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError); - /* Create a custom exception for when the plasma store is out of memory. */ - char plasma_out_of_memory_error[] = "plasma_out_of_memory.error"; - PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL); - Py_INCREF(PlasmaOutOfMemoryError); - PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError); - -#if PY_MAJOR_VERSION >= 3 - return m; -#endif -} diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h deleted file mode 100644 index cee30abb359..00000000000 --- a/cpp/src/plasma/extension.h +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PLASMA_EXTENSION_H -#define PLASMA_EXTENSION_H - -#undef _XOPEN_SOURCE -#undef _POSIX_C_SOURCE -#include - -#include "bytesobject.h" // NOLINT - -#include "plasma/client.h" -#include "plasma/common.h" - -static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) { - if (PyCapsule_IsValid(object, "plasma")) { - *client = reinterpret_cast(PyCapsule_GetPointer(object, "plasma")); - return 1; - } else { - PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule"); - return 0; - } -} - -int PyStringToUniqueID(PyObject* object, ObjectID* object_id) { - if (PyBytes_Check(object)) { - memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID)); - return 1; - } else { - PyErr_SetString(PyExc_TypeError, "must be a 20 character string"); - return 0; - } -} - -#endif // PLASMA_EXTENSION_H diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index 559d8e7f2a6..bfed5009b61 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -24,6 +24,8 @@ #include "plasma/common.h" #include "plasma/protocol.h" +namespace plasma { + int warn_if_sigpipe(int status, int client_sock) { if (status >= 0) { return 0; } if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { @@ -62,3 +64,5 @@ ObjectTableEntry* get_object_table_entry( if (it == store_info->objects.end()) { return NULL; } return it->second.get(); } + +} // namespace plasma diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 275d0c7a416..db8669ff0dd 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -32,8 +32,10 @@ #include "arrow/status.h" #include "arrow/util/logging.h" -#include "format/common_generated.h" #include "plasma/common.h" +#include "plasma/common_generated.h" + +namespace plasma { #define HANDLE_SIGPIPE(s, fd_) \ do { \ @@ -54,47 +56,23 @@ /// Allocation granularity used in plasma for object allocation. #define BLOCK_SIZE 64 -/// Size of object hash digests. -constexpr int64_t kDigestSize = sizeof(uint64_t); - struct Client; -/// Object request data structure. Used in the plasma_wait_for_objects() -/// argument. -typedef struct { - /// The ID of the requested object. If ID_NIL request any object. - ObjectID object_id; - /// Request associated to the object. It can take one of the following values: - /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the - /// local Plasma Store. - /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in - /// the system (i.e., either in the local or a remote Plasma Store). - int type; - /// Object status. Same as the status returned by plasma_status() function - /// call. This is filled in by plasma_wait_for_objects1(): - /// - ObjectStatus_Local: object is ready at the local Plasma Store. - /// - ObjectStatus_Remote: object is ready at a remote Plasma Store. - /// - ObjectStatus_Nonexistent: object does not exist in the system. - /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled - /// for being transferred or it is transferring. - int status; -} ObjectRequest; - /// Mapping from object IDs to type and status of the request. typedef std::unordered_map ObjectRequestMap; /// Handle to access memory mapped file and map it into client address space. -typedef struct { +struct object_handle { /// The file descriptor of the memory mapped file in the store. It is used as /// a unique identifier of the file in the client to look up the corresponding /// file descriptor on the client's side. int store_fd; /// The size in bytes of the memory mapped file. int64_t mmap_size; -} object_handle; +}; // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. -typedef struct { +struct PlasmaObject { /// Handle for memory mapped file the object is stored in. object_handle handle; /// The offset in bytes in the memory mapped file of the data. @@ -105,28 +83,21 @@ typedef struct { int64_t data_size; /// The size in bytes of the metadata. int64_t metadata_size; -} PlasmaObject; +}; -typedef enum { +enum object_state { /// Object was created but not sealed in the local Plasma Store. PLASMA_CREATED = 1, /// Object is sealed and stored in the local Plasma Store. PLASMA_SEALED -} object_state; +}; -typedef enum { +enum object_status { /// The object was not found. OBJECT_NOT_FOUND = 0, /// The object was found. OBJECT_FOUND = 1 -} object_status; - -typedef enum { - /// Query for object in the local plasma store. - PLASMA_QUERY_LOCAL = 1, - /// Query for object in the local plasma store or in a remote plasma store. - PLASMA_QUERY_ANYWHERE -} object_request_type; +}; /// This type is used by the Plasma store. It is here because it is exposed to /// the eviction policy. @@ -188,4 +159,6 @@ int warn_if_sigpipe(int status, int client_sock); uint8_t* create_object_info_buffer(ObjectInfoT* object_info); +} // namespace plasma + #endif // PLASMA_PLASMA_H diff --git a/cpp/src/plasma/plasma.pc.in b/cpp/src/plasma/plasma.pc.in new file mode 100644 index 00000000000..d86868939f3 --- /dev/null +++ b/cpp/src/plasma/plasma.pc.in @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +prefix=@CMAKE_INSTALL_PREFIX@ +libdir=${prefix}/@CMAKE_INSTALL_LIBDIR@ +includedir=${prefix}/include + +so_version=@PLASMA_SO_VERSION@ +abi_version=@PLASMA_ABI_VERSION@ +executable=${prefix}/@CMAKE_INSTALL_BINDIR@/plasma_store + +Name: Plasma +Description: Plasma is an in-memory object store and cache for big data. +Version: @PLASMA_VERSION@ +Libs: -L${libdir} -lplasma +Cflags: -I${includedir} diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 246aa297360..2998c68b827 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -18,11 +18,13 @@ #include "plasma/protocol.h" #include "flatbuffers/flatbuffers.h" -#include "format/plasma_generated.h" +#include "plasma/plasma_generated.h" #include "plasma/common.h" #include "plasma/io.h" +namespace plasma { + using flatbuffers::uoffset_t; flatbuffers::Offset>> @@ -500,3 +502,5 @@ Status ReadDataReply( *metadata_size = (int64_t)message->metadata_size(); return Status::OK(); } + +} // namespace plasma diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index 5d9d1367514..835c5a0b589 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -21,9 +21,11 @@ #include #include "arrow/status.h" -#include "format/plasma_generated.h" +#include "plasma/plasma_generated.h" #include "plasma/plasma.h" +namespace plasma { + using arrow::Status; /* Plasma receive message. */ @@ -167,4 +169,6 @@ Status SendDataReply( Status ReadDataReply( uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size); +} // namespace plasma + #endif /* PLASMA_PROTOCOL */ diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9394e3de310..8d4fb106f53 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -49,12 +49,14 @@ #include #include -#include "format/common_generated.h" +#include "plasma/common_generated.h" #include "plasma/common.h" #include "plasma/fling.h" #include "plasma/io.h" #include "plasma/malloc.h" +namespace plasma { + extern "C" { void* dlmalloc(size_t bytes); void* dlmemalign(size_t alignment, size_t bytes); @@ -625,8 +627,10 @@ void start_server(char* socket_name, int64_t system_memory) { loop.run(); } +} // namespace plasma + int main(int argc, char* argv[]) { - signal(SIGTERM, signal_handler); + signal(SIGTERM, plasma::signal_handler); char* socket_name = NULL; int64_t system_memory = -1; int c; @@ -677,7 +681,7 @@ int main(int argc, char* argv[]) { #endif // Make it so dlmalloc fails if we try to request more memory than is // available. - dlmalloc_set_footprint_limit((size_t)system_memory); + plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - start_server(socket_name, system_memory); + plasma::start_server(socket_name, system_memory); } diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 8bd94265410..27c3813da8e 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -27,6 +27,8 @@ #include "plasma/plasma.h" #include "plasma/protocol.h" +namespace plasma { + struct GetRequest; struct NotificationQueue { @@ -166,4 +168,6 @@ class PlasmaStore { std::unordered_map pending_notifications_; }; +} // namespace plasma + #endif // PLASMA_STORE_H diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 29b5b135144..6dc558e7707 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -29,7 +29,9 @@ #include "plasma/plasma.h" #include "plasma/protocol.h" -std::string g_test_executable; // NOLINT +namespace plasma { + +std::string test_executable; // NOLINT class TestPlasmaStore : public ::testing::Test { public: @@ -37,7 +39,7 @@ class TestPlasmaStore : public ::testing::Test { // stdout of the object store. Consider changing that. void SetUp() { std::string plasma_directory = - g_test_executable.substr(0, g_test_executable.find_last_of("/")); + test_executable.substr(0, test_executable.find_last_of("/")); std::string plasma_command = plasma_directory + "/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &"; @@ -125,8 +127,10 @@ TEST_F(TestPlasmaStore, MultipleGetTest) { ASSERT_EQ(object_buffer[1].data[0], 2); } +} // namespace plasma + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - g_test_executable = std::string(argv[0]); + plasma::test_executable = std::string(argv[0]); return RUN_ALL_TESTS(); } diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 325cead06e7..13938cd6fb0 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -25,6 +25,8 @@ #include "plasma/plasma.h" #include "plasma/protocol.h" +namespace plasma { + /** * Create a temporary file. Needs to be closed by the caller. * @@ -386,3 +388,5 @@ TEST(PlasmaSerialization, DataReply) { ASSERT_EQ(object_size1, object_size2); ASSERT_EQ(metadata_size1, metadata_size2); } + +} // namespace plasma diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 224147d8b5c..6ff66462958 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -220,6 +220,12 @@ include_directories(SYSTEM find_package(Arrow REQUIRED) include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) +## Plasma +find_package(Plasma) +if (PLASMA_FOUND) + include_directories(SYSTEM ${PLASMA_INCLUDE_DIR}) +endif() + function(bundle_arrow_lib library_path) get_filename_component(LIBRARY_DIR ${${library_path}} DIRECTORY) get_filename_component(LIBRARY_NAME ${${library_path}} NAME_WE) @@ -252,6 +258,9 @@ if (PYARROW_BUNDLE_ARROW_CPP) file(COPY ${ARROW_INCLUDE_DIR}/arrow DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY}/include) bundle_arrow_lib(ARROW_SHARED_LIB) bundle_arrow_lib(ARROW_PYTHON_SHARED_LIB) + if (PLASMA_FOUND) + bundle_arrow_lib(PLASMA_SHARED_LIB) + endif() endif() if (MSVC) @@ -278,9 +287,14 @@ set(CYTHON_EXTENSIONS lib ) +if (PLASMA_FOUND) + set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma) +endif() + set(LINK_LIBS arrow_shared arrow_python_shared + ${PLASMA_SHARED_LIB} ) if (PYARROW_BUILD_PARQUET) @@ -379,3 +393,7 @@ foreach(module ${CYTHON_EXTENSIONS}) target_link_libraries(${module_name} ${LINK_LIBS}) endforeach(module) + +if (PLASMA_FOUND) + file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY}) +endif() diff --git a/python/cmake_modules/FindPlasma.cmake b/python/cmake_modules/FindPlasma.cmake new file mode 100644 index 00000000000..3acaa348bff --- /dev/null +++ b/python/cmake_modules/FindPlasma.cmake @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# - Find PLASMA (plasma/client.h, libplasma.a, libplasma.so) +# This module defines +# PLASMA_INCLUDE_DIR, directory containing headers +# PLASMA_LIBS, directory containing plasma libraries +# PLASMA_STATIC_LIB, path to libplasma.a +# PLASMA_SHARED_LIB, path to libplasma's shared library +# PLASMA_SHARED_IMP_LIB, path to libplasma's import library (MSVC only) +# PLASMA_FOUND, whether plasma has been found + +include(FindPkgConfig) + +if ("$ENV{ARROW_HOME}" STREQUAL "") + pkg_check_modules(PLASMA plasma) + if (PLASMA_FOUND) + pkg_get_variable(PLASMA_EXECUTABLE plasma executable) + pkg_get_variable(PLASMA_ABI_VERSION plasma abi_version) + message(STATUS "Plasma ABI version: ${PLASMA_ABI_VERSION}") + pkg_get_variable(PLASMA_SO_VERSION plasma so_version) + message(STATUS "Plasma SO version: ${PLASMA_SO_VERSION}") + set(PLASMA_INCLUDE_DIR ${PLASMA_INCLUDE_DIRS}) + set(PLASMA_LIBS ${PLASMA_LIBRARY_DIRS}) + set(PLASMA_SEARCH_LIB_PATH ${PLASMA_LIBRARY_DIRS}) + endif() +else() + set(PLASMA_HOME "$ENV{ARROW_HOME}") + + set(PLASMA_EXECUTABLE ${PLASMA_HOME}/bin/plasma_store) + + set(PLASMA_SEARCH_HEADER_PATHS + ${PLASMA_HOME}/include + ) + + set(PLASMA_SEARCH_LIB_PATH + ${PLASMA_HOME}/lib + ) + + find_path(PLASMA_INCLUDE_DIR plasma/client.h PATHS + ${PLASMA_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH + ) +endif() + +find_library(PLASMA_LIB_PATH NAMES plasma + PATHS + ${PLASMA_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) +get_filename_component(PLASMA_LIBS ${PLASMA_LIB_PATH} DIRECTORY) + +if (PLASMA_INCLUDE_DIR AND PLASMA_LIBS) + set(PLASMA_FOUND TRUE) + set(PLASMA_LIB_NAME plasma) + + set(PLASMA_STATIC_LIB ${PLASMA_LIBS}/lib${PLASMA_LIB_NAME}.a) + + set(PLASMA_SHARED_LIB ${PLASMA_LIBS}/lib${PLASMA_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) +endif() + +if (PLASMA_FOUND) + if (NOT Plasma_FIND_QUIETLY) + message(STATUS "Found the Plasma core library: ${PLASMA_LIB_PATH}") + message(STATUS "Found Plasma executable: ${PLASMA_EXECUTABLE}") + endif () +else () + if (NOT Plasma_FIND_QUIETLY) + set(PLASMA_ERR_MSG "Could not find the Plasma library. Looked for headers") + set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_HEADER_PATHS}, and for libs") + set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_LIB_PATH}") + if (Plasma_FIND_REQUIRED) + message(FATAL_ERROR "${PLASMA_ERR_MSG}") + else (Plasma_FIND_REQUIRED) + message(STATUS "${PLASMA_ERR_MSG}") + endif (Plasma_FIND_REQUIRED) + endif () + set(PLASMA_FOUND FALSE) +endif () + +mark_as_advanced( + PLASMA_INCLUDE_DIR + PLASMA_STATIC_LIB + PLASMA_SHARED_LIB +) diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index c52d400cef1..780aa483961 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -212,6 +212,21 @@ Type Classes Field Schema +.. currentmodule:: pyarrow.plasma + +.. _api.plasma: + +In-Memory Object Store +---------------------- + +.. autosummary:: + :toctree: generated/ + + ObjectID + PlasmaClient + PlasmaBuffer + MutablePlasmaBuffer + .. currentmodule:: pyarrow.parquet .. _api.parquet: diff --git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh index 8c6bda9550e..85c096a5c11 100755 --- a/python/manylinux1/build_arrow.sh +++ b/python/manylinux1/build_arrow.sh @@ -35,6 +35,7 @@ cd /arrow/python # PyArrow build configuration export PYARROW_BUILD_TYPE='release' export PYARROW_WITH_PARQUET=1 +export PYARROW_WITH_PLASMA=1 export PYARROW_BUNDLE_ARROW_CPP=1 # Need as otherwise arrow_io is sometimes not linked export LDFLAGS="-Wl,--no-as-needed" @@ -52,7 +53,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do ARROW_BUILD_DIR=/arrow/cpp/build-PY${PYTHON} mkdir -p "${ARROW_BUILD_DIR}" pushd "${ARROW_BUILD_DIR}" - PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} .. + PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} -DARROW_PLASMA=ON .. make -j5 install popd @@ -65,6 +66,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do echo "=== (${PYTHON}) Test the existence of optional modules ===" $PIPI_IO -r requirements.txt PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.parquet" + PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.plasma" echo "=== (${PYTHON}) Tag the wheel with manylinux1 ===" mkdir -p repaired_wheels/ @@ -78,4 +80,3 @@ for PYTHON in ${PYTHON_VERSIONS}; do mv repaired_wheels/*.whl /io/dist done - diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index e3d783aee58..6d0ce204382 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -68,6 +68,7 @@ Date32Value, Date64Value, TimestampValue) from pyarrow.lib import (HdfsFile, NativeFile, PythonFile, + FixedSizeBufferOutputStream, Buffer, BufferReader, BufferOutputStream, OSFile, MemoryMappedFile, memory_map, frombuffer, @@ -99,7 +100,6 @@ open_file, serialize_pandas, deserialize_pandas) - localfs = LocalFilesystem.get_instance() diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi index 259aeb074e3..8a3f57d209a 100644 --- a/python/pyarrow/error.pxi +++ b/python/pyarrow/error.pxi @@ -48,6 +48,18 @@ class ArrowNotImplementedError(NotImplementedError, ArrowException): pass +class PlasmaObjectExists(ArrowException): + pass + + +class PlasmaObjectNonexistent(ArrowException): + pass + + +class PlasmaStoreFull(ArrowException): + pass + + cdef int check_status(const CStatus& status) nogil except -1: if status.ok(): return 0 @@ -66,5 +78,11 @@ cdef int check_status(const CStatus& status) nogil except -1: raise ArrowNotImplementedError(message) elif status.IsTypeError(): raise ArrowTypeError(message) + elif status.IsPlasmaObjectExists(): + raise PlasmaObjectExists(message) + elif status.IsPlasmaObjectNonexistent(): + raise PlasmaObjectNonexistent(message) + elif status.IsPlasmaStoreFull(): + raise PlasmaStoreFull(message) else: raise ArrowException(message) diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 3487d48ce9b..637a133afb0 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -50,6 +50,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_bool IsKeyError() c_bool IsNotImplemented() c_bool IsTypeError() + c_bool IsPlasmaObjectExists() + c_bool IsPlasmaObjectNonexistent() + c_bool IsPlasmaStoreFull() cdef inline object PyObject_to_object(PyObject* o): diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index edf50ad54e7..ffe867b0af0 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -148,9 +148,15 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CLoggingMemoryPool(CMemoryPool*) cdef cppclass CBuffer" arrow::Buffer": + CBuffer(const uint8_t* data, int64_t size) uint8_t* data() int64_t size() shared_ptr[CBuffer] parent() + c_bool is_mutable() const + + cdef cppclass CMutableBuffer" arrow::MutableBuffer"(CBuffer): + CMutableBuffer(const uint8_t* data, int64_t size) + uint8_t* mutable_data() cdef cppclass ResizableBuffer(CBuffer): CStatus Resize(int64_t nbytes) @@ -558,6 +564,9 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: CMockOutputStream() int64_t GetExtentBytesWritten() + cdef cppclass CFixedSizeBufferWriter" arrow::io::FixedSizeBufferWriter"(WriteableFile): + CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer) + cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: enum MessageType" arrow::ipc::Message::Type": diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 8b213a33053..181b0b18a71 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -473,6 +473,15 @@ cdef class OSFile(NativeFile): self.wr_file = handle +cdef class FixedSizeBufferOutputStream(NativeFile): + + def __cinit__(self, Buffer buffer): + self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer)) + self.is_readable = 0 + self.is_writeable = 1 + self.is_open = True + + # ---------------------------------------------------------------------- # Arrow buffers @@ -523,7 +532,10 @@ cdef class Buffer: buffer.len = self.size buffer.ndim = 1 buffer.obj = self - buffer.readonly = 1 + if self.buffer.get().is_mutable(): + buffer.readonly = 0 + else: + buffer.readonly = 1 buffer.shape = self.shape buffer.strides = self.strides buffer.suboffsets = NULL @@ -540,6 +552,15 @@ cdef class Buffer: p[0] = self.buffer.get().data() return self.size + def __getwritebuffer__(self, Py_ssize_t idx, void **p): + if not self.buffer.get().is_mutable(): + raise SystemError("trying to write an immutable buffer") + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + if p != NULL: + p[0] = self.buffer.get().data() + return self.size + cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool): cdef shared_ptr[PoolBuffer] result diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx new file mode 100644 index 00000000000..bb17685277a --- /dev/null +++ b/python/pyarrow/plasma.pyx @@ -0,0 +1,560 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from libcpp cimport bool as c_bool, nullptr +from libcpp.memory cimport shared_ptr, unique_ptr, make_shared +from libcpp.string cimport string as c_string +from libcpp.vector cimport vector as c_vector +from libc.stdint cimport int64_t, uint8_t, uintptr_t +from cpython.pycapsule cimport * + +from pyarrow.lib cimport Buffer, NativeFile, check_status +from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, + CFixedSizeBufferWriter, CStatus) + + +PLASMA_WAIT_TIMEOUT = 2 ** 30 + + +cdef extern from "plasma/common.h" nogil: + + cdef cppclass CUniqueID" plasma::UniqueID": + + @staticmethod + CUniqueID from_binary(const c_string& binary) + + c_bool operator==(const CUniqueID& rhs) const + + c_string hex() const + + c_string binary() const + + cdef struct CObjectRequest" plasma::ObjectRequest": + CUniqueID object_id + int type + int status + + +cdef extern from "plasma/common.h": + cdef int64_t kDigestSize" plasma::kDigestSize" + + cdef enum ObjectRequestType: + PLASMA_QUERY_LOCAL"plasma::PLASMA_QUERY_LOCAL", + PLASMA_QUERY_ANYWHERE"plasma::PLASMA_QUERY_ANYWHERE" + + cdef int ObjectStatusLocal"plasma::ObjectStatusLocal"; + cdef int ObjectStatusRemote"plasma::ObjectStatusRemote"; + +cdef extern from "plasma/client.h" nogil: + + cdef cppclass CPlasmaClient" plasma::PlasmaClient": + + CPlasmaClient() + + CStatus Connect(const c_string& store_socket_name, + const c_string& manager_socket_name, int release_delay) + + CStatus Create(const CUniqueID& object_id, int64_t data_size, + const uint8_t* metadata, int64_t metadata_size, + uint8_t** data) + + CStatus Get(const CUniqueID* object_ids, int64_t num_objects, + int64_t timeout_ms, CObjectBuffer* object_buffers) + + CStatus Seal(const CUniqueID& object_id) + + CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted) + + CStatus Hash(const CUniqueID& object_id, uint8_t* digest) + + CStatus Release(const CUniqueID& object_id) + + CStatus Contains(const CUniqueID& object_id, c_bool* has_object) + + CStatus Subscribe(int* fd) + + CStatus GetNotification(int fd, CUniqueID* object_id, + int64_t* data_size, int64_t* metadata_size) + + CStatus Disconnect() + + CStatus Fetch(int num_object_ids, const CUniqueID* object_ids) + + CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests, + int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); + + CStatus Transfer(const char* addr, int port, const CUniqueID& object_id) + + +cdef extern from "plasma/client.h" nogil: + + cdef struct CObjectBuffer" plasma::ObjectBuffer": + int64_t data_size + uint8_t* data + int64_t metadata_size + uint8_t* metadata + + +def make_object_id(object_id): + return ObjectID(object_id) + + +cdef class ObjectID: + """ + An ObjectID represents a string of bytes used to identify Plasma objects. + """ + + cdef: + CUniqueID data + + def __cinit__(self, object_id): + self.data = CUniqueID.from_binary(object_id) + + def __richcmp__(ObjectID self, ObjectID object_id, operation): + if operation != 2: + raise ValueError("operation != 2 (only equality is supported)") + return self.data == object_id.data + + def __hash__(self): + return hash(self.data.binary()) + + def __repr__(self): + return "ObjectID(" + self.data.hex().decode() + ")" + + def __reduce__(self): + return (make_object_id, (self.data.binary(),)) + + def binary(self): + """ + Return the binary representation of this ObjectID. + + Returns + ------- + bytes + Binary representation of the ObjectID. + """ + return self.data.binary() + + +cdef class PlasmaBuffer(Buffer): + """ + This is the type returned by calls to get with a PlasmaClient. + + We define our own class instead of directly returning a buffer object so + that we can add a custom destructor which notifies Plasma that the object + is no longer being used, so the memory in the Plasma store backing the + object can potentially be freed. + + Attributes + ---------- + object_id : ObjectID + The ID of the object in the buffer. + client : PlasmaClient + The PlasmaClient that we use to communicate with the store and manager. + """ + + cdef: + ObjectID object_id + PlasmaClient client + + def __cinit__(self, ObjectID object_id, PlasmaClient client): + """ + Initialize a PlasmaBuffer. + """ + self.object_id = object_id + self.client = client + + def __dealloc__(self): + """ + Notify Plasma that the object is no longer needed. + + If the plasma client has been shut down, then don't do anything. + """ + self.client.release(self.object_id) + + +cdef class PlasmaClient: + """ + The PlasmaClient is used to interface with a plasma store and manager. + + The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a + buffer, and get a buffer. Buffers are referred to by object IDs, which are + strings. + """ + + cdef: + shared_ptr[CPlasmaClient] client + int notification_fd + c_string store_socket_name + c_string manager_socket_name + + def __cinit__(self, store_socket_name, manager_socket_name, int release_delay): + """ + Create a new PlasmaClient that is connected to a plasma store + and optionally a plasma manager. + + Parameters + ---------- + store_socket_name : str + Name of the socket the plasma store is listening at. + manager_socket_name : str + Name of the socket the plasma manager is listening at. + release_delay : int + The maximum number of objects that the client will keep and + delay releasing (for caching reasons). + """ + self.client.reset(new CPlasmaClient()) + self.notification_fd = -1 + self.store_socket_name = store_socket_name.encode() + self.manager_socket_name = manager_socket_name.encode() + with nogil: + check_status(self.client.get().Connect(self.store_socket_name, + self.manager_socket_name, release_delay)) + + cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, + c_vector[CObjectBuffer]* result): + cdef c_vector[CUniqueID] ids + cdef ObjectID object_id + for object_id in object_ids: + ids.push_back(object_id.data) + result[0].resize(ids.size()) + with nogil: + check_status(self.client.get().Get(ids.data(), ids.size(), + timeout_ms, result[0].data())) + + cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data, + int64_t size): + cdef shared_ptr[CBuffer] buffer + buffer.reset(new CBuffer(data, size)) + result = PlasmaBuffer(object_id, self) + result.init(buffer) + return result + + cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data, + int64_t size): + cdef shared_ptr[CBuffer] buffer + buffer.reset(new CMutableBuffer(data, size)) + result = PlasmaBuffer(object_id, self) + result.init(buffer) + return result + + @property + def store_socket_name(self): + return self.store_socket_name.decode() + + @property + def manager_socket_name(self): + return self.manager_socket_name.decode() + + def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""): + """ + Create a new buffer in the PlasmaStore for a particular object ID. + + The returned buffer is mutable until seal is called. + + Parameters + ---------- + object_id : ObjectID + The object ID used to identify an object. + size : int + The size in bytes of the created buffer. + metadata : bytes + An optional string of bytes encoding whatever metadata the user + wishes to encode. + + Raises + ------ + PlasmaObjectExists + This exception is raised if the object could not be created because + there already is an object with the same ID in the plasma store. + + PlasmaStoreFull: This exception is raised if the object could + not be created because the plasma store is unable to evict + enough objects to create room for it. + """ + cdef uint8_t* data + with nogil: + check_status(self.client.get().Create(object_id.data, data_size, + (metadata.data()), + metadata.size(), &data)) + return self._make_mutable_plasma_buffer(object_id, data, data_size) + + def get(self, object_ids, timeout_ms=-1): + """ + Returns data buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms :int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of PlasmaBuffers for the data associated with the object_ids + and None if the object was not available. + """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + if object_buffers[i].data_size != -1: + result.append(self._make_plasma_buffer( + object_ids[i], object_buffers[i].data, + object_buffers[i].data_size)) + else: + result.append(None) + return result + + def get_metadata(self, object_ids, timeout_ms=-1): + """ + Returns metadata buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms : int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of PlasmaBuffers for the metadata associated with the + object_ids and None if the object was not available. + """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + result.append(self._make_plasma_buffer( + object_ids[i], object_buffers[i].metadata, + object_buffers[i].metadata_size)) + return result + + def seal(self, ObjectID object_id): + """ + Seal the buffer in the PlasmaStore for a particular object ID. + + Once a buffer has been sealed, the buffer is immutable and can only be + accessed through get. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + with nogil: + check_status(self.client.get().Seal(object_id.data)) + + def release(self, ObjectID object_id): + """ + Notify Plasma that the object is no longer needed. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + with nogil: + check_status(self.client.get().Release(object_id.data)) + + def contains(self, ObjectID object_id): + """ + Check if the object is present and sealed in the PlasmaStore. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + cdef c_bool is_contained + with nogil: + check_status(self.client.get().Contains(object_id.data, + &is_contained)) + return is_contained + + def hash(self, ObjectID object_id): + """ + Compute the checksum of an object in the object store. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + + Returns + ------- + bytes + A digest string object's hash. If the object isn't in the object + store, the string will have length zero. + """ + cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize) + with nogil: + check_status(self.client.get().Hash(object_id.data, + digest.data())) + return bytes(digest[:]) + + def evict(self, int64_t num_bytes): + """ + Evict some objects until to recover some bytes. + + Recover at least num_bytes bytes if possible. + + Parameters + ---------- + num_bytes : int + The number of bytes to attempt to recover. + """ + cdef int64_t num_bytes_evicted = -1 + with nogil: + check_status(self.client.get().Evict(num_bytes, num_bytes_evicted)) + return num_bytes_evicted + + def transfer(self, address, int port, ObjectID object_id): + """ + Transfer local object with id object_id to another plasma instance + + Parameters + ---------- + addr : str + IPv4 address of the plasma instance the object is sent to. + port : int + Port number of the plasma instance the object is sent to. + object_id : str + A string used to identify an object. + """ + cdef c_string addr = address.encode() + with nogil: + check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data)) + + def fetch(self, object_ids): + """ + Fetch the objects with the given IDs from other plasma managers. + + Parameters + ---------- + object_ids : list + A list of strings used to identify the objects. + """ + cdef c_vector[CUniqueID] ids + cdef ObjectID object_id + for object_id in object_ids: + ids.push_back(object_id.data) + with nogil: + check_status(self.client.get().Fetch(ids.size(), ids.data())) + + def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1): + """ + Wait until num_returns objects in object_ids are ready. + Currently, the object ID arguments to wait must be unique. + + Parameters + ---------- + object_ids : list + List of object IDs to wait for. + timeout :int + Return to the caller after timeout milliseconds. + num_returns : int + We are waiting for this number of objects to be ready. + + Returns + ------- + list + List of object IDs that are ready. + list + List of object IDs we might still wait on. + """ + # Check that the object ID arguments are unique. The plasma manager + # currently crashes if given duplicate object IDs. + if len(object_ids) != len(set(object_ids)): + raise Exception("Wait requires a list of unique object IDs.") + cdef int64_t num_object_requests = len(object_ids) + cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests) + cdef int num_objects_ready = 0 + cdef ObjectID object_id + for i, object_id in enumerate(object_ids): + object_requests[i].object_id = object_id.data + object_requests[i].type = PLASMA_QUERY_ANYWHERE + with nogil: + check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready)) + cdef int num_to_return = min(num_objects_ready, num_returns); + ready_ids = [] + waiting_ids = set(object_ids) + cdef int num_returned = 0 + for i in range(len(object_ids)): + if num_returned == num_to_return: + break + if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote: + ready_ids.append(ObjectID(object_requests[i].object_id.binary())) + waiting_ids.discard(ObjectID(object_requests[i].object_id.binary())) + num_returned += 1 + return ready_ids, list(waiting_ids) + + def subscribe(self): + """Subscribe to notifications about sealed objects.""" + with nogil: + check_status(self.client.get().Subscribe(&self.notification_fd)) + + def get_next_notification(self): + """ + Get the next notification from the notification socket. + + Returns + ------- + ObjectID + The object ID of the object that was stored. + int + The data size of the object that was stored. + int + The metadata size of the object that was stored. + """ + cdef ObjectID object_id = ObjectID(20 * b"\0") + cdef int64_t data_size + cdef int64_t metadata_size + with nogil: + check_status(self.client.get().GetNotification(self.notification_fd, + &object_id.data, + &data_size, + &metadata_size)) + return object_id, data_size, metadata_size + + def to_capsule(self): + return PyCapsule_New(self.client.get(), "plasma", NULL) + + def disconnect(self): + """ + Disconnect this client from the Plasma store. + """ + with nogil: + check_status(self.client.get().Disconnect()) diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 2aeeab7294c..21288e4f35e 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -18,11 +18,12 @@ from pytest import skip -groups = ['hdfs', 'parquet', 'large_memory'] +groups = ['hdfs', 'parquet', 'plasma', 'large_memory'] defaults = { 'hdfs': False, 'parquet': False, + 'plasma': False, 'large_memory': False } @@ -32,6 +33,11 @@ except ImportError: pass +try: + import pyarrow.plasma as plasma + defaults['plasma'] = True +except ImportError: + pass def pytest_configure(config): pass diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py new file mode 100644 index 00000000000..ce684e3e41f --- /dev/null +++ b/python/pyarrow/tests/test_plasma.py @@ -0,0 +1,683 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import glob +import numpy as np +import os +import pytest +import random +import signal +import subprocess +import sys +import time +import unittest + +import pyarrow as pa +import pandas as pd + +DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9 + +def random_name(): + return str(random.randint(0, 99999999)) + + +def random_object_id(): + import pyarrow.plasma as plasma + return plasma.ObjectID(np.random.bytes(20)) + + +def generate_metadata(length): + metadata = bytearray(length) + if length > 0: + metadata[0] = random.randint(0, 255) + metadata[-1] = random.randint(0, 255) + for _ in range(100): + metadata[random.randint(0, length - 1)] = random.randint(0, 255) + return metadata + + +def write_to_data_buffer(buff, length): + array = np.frombuffer(buff, dtype="uint8") + if length > 0: + array[0] = random.randint(0, 255) + array[-1] = random.randint(0, 255) + for _ in range(100): + array[random.randint(0, length - 1)] = random.randint(0, 255) + + +def create_object_with_id(client, object_id, data_size, metadata_size, + seal=True): + metadata = generate_metadata(metadata_size) + memory_buffer = client.create(object_id, data_size, metadata) + write_to_data_buffer(memory_buffer, data_size) + if seal: + client.seal(object_id) + return memory_buffer, metadata + + +def create_object(client, data_size, metadata_size, seal=True): + object_id = random_object_id() + memory_buffer, metadata = create_object_with_id(client, object_id, + data_size, metadata_size, + seal=seal) + return object_id, memory_buffer, metadata + + +def assert_get_object_equal(unit_test, client1, client2, object_id, + memory_buffer=None, metadata=None): + import pyarrow.plasma as plasma + client1_buff = client1.get([object_id])[0] + client2_buff = client2.get([object_id])[0] + client1_metadata = client1.get_metadata([object_id])[0] + client2_metadata = client2.get_metadata([object_id])[0] + assert len(client1_buff) == len(client2_buff) + assert len(client1_metadata) == len(client2_metadata) + # Check that the buffers from the two clients are the same. + assert plasma.buffers_equal(client1_buff, client2_buff) + # Check that the metadata buffers from the two clients are the same. + assert plasma.buffers_equal(client1_metadata, client2_metadata) + # If a reference buffer was provided, check that it is the same as well. + if memory_buffer is not None: + assert plasma.buffers_equal(memory_buffer, client1_buff) + # If reference metadata was provided, check that it is the same as well. + if metadata is not None: + assert plasma.buffers_equal(metadata, client1_metadata) + + +def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, + use_valgrind=False, use_profiler=False, + stdout_file=None, stderr_file=None): + """Start a plasma store process. + Args: + use_valgrind (bool): True if the plasma store should be started inside + of valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the plasma store should be started inside + a profiler. If this is True, use_valgrind must be False. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + Return: + A tuple of the name of the plasma store socket and the process ID of + the plasma store process. + """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store") + plasma_store_name = "/tmp/plasma_store{}".format(random_name()) + command = [plasma_store_executable, + "-s", plasma_store_name, + "-m", str(plasma_store_memory)] + if use_valgrind: + pid = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--leak-check-heuristics=stdstring", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + time.sleep(0.1) + return plasma_store_name, pid + + +@pytest.mark.plasma +class TestPlasmaClient(object): + + def setup_method(self, test_method): + import pyarrow.plasma as plasma + # Start Plasma store. + plasma_store_name, self.p = start_plasma_store( + use_valgrind=os.getenv("PLASMA_VALGRIND") == "1") + # Connect to Plasma. + self.plasma_client = plasma.PlasmaClient(plasma_store_name, "", 64) + # For the eviction test + self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, "", 0) + + def teardown_method(self, test_method): + # Check that the Plasma store is still alive. + assert self.p.poll() == None + # Kill the plasma store process. + if os.getenv("PLASMA_VALGRIND") == "1": + self.p.send_signal(signal.SIGTERM) + self.p.wait() + if self.p.returncode != 0: + assert False + else: + self.p.kill() + + def test_create(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer and write to it. + length = 50 + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. + self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], + dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + + def test_create_with_metadata(self): + for length in range(1000): + # Create an object id string. + object_id = random_object_id() + # Create a random metadata string. + metadata = generate_metadata(length) + # Create a new buffer and write to it. + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. + self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer( + self.plasma_client.get([object_id])[0], dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + # Get the metadata. + metadata_buffer = np.frombuffer( + self.plasma_client.get_metadata([object_id])[0], dtype="uint8") + assert len(metadata) == len(metadata_buffer) + for i in range(len(metadata)): + assert metadata[i] == metadata_buffer[i] + + def test_create_existing(self): + # This test is partially used to test the code path in which we create + # an object with an ID that already exists + length = 100 + for _ in range(1000): + object_id = random_object_id() + self.plasma_client.create(object_id, length, + generate_metadata(length)) + try: + self.plasma_client.create(object_id, length, + generate_metadata(length)) + # TODO(pcm): Introduce a more specific error type here. + except pa.lib.ArrowException as e: + pass + else: + assert False + + def test_get(self): + num_object_ids = 100 + # Test timing out of get with various timeouts. + for timeout in [0, 10, 100, 1000]: + object_ids = [random_object_id() for _ in range(num_object_ids)] + results = self.plasma_client.get(object_ids, timeout_ms=timeout) + assert results == num_object_ids * [None] + + data_buffers = [] + metadata_buffers = [] + for i in range(num_object_ids): + if i % 2 == 0: + data_buffer, metadata_buffer = create_object_with_id( + self.plasma_client, object_ids[i], 2000, 2000) + data_buffers.append(data_buffer) + metadata_buffers.append(metadata_buffer) + + # Test timing out from some but not all get calls with various + # timeouts. + for timeout in [0, 10, 100, 1000]: + data_results = self.plasma_client.get(object_ids, + timeout_ms=timeout) + # metadata_results = self.plasma_client.get_metadata( + # object_ids, timeout_ms=timeout) + for i in range(num_object_ids): + if i % 2 == 0: + array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8") + array2 = np.frombuffer(data_results[i], dtype="uint8") + np.testing.assert_equal(array1, array2) + # TODO(rkn): We should compare the metadata as well. But + # currently the types are different (e.g., memoryview + # versus bytearray). + # assert plasma.buffers_equal( + # metadata_buffers[i // 2], metadata_results[i]) + else: + assert results[i] is None + + def test_store_arrow_objects(self): + import pyarrow.plasma as plasma + data = np.random.randn(10, 4) + # Write an arrow object. + object_id = random_object_id() + tensor = pa.Tensor.from_numpy(data) + data_size = pa.get_tensor_size(tensor) + buf = self.plasma_client.create(object_id, data_size) + stream = pa.FixedSizeBufferOutputStream(buf) + pa.write_tensor(tensor, stream) + self.plasma_client.seal(object_id) + # Read the arrow object. + [tensor] = self.plasma_client.get([object_id]) + reader = pa.BufferReader(tensor) + array = pa.read_tensor(reader).to_numpy() + # Assert that they are equal. + np.testing.assert_equal(data, array) + + def test_store_pandas_dataframe(self): + import pyarrow.plasma as plasma + d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']), + 'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])} + df = pd.DataFrame(d) + + # Write the DataFrame. + record_batch = pa.RecordBatch.from_pandas(df) + # Determine the size. + s = pa.MockOutputStream() + stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema) + stream_writer.write_batch(record_batch) + data_size = s.size() + object_id = plasma.ObjectID(np.random.bytes(20)) + + buf = self.plasma_client.create(object_id, data_size) + stream = pa.FixedSizeBufferOutputStream(buf) + stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema) + stream_writer.write_batch(record_batch) + + self.plasma_client.seal(object_id) + + # Read the DataFrame. + [data] = self.plasma_client.get([object_id]) + reader = pa.RecordBatchStreamReader(pa.BufferReader(data)) + result = reader.get_next_batch().to_pandas() + + pd.util.testing.assert_frame_equal(df, result) + + def test_pickle_object_ids(self): + # This can be used for sharing object IDs between processes. + import pickle + object_id = random_object_id() + data = pickle.dumps(object_id) + object_id2 = pickle.loads(data) + assert object_id == object_id2 + + def test_store_full(self): + # The store is started with 1GB, so make sure that create throws an + # exception when it is full. + def assert_create_raises_plasma_full(unit_test, size): + partial_size = np.random.randint(size) + try: + _, memory_buffer, _ = create_object(unit_test.plasma_client, + partial_size, + size - partial_size) + # TODO(pcm): More specific error here. + except pa.lib.ArrowException as e: + pass + else: + # For some reason the above didn't throw an exception, so fail. + assert False + + # Create a list to keep some of the buffers in scope. + memory_buffers = [] + _, memory_buffer, _ = create_object(self.plasma_client, 5 * 10 ** 8, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 5 * 10 ** 8. Make sure that we can't create an + # object of size 5 * 10 ** 8 + 1, but we can create one of size + # 2 * 10 ** 8. + assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1) + _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0) + del memory_buffer + _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0) + del memory_buffer + assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 3 * 10 ** 8. + assert_create_raises_plasma_full(self, 3 * 10 ** 8 + 1) + + _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0) + memory_buffers.append(memory_buffer) + # Remaining space is 2 * 10 ** 8. + assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1) + + def test_contains(self): + fake_object_ids = [random_object_id() for _ in range(100)] + real_object_ids = [random_object_id() for _ in range(100)] + for object_id in real_object_ids: + assert self.plasma_client.contains(object_id) == False + self.plasma_client.create(object_id, 100) + self.plasma_client.seal(object_id) + assert self.plasma_client.contains(object_id) + for object_id in fake_object_ids: + assert not self.plasma_client.contains(object_id) + for object_id in real_object_ids: + assert self.plasma_client.contains(object_id) + + def test_hash(self): + # Check the hash of an object that doesn't exist. + object_id1 = random_object_id() + try: + self.plasma_client.hash(object_id1) + # TODO(pcm): Introduce a more specific error type here + except pa.lib.ArrowException as e: + pass + else: + assert False + + length = 1000 + # Create a random object, and check that the hash function always + # returns the same value. + metadata = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id1, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + self.plasma_client.seal(object_id1) + assert (self.plasma_client.hash(object_id1) == + self.plasma_client.hash(object_id1)) + + # Create a second object with the same value as the first, and check + # that their hashes are equal. + object_id2 = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id2, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + self.plasma_client.seal(object_id2) + assert (self.plasma_client.hash(object_id1) == + self.plasma_client.hash(object_id2)) + + # Create a third object with a different value from the first two, and + # check that its hash is different. + object_id3 = random_object_id() + metadata = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id3, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = (i + 1) % 256 + self.plasma_client.seal(object_id3) + assert (self.plasma_client.hash(object_id1) != + self.plasma_client.hash(object_id3)) + + # Create a fourth object with the same value as the third, but + # different metadata. Check that its hash is different from any of the + # previous three. + object_id4 = random_object_id() + metadata4 = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id4, + length, + metadata4), + dtype="uint8") + for i in range(length): + memory_buffer[i] = (i + 1) % 256 + self.plasma_client.seal(object_id4) + assert (self.plasma_client.hash(object_id1) != + self.plasma_client.hash(object_id4)) + assert (self.plasma_client.hash(object_id3) != + self.plasma_client.hash(object_id4)) + + def test_many_hashes(self): + hashes = [] + length = 2 ** 10 + + for i in range(256): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for j in range(length): + memory_buffer[j] = i + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Create objects of varying length. Each pair has two bits different. + for i in range(length): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for j in range(length): + memory_buffer[j] = 0 + memory_buffer[i] = 1 + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Create objects of varying length, all with value 0. + for i in range(length): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + i), + dtype="uint8") + for j in range(i): + memory_buffer[j] = 0 + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Check that all hashes were unique. + assert len(set(hashes)) == 256 + length + length + + # def test_individual_delete(self): + # length = 100 + # # Create an object id string. + # object_id = random_object_id() + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it. + # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. + # assert self.plasma_client.contains(object_id) + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id)) + # + # def test_delete(self): + # # Create some objects. + # object_ids = [random_object_id() for _ in range(100)] + # for object_id in object_ids: + # length = 100 + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it. + # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. + # assert self.plasma_client.contains(object_id) + # + # # Delete the objects and make sure they are no longer present. + # for object_id in object_ids: + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id)) + + def test_illegal_functionality(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer and write to it. + length = 1000 + memory_buffer = self.plasma_client.create(object_id, length) + # Make sure we cannot access memory out of bounds. + with pytest.raises(Exception): + memory_buffer[length] + # Seal the object. + self.plasma_client.seal(object_id) + # This test is commented out because it currently fails. + # # Make sure the object is ready only now. + # def illegal_assignment(): + # memory_buffer[0] = chr(0) + # with pytest.raises(Exception): + # illegal_assignment() + # Get the object. + memory_buffer = self.plasma_client.get([object_id])[0] + + # Make sure the object is read only. + def illegal_assignment(): + memory_buffer[0] = chr(0) + with pytest.raises(Exception): + illegal_assignment() + + def test_evict(self): + client = self.plasma_client2 + object_id1 = random_object_id() + b1 = client.create(object_id1, 1000) + client.seal(object_id1) + del b1 + assert client.evict(1) == 1000 + + object_id2 = random_object_id() + object_id3 = random_object_id() + b2 = client.create(object_id2, 999) + b3 = client.create(object_id3, 998) + client.seal(object_id3) + del b3 + assert client.evict(1000) == 998 + + object_id4 = random_object_id() + b4 = client.create(object_id4, 997) + client.seal(object_id4) + del b4 + client.seal(object_id2) + del b2 + assert client.evict(1) == 997 + assert client.evict(1) == 999 + + object_id5 = random_object_id() + object_id6 = random_object_id() + object_id7 = random_object_id() + b5 = client.create(object_id5, 996) + b6 = client.create(object_id6, 995) + b7 = client.create(object_id7, 994) + client.seal(object_id5) + client.seal(object_id6) + client.seal(object_id7) + del b5 + del b6 + del b7 + assert client.evict(2000) == 996 + 995 + 994 + + def test_subscribe(self): + # Subscribe to notifications from the Plasma Store. + self.plasma_client.subscribe() + for i in [1, 10, 100, 1000, 10000, 100000]: + object_ids = [random_object_id() for _ in range(i)] + metadata_sizes = [np.random.randint(1000) for _ in range(i)] + data_sizes = [np.random.randint(1000) for _ in range(i)] + for j in range(i): + self.plasma_client.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client.seal(object_ids[j]) + # Check that we received notifications for all of the objects. + for j in range(i): + notification_info = self.plasma_client.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert data_sizes[j] == recv_dsize + assert metadata_sizes[j] == recv_msize + + def test_subscribe_deletions(self): + # Subscribe to notifications from the Plasma Store. We use + # plasma_client2 to make sure that all used objects will get evicted + # properly. + self.plasma_client2.subscribe() + for i in [1, 10, 100, 1000, 10000, 100000]: + object_ids = [random_object_id() for _ in range(i)] + # Add 1 to the sizes to make sure we have nonzero object sizes. + metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + data_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + for j in range(i): + x = self.plasma_client2.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client2.seal(object_ids[j]) + del x + # Check that we received notifications for creating all of the + # objects. + for j in range(i): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert data_sizes[j] == recv_dsize + assert metadata_sizes[j] == recv_msize + + # Check that we receive notifications for deleting all objects, as + # we evict them. + for j in range(i): + assert (self.plasma_client2.evict(1) == + data_sizes[j] + metadata_sizes[j]) + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert -1 == recv_dsize + assert -1 == recv_msize + + # Test multiple deletion notifications. The first 9 object IDs have + # size 0, and the last has a nonzero size. When Plasma evicts 1 byte, + # it will evict all objects, so we should receive deletion + # notifications for each. + num_object_ids = 10 + object_ids = [random_object_id() for _ in range(num_object_ids)] + metadata_sizes = [0] * (num_object_ids - 1) + data_sizes = [0] * (num_object_ids - 1) + metadata_sizes.append(np.random.randint(1000)) + data_sizes.append(np.random.randint(1000)) + for i in range(num_object_ids): + x = self.plasma_client2.create( + object_ids[i], data_sizes[i], + metadata=bytearray(np.random.bytes(metadata_sizes[i]))) + self.plasma_client2.seal(object_ids[i]) + del x + for i in range(num_object_ids): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[i] == recv_objid + assert data_sizes[i] == recv_dsize + assert metadata_sizes[i] == recv_msize + assert (self.plasma_client2.evict(1) == + data_sizes[-1] + metadata_sizes[-1]) + for i in range(num_object_ids): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[i] == recv_objid + assert -1 == recv_dsize + assert -1 == recv_msize diff --git a/python/setup.py b/python/setup.py index 1ea57ae2d85..7425b719160 100644 --- a/python/setup.py +++ b/python/setup.py @@ -99,6 +99,10 @@ def initialize_options(self): self.with_parquet = strtobool( os.environ.get('PYARROW_WITH_PARQUET', '0')) + self.with_plasma = strtobool( + os.environ.get('PYARROW_WITH_PLASMA', '0')) + if self.with_plasma and "plasma" not in self.CYTHON_MODULE_NAMES: + self.CYTHON_MODULE_NAMES.append("plasma") self.bundle_arrow_cpp = strtobool( os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0')) @@ -242,6 +246,8 @@ def move_lib(lib_name): shutil.move(pjoin(build_prefix, 'include'), pjoin(build_lib, 'pyarrow')) move_lib("arrow") move_lib("arrow_python") + if self.with_plasma: + move_lib("plasma") if self.with_parquet: move_lib("parquet") @@ -270,11 +276,20 @@ def move_lib(lib_name): shutil.move(self.get_ext_built_api_header(name), pjoin(os.path.dirname(ext_path), name + '_api.h')) + # Move the plasma store + if self.with_plasma: + build_py = self.get_finalized_command('build_py') + source = os.path.join(self.build_type, "plasma_store") + target = os.path.join(build_lib, build_py.get_package_dir('pyarrow'), "plasma_store") + shutil.move(source, target) + os.chdir(saved_cwd) def _failure_permitted(self, name): if name == '_parquet' and not self.with_parquet: return True + if name == 'plasma' and not self.with_plasma: + return True return False def _get_inplace_dir(self):