diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index e5beb5a579e..44082c810a6 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -73,6 +73,10 @@ void EvictionPolicy::ObjectCreated(const ObjectID& object_id) { ARROW_CHECK(memory_used_ <= store_info_->memory_capacity); } +bool EvictionPolicy::AbleToAdd(int64_t need_size) { + return memory_used_ + need_size <= store_info_->memory_capacity; +} + bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_to_evict) { // Check if there is enough space to create the object. int64_t required_space = memory_used_ + size - store_info_->memory_capacity; diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index bbd3fc43203..6a432c13450 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -125,6 +125,11 @@ class EvictionPolicy { /// @param object_id The ID of the object that is now being used. void RemoveObject(const ObjectID& object_id); + /// To check whether we can add an object of a specific size. + /// + /// @param size The object size to check. + bool AbleToAdd(int64_t size); + private: /// The amount of memory (in bytes) currently being used. int64_t memory_used_; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 58f8d0193ce..0e21b739740 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -175,9 +175,11 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // it is not guaranteed that the corresponding pointer in the client will be // 64-byte aligned, but in practice it often will be. if (device_num == 0) { - pointer = - reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); if (pointer == nullptr) { + pointer = + reinterpret_cast(dlmemalign(kBlockSize, data_size + metadata_size)); + } + if (pointer == nullptr || !eviction_policy_.AbleToAdd(data_size + metadata_size)) { // Tell the eviction policy how much space we need to create this object. std::vector objects_to_evict; bool success = diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 1ad60396af9..8f4ccee0dcc 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -47,7 +47,8 @@ void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, arrow::AssertBufferEqual(*object_buffer.data, data); } -class TestPlasmaStore : public ::testing::Test { +template +class TestPlasmaStoreBase : public ::testing::Test { public: // TODO(pcm): At the moment, stdout of the test gets mixed up with // stdout of the object store. Consider changing that. @@ -59,9 +60,10 @@ class TestPlasmaStore : public ::testing::Test { std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); - std::string plasma_command = plasma_directory + - "/plasma_store_server -m 1000000000 -s " + - store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + std::ostringstream ostream; + ostream << plasma_directory << "/plasma_store_server -m " << M << " -s " + << store_socket_name_ + " 1> /dev/null 2> /dev/null &"; + std::string plasma_command = ostream.str(); system(plasma_command.c_str()); ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); @@ -101,6 +103,8 @@ class TestPlasmaStore : public ::testing::Test { std::string store_socket_name_; }; +using TestPlasmaStore = TestPlasmaStoreBase<1000000000>; + TEST_F(TestPlasmaStore, NewSubscriberTest) { PlasmaClient local_client, local_client2; @@ -487,6 +491,33 @@ TEST_F(TestPlasmaStore, ManyObjectTest) { } } +using TestSmallMemoryPlasmaStore = TestPlasmaStoreBase<1000>; + +TEST_F(TestSmallMemoryPlasmaStore, SmallMemoryTest) { + // Create many objects on the first client. Seal one third, abort one third, + // and leave the last third unsealed. + std::vector object_ids; + for (int i = 0; i < 10; i++) { + ObjectID object_id = random_object_id(); + object_ids.push_back(object_id); + + // Test for object non-existence on the first client. + bool has_object; + ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); + ASSERT_FALSE(has_object); + + // Test for the object being in local Plasma store. + // First create and seal object on the first client. + int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + std::shared_ptr data; + ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Seal(object_id)); + ARROW_CHECK_OK(client_.Release(object_id)); + } +} + #ifdef PLASMA_GPU using arrow::gpu::CudaBuffer; using arrow::gpu::CudaBufferReader;