Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/plasma/eviction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
ARROW_CHECK(memory_used_ <= store_info_->memory_capacity);
}

bool EvictionPolicy::AbleToAdd(int64_t need_size) {
return memory_used_ + need_size <= store_info_->memory_capacity;
}

bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
// Check if there is enough space to create the object.
int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/plasma/eviction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ class EvictionPolicy {
/// @param object_id The ID of the object that is now being used.
void RemoveObject(const ObjectID& object_id);

/// To check whether we can add an object of a specific size.
///
/// @param size The object size to check.
bool AbleToAdd(int64_t size);

private:
/// The amount of memory (in bytes) currently being used.
int64_t memory_used_;
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
// it is not guaranteed that the corresponding pointer in the client will be
// 64-byte aligned, but in practice it often will be.
if (device_num == 0) {
pointer =
reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
if (pointer == nullptr) {
pointer =
reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
}
if (pointer == nullptr || !eviction_policy_.AbleToAdd(data_size + metadata_size)) {
// Tell the eviction policy how much space we need to create this object.
std::vector<ObjectID> objects_to_evict;
bool success =
Expand Down
39 changes: 35 additions & 4 deletions cpp/src/plasma/test/client_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
arrow::AssertBufferEqual(*object_buffer.data, data);
}

class TestPlasmaStore : public ::testing::Test {
template <unsigned int M>
class TestPlasmaStoreBase : public ::testing::Test {
public:
// TODO(pcm): At the moment, stdout of the test gets mixed up with
// stdout of the object store. Consider changing that.
Expand All @@ -59,9 +60,10 @@ class TestPlasmaStore : public ::testing::Test {

std::string plasma_directory =
test_executable.substr(0, test_executable.find_last_of("/"));
std::string plasma_command = plasma_directory +
"/plasma_store_server -m 1000000000 -s " +
store_socket_name_ + " 1> /dev/null 2> /dev/null &";
std::ostringstream ostream;
ostream << plasma_directory << "/plasma_store_server -m " << M << " -s "
<< store_socket_name_ + " 1> /dev/null 2> /dev/null &";
std::string plasma_command = ostream.str();
system(plasma_command.c_str());
ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
ARROW_CHECK_OK(client2_.Connect(store_socket_name_, ""));
Expand Down Expand Up @@ -101,6 +103,8 @@ class TestPlasmaStore : public ::testing::Test {
std::string store_socket_name_;
};

using TestPlasmaStore = TestPlasmaStoreBase<1000000000>;

TEST_F(TestPlasmaStore, NewSubscriberTest) {
PlasmaClient local_client, local_client2;

Expand Down Expand Up @@ -487,6 +491,33 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
}
}

using TestSmallMemoryPlasmaStore = TestPlasmaStoreBase<1000>;

TEST_F(TestSmallMemoryPlasmaStore, SmallMemoryTest) {
// Create many objects on the first client. Seal one third, abort one third,
// and leave the last third unsealed.
std::vector<ObjectID> object_ids;
for (int i = 0; i < 10; i++) {
ObjectID object_id = random_object_id();
object_ids.push_back(object_id);

// Test for object non-existence on the first client.
bool has_object;
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_FALSE(has_object);

// Test for the object being in local Plasma store.
// First create and seal object on the first client.
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
}
}

#ifdef PLASMA_GPU
using arrow::gpu::CudaBuffer;
using arrow::gpu::CudaBufferReader;
Expand Down