Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
9044a01
initial plasma cython client commit
pcmoritz Jun 24, 2017
5178ee7
update
pcmoritz Jun 25, 2017
d590c8a
update
pcmoritz Jun 25, 2017
f8e05f2
implement plasma.get in the cython client
pcmoritz Jun 26, 2017
bc681ca
port some python tests
pcmoritz Jun 26, 2017
18e0ac4
get tests
pcmoritz Jun 26, 2017
78d08ac
make eviction work in Cython
pcmoritz Jun 27, 2017
db2d09a
get all python tests in place
pcmoritz Jun 28, 2017
d9261b4
add documentation and license
pcmoritz Jun 28, 2017
c3d462d
remove Python C extension
pcmoritz Jun 28, 2017
187cc24
add travis tests
pcmoritz Jun 29, 2017
1c5434c
fix formatting
pcmoritz Jun 29, 2017
5bf722a
fix plasma path
pcmoritz Jun 29, 2017
bf39297
build and install pyarrow for plasma tests
pcmoritz Jun 29, 2017
3c4de52
use cmake to build the cython extension
pcmoritz Jun 29, 2017
1aea320
run plasma tests
pcmoritz Jun 29, 2017
066d0ea
test
pcmoritz Jun 29, 2017
cba92c1
setup.py for plasma
pcmoritz Jun 30, 2017
d4934a9
update
pcmoritz Jun 30, 2017
2ff2480
workaround for python visibility
pcmoritz Jun 30, 2017
f970df3
reduce logging
pcmoritz Jun 30, 2017
924888b
update
pcmoritz Jun 30, 2017
a4a9628
fix c++ tests
pcmoritz Jun 30, 2017
777e9c7
introduce plasma namespace
pcmoritz Jun 30, 2017
dd5a7d8
fix tests
pcmoritz Jun 30, 2017
3021d59
make ObjectID pickleable
pcmoritz Jul 1, 2017
6371e2e
fix tests
pcmoritz Jul 1, 2017
1d7928f
add arrow roundtrip test
pcmoritz Jul 1, 2017
c06f1b5
fix test
pcmoritz Jul 1, 2017
2b7f949
implement mutable arrow python buffers
pcmoritz Jul 7, 2017
acc71d2
add round trip test for dataframes
pcmoritz Jul 7, 2017
e26527c
Convert plasma test.py from 2 space indentation to 4 space indentation.
robertnishihara Jul 8, 2017
67b0951
Fix long lines in plasma/test/test.py.
robertnishihara Jul 8, 2017
c9f6bcf
Fix indentation and line lengths in plasma.pyx.
robertnishihara Jul 8, 2017
3b69973
Fixed minor python linting.
robertnishihara Jul 8, 2017
fd80203
Plasma Python extension packaging: It compiles!
pcmoritz Jul 11, 2017
4ae1a27
fix
pcmoritz Jul 13, 2017
348f9bf
fixes
pcmoritz Jul 13, 2017
1ff88e7
fix travis ci
pcmoritz Jul 13, 2017
a9f6502
more fixes
pcmoritz Jul 14, 2017
44d1a55
cleanups and release GIL
pcmoritz Jul 14, 2017
3270628
try to get documentation up
pcmoritz Jul 14, 2017
2c6d652
convert docs to numpy format
pcmoritz Jul 14, 2017
54f595e
try fixing python 2 tests
pcmoritz Jul 15, 2017
8b53618
fix
pcmoritz Jul 15, 2017
45f338f
test plasma on macOS
pcmoritz Jul 15, 2017
9bc5c15
implement wait and fetch for the client
pcmoritz Jul 16, 2017
ed84c53
partial fixes
pcmoritz Jul 16, 2017
47033e7
switch to pytest
pcmoritz Jul 17, 2017
47dc739
fixes
pcmoritz Jul 17, 2017
997de1e
fix
pcmoritz Jul 17, 2017
b863d13
fix
pcmoritz Jul 17, 2017
23fe5f5
make plasma store binary part of the pyarrow package for tests
pcmoritz Jul 17, 2017
0bea267
debug
pcmoritz Jul 17, 2017
3e4a84d
fix segfault
pcmoritz Jul 19, 2017
b9e2dee
fix windows build
pcmoritz Jul 19, 2017
5f7b779
changes needed to make Ray work with Plasma in Arrow
pcmoritz Jul 22, 2017
e33443d
fix setup.py develop for plasma
pcmoritz Jul 24, 2017
08f24a5
fix typos and move FixedSizeBufferOutputStream
pcmoritz Jul 24, 2017
d14ab87
get rid of MutableBuffer
pcmoritz Jul 24, 2017
d8319fc
get for of PlasmaClient.connect
pcmoritz Jul 24, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ matrix:
- $TRAVIS_BUILD_DIR/ci/travis_before_script_c_glib.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_c_glib.sh
- compiler: gcc
language: cpp
os: linux
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned about not also testing this on OSX, but I agree it might be too burdensome on Travis.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests for macOS now; if it becomes too burdensome, we can switch them off.

