From ba2aea8d9c8a854b74583eac09d230cf63563268 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Wed, 12 Sep 2018 17:59:45 +0800 Subject: [PATCH 1/4] Fix Plasma Crash when capacity is smaller than total memory. --- cpp/src/plasma/eviction_policy.cc | 3 +++ cpp/src/plasma/eviction_policy.h | 2 +- cpp/src/plasma/store.cc | 12 ++++++++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index e5beb5a579e..c0ae6a5eb1f 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -72,6 +72,9 @@ void EvictionPolicy::ObjectCreated(const ObjectID& object_id) { memory_used_ += size; ARROW_CHECK(memory_used_ <= store_info_->memory_capacity); } +bool EvictionPolicy::IsCapableOf(int64_t need_size) { + return memory_used_ + need_size <= store_info_->memory_capacity; +} bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_to_evict) { // Check if there is enough space to create the object. diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index bbd3fc43203..7a684437b01 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -124,7 +124,7 @@ class EvictionPolicy { /// /// @param object_id The ID of the object that is now being used. void RemoveObject(const ObjectID& object_id); - + bool IsCapableOf(int64_t need_size); private: /// The amount of memory (in bytes) currently being used. int64_t memory_used_; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 58f8d0193ce..788da7558f7 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -175,14 +175,19 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // it is not guaranteed that the corresponding pointer in the client will be // 64-byte aligned, but in practice it often will be. if (device_num == 0) { - pointer = - reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); if (pointer == nullptr) { + pointer = + reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); + } + if (pointer == nullptr || !eviction_policy_.IsCapableOf(data_size + metadata_size)) { + ARROW_LOG(WARNING) << "dlmemalign returned with nullptr."; // Tell the eviction policy how much space we need to create this object. std::vector objects_to_evict; bool success = eviction_policy_.RequireSpace(data_size + metadata_size, &objects_to_evict); DeleteObjects(objects_to_evict); + ARROW_LOG(WARNING) << "Calling Delete Objects."; + // Return an error to the client if not enough space could be freed to // create the object. if (!success) { @@ -217,6 +222,8 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si entry->device_num = device_num; entry->create_time = std::time(nullptr); entry->construct_duration = -1; + + ARROW_LOG(WARNING) << "Created the entry."; #ifdef PLASMA_GPU if (device_num != 0) { DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle)); @@ -230,6 +237,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si result->data_size = data_size; result->metadata_size = metadata_size; result->device_num = device_num; + ARROW_LOG(WARNING) << "Put object to store_info_ and call ObjectCreated."; // Notify the eviction policy that this object was created. This must be done // immediately before the call to AddToClientObjectIds so that the // eviction policy does not have an opportunity to evict the object. From 3ad70b797e44648872eae2ecebae3002b028c0e7 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Wed, 12 Sep 2018 21:52:59 +0800 Subject: [PATCH 2/4] Add small memory test. --- cpp/src/plasma/test/client_tests.cc | 39 ++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 1ad60396af9..63e64580c41 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -47,7 +47,8 @@ void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, arrow::AssertBufferEqual(*object_buffer.data, data); } -class TestPlasmaStore : public ::testing::Test { +template +class TestPlasmaStoreBase : public ::testing::Test { public: // TODO(pcm): At the moment, stdout of the test gets mixed up with // stdout of the object store. Consider changing that. @@ -59,9 +60,10 @@ class TestPlasmaStore : public ::testing::Test { std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); - std::string plasma_command = plasma_directory + - "/plasma_store_server -m 1000000000 -s " + - store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + std::ostringstream ostream; + ostream << plasma_directory << "/plasma_store_server -m " << M + << " -s " << store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + std::string plasma_command = ostream.str(); system(plasma_command.c_str()); ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); @@ -101,6 +103,8 @@ class TestPlasmaStore : public ::testing::Test { std::string store_socket_name_; }; +using TestPlasmaStore = TestPlasmaStoreBase<1000000000>; + TEST_F(TestPlasmaStore, NewSubscriberTest) { PlasmaClient local_client, local_client2; @@ -487,6 +491,33 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { } } +using SmallMemoryTestPlasmaStore = TestPlasmaStoreBase<1000>; + +TEST_F(SmallMemoryTestPlasmaStore, SmallMemoryTest) { + // Create many objects on the first client. Seal one third, abort one third, + // and leave the last third unsealed. + std::vector object_ids; + for (int i = 0; i < 10; i++) { + ObjectID object_id = random_object_id(); + object_ids.push_back(object_id); + + // Test for object non-existence on the first client. + bool has_object; + ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_FALSE(has_object); + + // Test for the object being in local Plasma store. + // First create and seal object on the first client. + int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + std::shared_ptr data; + ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Seal(object_id)); + ARROW_CHECK_OK(client_.Release(object_id)); + } +} + #ifdef PLASMA_GPU using arrow::gpu::CudaBuffer; using arrow::gpu::CudaBufferReader; From 6b43d2ba4b9557da81b428e3f12ae6e212c2aae8 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Wed, 12 Sep 2018 22:47:31 +0800 Subject: [PATCH 3/4] Lint and refine. --- cpp/src/plasma/eviction_policy.cc | 3 ++- cpp/src/plasma/eviction_policy.h | 7 ++++++- cpp/src/plasma/store.cc | 2 +- cpp/src/plasma/test/client_tests.cc | 10 +++++----- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index c0ae6a5eb1f..44082c810a6 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -72,7 +72,8 @@ void EvictionPolicy::ObjectCreated(const ObjectID& object_id) { memory_used_ += size; ARROW_CHECK(memory_used_ <= store_info_->memory_capacity); } -bool EvictionPolicy::IsCapableOf(int64_t need_size) { + +bool EvictionPolicy::AbleToAdd(int64_t need_size) { return memory_used_ + need_size <= store_info_->memory_capacity; } diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index 7a684437b01..6a432c13450 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -124,7 +124,12 @@ class EvictionPolicy { /// /// @param object_id The ID of the object that is now being used. void RemoveObject(const ObjectID& object_id); - bool IsCapableOf(int64_t need_size); + + /// To check whether we can add an object of a specific size. + /// + /// @param size The object size to check. + bool AbleToAdd(int64_t size); + private: /// The amount of memory (in bytes) currently being used. int64_t memory_used_; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 788da7558f7..068e3154a9b 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -179,7 +179,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si pointer = reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); } - if (pointer == nullptr || !eviction_policy_.IsCapableOf(data_size + metadata_size)) { + if (pointer == nullptr || !eviction_policy_.AbleToAdd(data_size + metadata_size)) { ARROW_LOG(WARNING) << "dlmemalign returned with nullptr."; // Tell the eviction policy how much space we need to create this object. std::vector objects_to_evict; diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 63e64580c41..8f4ccee0dcc 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -47,7 +47,7 @@ void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, arrow::AssertBufferEqual(*object_buffer.data, data); } -template +template class TestPlasmaStoreBase : public ::testing::Test { public: // TODO(pcm): At the moment, stdout of the test gets mixed up with @@ -61,8 +61,8 @@ class TestPlasmaStoreBase : public ::testing::Test { std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); std::ostringstream ostream; - ostream << plasma_directory << "/plasma_store_server -m " << M - << " -s " << store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + ostream << plasma_directory << "/plasma_store_server -m " << M << " -s " + << store_socket_name_ + " 1> /dev/null 2> /dev/null &"; std::string plasma_command = ostream.str(); system(plasma_command.c_str()); ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); @@ -491,9 +491,9 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { } } -using SmallMemoryTestPlasmaStore = TestPlasmaStoreBase<1000>; +using TestSmallMemoryPlasmaStore = TestPlasmaStoreBase<1000>; -TEST_F(SmallMemoryTestPlasmaStore, SmallMemoryTest) { +TEST_F(TestSmallMemoryPlasmaStore, SmallMemoryTest) { // Create many objects on the first client. Seal one third, abort one third, // and leave the last third unsealed. std::vector object_ids; From 592200035e222f121f40ffae0eca70c905adb0c1 Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Wed, 12 Sep 2018 23:49:48 +0800 Subject: [PATCH 4/4] Remove log --- cpp/src/plasma/store.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 068e3154a9b..0e21b739740 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -180,14 +180,11 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); } if (pointer == nullptr || !eviction_policy_.AbleToAdd(data_size + metadata_size)) { - ARROW_LOG(WARNING) << "dlmemalign returned with nullptr."; // Tell the eviction policy how much space we need to create this object. std::vector objects_to_evict; bool success = eviction_policy_.RequireSpace(data_size + metadata_size, &objects_to_evict); DeleteObjects(objects_to_evict); - ARROW_LOG(WARNING) << "Calling Delete Objects."; - // Return an error to the client if not enough space could be freed to // create the object. if (!success) { @@ -222,8 +219,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si entry->device_num = device_num; entry->create_time = std::time(nullptr); entry->construct_duration = -1; - - ARROW_LOG(WARNING) << "Created the entry."; #ifdef PLASMA_GPU if (device_num != 0) { DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle)); @@ -237,7 +232,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si result->data_size = data_size; result->metadata_size = metadata_size; result->device_num = device_num; - ARROW_LOG(WARNING) << "Put object to store_info_ and call ObjectCreated."; // Notify the eviction policy that this object was created. This must be done // immediately before the call to AddToClientObjectIds so that the // eviction policy does not have an opportunity to evict the object.