Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/plasma/plasma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
void dlfree(void* mem);
}

ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {}
ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}

ObjectTableEntry::~ObjectTableEntry() {
dlfree(pointer);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ struct ObjectTableEntry {
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
/// Set of clients currently using this object.
std::unordered_set<Client*> clients;
/// Number of clients currently using this object.
int ref_count;

/// The state of the object, e.g., whether it is open or sealed.
object_state state;
/// The digest of the object. Used to see if two objects are the same.
Expand Down
75 changes: 42 additions & 33 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info

// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) {
// Check if this client is already using the object.
if (entry->clients.find(client) != entry->clients.end()) {
if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
return;
}
// If there are no other clients using this object, notify the eviction policy
// that the object is being used.
if (entry->clients.size() == 0) {
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
delete_objects(objects_to_evict);
}
// Add the client pointer to the list of clients using this object.
entry->clients.insert(client);
// Increase reference count.
entry->ref_count++;

// Add object id to the list of object ids that this client is using.
client->object_ids.insert(entry->object_id);
}

// Create a new object buffer in the hash table.
Expand Down Expand Up @@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
result->metadata_size = metadata_size;
result->device_num = device_num;
// Notify the eviction policy that this object was created. This must be done
// immediately before the call to add_client_to_object_clients so that the
// immediately before the call to add_to_client_object_ids so that the
// eviction policy does not have an opportunity to evict the object.
eviction_policy_.object_created(object_id);
// Record that this client is using this object.
add_client_to_object_clients(store_info_.objects[object_id].get(), client);
add_to_client_object_ids(store_info_.objects[object_id].get(), client);
return PlasmaError_OK;
}

Expand Down Expand Up @@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
add_client_to_object_clients(entry, get_req->client);
add_to_client_object_ids(entry, get_req->client);

// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
Expand Down Expand Up @@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client,
get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case
// where entry == NULL, this will be called from seal_object.
add_client_to_object_clients(entry, client);
add_to_client_object_ids(entry, client);
} else {
// Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the
Expand All @@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client,
}
}

int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry,
Client* client) {
auto it = entry->clients.find(client);
if (it != entry->clients.end()) {
entry->clients.erase(it);
int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) {
auto it = client->object_ids.find(entry->object_id);
if (it != client->object_ids.end()) {
client->object_ids.erase(it);
// Decrease reference count.
entry->ref_count--;

// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
if (entry->clients.size() == 0) {
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is no longer being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
Expand All @@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
// Remove the client from the object's array of clients.
ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1);
ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1);
}

// Check if an object is present.
Expand Down Expand Up @@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
ARROW_CHECK(entry->state != PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
auto it = entry->clients.find(client);
if (it == entry->clients.end()) {
auto it = client->object_ids.find(object_id);
if (it == client->object_ids.end()) {
// If the client requesting the abort is not the creator, do not
// perform the abort.
return 0;
Expand All @@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) {
return PlasmaError_ObjectNotSealed;
}

if (entry->clients.size() != 0) {
if (entry->ref_count != 0) {
// To delete an object, there must be no clients currently using it.
return PlasmaError_ObjectInUse;
}
Expand All @@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
ARROW_CHECK(entry->state == PLASMA_SEALED)
<< "To delete an object it must have been sealed.";
ARROW_CHECK(entry->clients.size() == 0)
ARROW_CHECK(entry->ref_count == 0)
<< "To delete an object, there must be no clients currently using it.";
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
Expand Down Expand Up @@ -529,23 +534,27 @@ void PlasmaStore::disconnect_client(int client_fd) {
// Close the socket.
close(client_fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// If this client was using any objects, remove it from the appropriate
// lists.
// TODO(swang): Avoid iteration through the object table.
// Release all the objects that the client was using.
auto client = it->second.get();
std::vector<ObjectID> unsealed_objects;
for (const auto& entry : store_info_.objects) {
if (entry.second->state == PLASMA_SEALED) {
remove_client_from_object_clients(entry.second.get(), client);
std::vector<ObjectTableEntry*> sealed_objects;
for (const auto& object_id : client->object_ids) {
auto it = store_info_.objects.find(object_id);
if (it == store_info_.objects.end()) {
continue;
}

if (it->second->state == PLASMA_SEALED) {
// Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table.
sealed_objects.push_back(it->second.get());
} else {
// Add unsealed objects to a temporary list of object IDs. Do not perform
// the abort here, since it potentially modifies the object table.
unsealed_objects.push_back(entry.first);
// Abort unsealed object.
abort_object(it->first, client);
}
}
// If the client was creating any objects, abort them.
for (const auto& entry : unsealed_objects) {
abort_object(entry, client);

for (const auto& entry : sealed_objects) {
remove_from_client_object_ids(entry, client);
}

// Note, the store may still attempt to send a message to the disconnected
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "plasma/common.h"
Expand All @@ -46,6 +47,9 @@ struct Client {

/// The file descriptor used to communicate with the client.
int fd;

/// Object ids that are used by this client.
std::unordered_set<ObjectID, UniqueIDHasher> object_ids;
};

class PlasmaStore {
Expand Down Expand Up @@ -164,13 +168,13 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);

void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);

void return_from_get(GetRequest* get_req);

void update_object_get_requests(const ObjectID& object_id);

int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client);

/// Event loop of the plasma store.
EventLoop* loop_;
Expand Down