group: deprecated
before_script:
- export CC="gcc-4.9"
- export CXX="g++-4.9"
- $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
- $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh
- compiler: clang
osx_image: xcode6.4
os: osx
cache:
addons:
before_script:
- $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
- $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh

before_install:
- ulimit -c unlimited -S
Expand Down
2 changes: 1 addition & 1 deletion ci/travis_script_manylinux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ set -ex
pushd python/manylinux1
git clone ../../ arrow
docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 .
docker run --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
docker run --shm-size=2g --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
97 changes: 97 additions & 0 deletions ci/travis_script_plasma.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env bash

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.

# Abort the script as soon as any command exits with a non-zero status.
set -e

# Pull in the shared CI environment.
# NOTE(review): presumably this is where ARROW_CPP_INSTALL and
# ARROW_PYTHON_DIR are defined -- confirm against travis_env_common.sh.
source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh

# Initial install location; python_version_tests re-exports ARROW_HOME with a
# per-Python-version directory before building.
export ARROW_HOME=$ARROW_CPP_INSTALL
# NOTE(review): presumably read by the pyarrow build to enable the Plasma
# extension -- confirm against python/setup.py.
export PYARROW_WITH_PLASMA=1

# Everything below runs from the Python source directory; the matching popd is
# the last line of the script.
pushd $ARROW_PYTHON_DIR

# Configure, build and install the Arrow C++ libraries (tests off, Python and
# Plasma components on).
#
# Arguments:
#   $1 - build directory; created if it does not exist yet
#   $2 - install prefix, forwarded to cmake as CMAKE_INSTALL_PREFIX
function build_arrow_libraries() {
  CPP_BUILD_DIR=$1
  CPP_DIR=$TRAVIS_BUILD_DIR/cpp

  # -p keeps a pre-existing build directory from aborting the script under
  # `set -e`; the quotes keep paths containing spaces in one piece.
  mkdir -p "$CPP_BUILD_DIR"
  pushd "$CPP_BUILD_DIR"

  cmake -DARROW_BUILD_TESTS=off \
        -DARROW_PYTHON=on \
        -DARROW_PLASMA=on \
        -DCMAKE_INSTALL_PREFIX="$2" \
        "$CPP_DIR"

  make -j4
  make install

  popd
}

# Build Arrow and pyarrow against one Python version inside a fresh conda
# environment, then run the pyarrow test suite and the Plasma tests.
#
# Arguments:
#   $1 - Python version to test, e.g. 2.7 or 3.6
python_version_tests() {
  PYTHON_VERSION=$1
  CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION

  # Each Python version gets its own install prefix so builds do not clobber
  # one another.
  export ARROW_HOME=$TRAVIS_BUILD_DIR/arrow-install-$PYTHON_VERSION
  # NOTE(review): PARQUET_HOME is not set anywhere in this script; presumably
  # it comes from the sourced environment -- confirm.
  export LD_LIBRARY_PATH=$ARROW_HOME/lib:$PARQUET_HOME/lib

  conda create -y -q -p "$CONDA_ENV_DIR" python="$PYTHON_VERSION" cmake curl
  source activate "$CONDA_ENV_DIR"

  python --version
  which python

  # faster builds, please
  conda install -y -q nomkl

  # Expensive dependencies install from Continuum package repo
  conda install -y -q pip numpy pandas cython

  # Build C++ libraries
  build_arrow_libraries "arrow-build-$PYTHON_VERSION" "$ARROW_HOME"

  # Other stuff pip install
  pip install -r requirements.txt

  python setup.py build_ext --inplace

  python -m pytest -vv -r sxX pyarrow

  # Build documentation once
  if [[ "$PYTHON_VERSION" == "3.6" ]]
  then
    conda install -y -q --file=doc/requirements.txt
    python setup.py build_sphinx -s doc/source
  fi

  # Build and install pyarrow
  pushd "$TRAVIS_BUILD_DIR/python"
  python setup.py install
  popd

  # Run Plasma tests
  pushd "$TRAVIS_BUILD_DIR/python"
  python -m pytest pyarrow/tests/test_plasma.py
  # Quoted so that an unset or empty TRAVIS_OS_NAME compares as an empty string
  # instead of making `[` fail with a syntax error.
  if [ "$TRAVIS_OS_NAME" == "linux" ]; then
    # Second pass with PLASMA_VALGRIND=1, Linux only.
    PLASMA_VALGRIND=1 python -m pytest pyarrow/tests/test_plasma.py
  fi
  popd
}

# run tests for python 2.7 and 3.6
python_version_tests 2.7
python_version_tests 3.6

popd
4 changes: 2 additions & 2 deletions ci/travis_script_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ set -e
source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh

export ARROW_HOME=$ARROW_CPP_INSTALL
export PYARROW_WITH_PLASMA=1

pushd $ARROW_PYTHON_DIR
export PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
Expand Down Expand Up @@ -71,9 +72,8 @@ function build_arrow_libraries() {
pushd $CPP_BUILD_DIR

cmake -DARROW_BUILD_TESTS=off \
-DARROW_PYTHON=on \
-DPLASMA_PYTHON=on \
-DARROW_PLASMA=on \
-DARROW_PYTHON=on \
-DCMAKE_INSTALL_PREFIX=$2 \
$CPP_DIR

Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ class CerrLog {

/// Stream a value to std::cerr unless this message has ARROW_DEBUG severity.
///
/// Values are written only through the severity-guarded path, so each value is
/// emitted at most once and ARROW_DEBUG messages produce no output and do not
/// set has_logged_.
///
/// \param t the value to print
/// \return *this, so that calls can be chained
template <class T>
CerrLog& operator<<(const T& t) {
  if (severity_ != ARROW_DEBUG) {
    has_logged_ = true;
    std::cerr << t;
  }
  return *this;
}

Expand Down
51 changes: 31 additions & 20 deletions cpp/src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ cmake_minimum_required(VERSION 2.8)

project(plasma)

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../python/cmake_modules")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also move these cmake_modules to the top-level so that they are available to any arrow-cpp component.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that's where they belong. Shall we do this as a separate PR to make it a little more visible? It might break other people's scripts.


find_package(PythonLibsNew REQUIRED)
find_package(Threads)

option(PLASMA_PYTHON
"Build the Plasma Python extensions"
OFF)

if(APPLE)
SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
endif(APPLE)
set(PLASMA_SO_VERSION "0")
set(PLASMA_ABI_VERSION "${PLASMA_SO_VERSION}.0.0")

include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
Expand All @@ -40,7 +37,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
# Compile flatbuffers

set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/)

set(PLASMA_FBS_OUTPUT_FILES
"${OUTPUT_DIR}/common_generated.h"
Expand Down Expand Up @@ -69,8 +66,6 @@ endif()

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")

set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)

set(PLASMA_SRCS
client.cc
common.cc
Expand All @@ -97,17 +92,33 @@ set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
add_executable(plasma_store store.cc)
target_link_libraries(plasma_store plasma_static)

# Headers: top level
install(FILES
common.h
common_generated.h
client.h
events.h
plasma.h
plasma_generated.h
protocol.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma")

# Plasma store
install(TARGETS plasma_store DESTINATION ${CMAKE_INSTALL_BINDIR})

# pkg-config support
configure_file(plasma.pc.in
"${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
@ONLY)
install(
FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")

#######################################
# Unit tests
#######################################

ADD_ARROW_TEST(test/serialization_tests)
ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
ADD_ARROW_TEST(test/client_tests)
ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)

if(PLASMA_PYTHON)
add_library(plasma_extension SHARED extension.cc)

if(APPLE)
target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
else(APPLE)
target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
endif(APPLE)
endif()
86 changes: 68 additions & 18 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,31 @@

#define XXH64_DEFAULT_SEED 0

namespace plasma {

// Number of threads used for memcopy and hash computations.
constexpr int64_t kThreadPoolSize = 8;
// Number of bytes in one mebibyte (2^20).
constexpr int64_t kBytesInMB = 1 << 20;
// File-local pool of kThreadPoolSize default-constructed std::thread objects;
// a default-constructed std::thread does not represent a running thread.
static std::vector<std::thread> threadpool_(kThreadPoolSize);

/// Bookkeeping record for one object that this client currently has in use.
struct ObjectInUseEntry {
/// A count of the number of times this client has called
/// PlasmaClient::Create or PlasmaClient::Get on this object ID minus the
/// number of calls to PlasmaClient::Release.
/// When this count reaches zero, we remove the entry from the ObjectsInUse
/// and decrement a count in the relevant ClientMmapTableEntry.
int count;
/// Cached information to read the object.
PlasmaObject object;
/// A flag representing whether the object has been sealed.
bool is_sealed;
};

/// Construct a client; the constructor body performs no work.
PlasmaClient::PlasmaClient() {}

/// Destroy the client; the destructor body performs no work.
PlasmaClient::~PlasmaClient() {}

// If the file descriptor fd has been mmapped in this client process before,
// return the pointer that was returned by mmap, otherwise mmap it and store the
// pointer in a hash table.
Expand Down Expand Up @@ -300,6 +320,10 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
}

Status PlasmaClient::Release(const ObjectID& object_id) {
// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
return Status::OK();
}
// Add the new object to the release history.
release_history_.push_front(object_id);
// If there are too many bytes in use by the client or if there are too many
Expand Down Expand Up @@ -386,22 +410,6 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
return XXH64_digest(&hash_state);
}

/// Compute the hash of an object and copy it into a caller-supplied buffer.
///
/// \param conn the client used to fetch and release the object
/// \param object_id the object to hash
/// \param digest destination buffer; sizeof(uint64_t) bytes are written to it
/// \return true if the object was retrieved and hashed, false if the fetch
///         reported a data size of -1
bool plasma_compute_object_hash(
    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
  // A timeout of 0 makes the fetch time out immediately instead of blocking.
  ObjectBuffer fetched;
  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &fetched));

  // A data size of -1 marks an object that was not retrieved.
  const bool retrieved = (fetched.data_size != -1);
  if (retrieved) {
    const uint64_t hash_value = compute_object_hash(fetched);
    memcpy(digest, &hash_value, sizeof(hash_value));
    // Drop the reference obtained by the Get call above.
    ARROW_CHECK_OK(conn->Release(object_id));
  }
  return retrieved;
}

Status PlasmaClient::Seal(const ObjectID& object_id) {
// Make sure this client has a reference to the object before sending the
// request to Plasma.
Expand All @@ -413,7 +421,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
object_entry->second->is_sealed = true;
/// Send the seal request to Plasma.
static unsigned char digest[kDigestSize];
ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
RETURN_NOT_OK(Hash(object_id, &digest[0]));
RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
// We call PlasmaClient::Release to decrement the number of instances of this
// object
Expand All @@ -439,6 +447,22 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
return ReadEvictReply(buffer.data(), num_bytes_evicted);
}

/// Compute the hash of an object and copy it into a caller-supplied buffer.
///
/// \param object_id the object to hash
/// \param digest destination buffer; sizeof(uint64_t) bytes are written to it
/// \return the status of the final Release call on success, the error from Get
///         if the fetch fails, or PlasmaObjectNonexistent if the fetch reports
///         a data size of -1
Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
  // A timeout of 0 makes the fetch time out immediately instead of blocking.
  ObjectBuffer fetched;
  RETURN_NOT_OK(Get(&object_id, 1, 0, &fetched));

  // A data size of -1 marks an object that was not retrieved.
  const bool missing = (fetched.data_size == -1);
  if (missing) {
    return Status::PlasmaObjectNonexistent("Object not found");
  }

  const uint64_t hash_value = compute_object_hash(fetched);
  memcpy(digest, &hash_value, sizeof(hash_value));

  // Drop the reference obtained by the Get call above.
  return Release(object_id);
}

Status PlasmaClient::Subscribe(int* fd) {
int sock[2];
// Create a non-blocking socket pair. This will only be used to send
Expand All @@ -459,6 +483,26 @@ Status PlasmaClient::Subscribe(int* fd) {
return Status::OK();
}

/// Read one object notification from a socket and unpack its fields.
///
/// \param fd the file descriptor to read the notification from
/// \param object_id receives the ID of the object the notification is about
/// \param data_size receives the object's data size, or -1 for a deletion
/// \param metadata_size receives the object's metadata size, or -1 for a
///        deletion
/// \return Status::OK on success, or an IOError if no message could be read
Status PlasmaClient::GetNotification(
    int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
  // The message buffer is owned here and freed with delete[] before returning.
  uint8_t* message = read_message_async(fd);
  if (!message) {
    return Status::IOError("Failed to read object notification from Plasma socket");
  }

  const auto* info = flatbuffers::GetRoot<ObjectInfo>(message);
  // The serialized ID must be exactly as large as an ObjectID before it is
  // copied over the caller's object.
  ARROW_CHECK(info->object_id()->size() == sizeof(ObjectID));
  memcpy(object_id, info->object_id()->data(), sizeof(ObjectID));

  // Deletion notifications carry no sizes; both are reported as -1.
  const bool deleted = info->is_deletion();
  *data_size = deleted ? -1 : info->data_size();
  *metadata_size = deleted ? -1 : info->metadata_size();

  delete[] message;
  return Status::OK();
}

Status PlasmaClient::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay) {
store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
Expand All @@ -485,7 +529,11 @@ Status PlasmaClient::Disconnect() {
// Close the connections to Plasma. The Plasma store will release the objects
// that were in use by us when handling the SIGPIPE.
close(store_conn_);
if (manager_conn_ >= 0) { close(manager_conn_); }
store_conn_ = -1;
if (manager_conn_ >= 0) {
close(manager_conn_);
manager_conn_ = -1;
}
return Status::OK();
}

Expand Down Expand Up @@ -555,3 +603,5 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
}
return Status::OK();
}

} // namespace plasma
Loading