Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/plasma/plasma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
void dlfree(void* mem);
}

ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {}
ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), refcnt(0) {}

ObjectTableEntry::~ObjectTableEntry() {
dlfree(pointer);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ struct ObjectTableEntry {
/// IPC GPU handle to share with clients.
std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
/// Set of clients currently using this object.
std::unordered_set<Client*> clients;
/// Reference count.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better documentation is: Number of clients currently using this object.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is indeed better:)

int refcnt;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename this to ref_count, it's good to be not too cryptic ;)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.


/// The state of the object, e.g., whether it is open or sealed.
object_state state;
/// The digest of the object. Used to see if two objects are the same.
Expand Down
71 changes: 41 additions & 30 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info

// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) {
// Check if this client is already using the object.
if (entry->clients.find(client) != entry->clients.end()) {
if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
return;
}
// If there are no other clients using this object, notify the eviction policy
// that the object is being used.
if (entry->clients.size() == 0) {
if (entry->refcnt == 0) {
// Tell the eviction policy that this object is being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
delete_objects(objects_to_evict);
}
// Add the client pointer to the list of clients using this object.
entry->clients.insert(client);
// Increase refcnt.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Increase reference count.

entry->refcnt++;

// Add object id to the list of object ids that this client is using.
client->object_ids.insert(entry->object_id);
}

// Create a new object buffer in the hash table.
Expand Down Expand Up @@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
result->metadata_size = metadata_size;
result->device_num = device_num;
// Notify the eviction policy that this object was created. This must be done
// immediately before the call to add_client_to_object_clients so that the
// immediately before the call to add_to_client_object_ids so that the
// eviction policy does not have an opportunity to evict the object.
eviction_policy_.object_created(object_id);
// Record that this client is using this object.
add_client_to_object_clients(store_info_.objects[object_id].get(), client);
add_to_client_object_ids(store_info_.objects[object_id].get(), client);
return PlasmaError_OK;
}

Expand Down Expand Up @@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
add_client_to_object_clients(entry, get_req->client);
add_to_client_object_ids(entry, get_req->client);

// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
Expand Down Expand Up @@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client,
get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case
// where entry == NULL, this will be called from seal_object.
add_client_to_object_clients(entry, client);
add_to_client_object_ids(entry, client);
} else {
// Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the
Expand All @@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client,
}
}

int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry,
Client* client) {
auto it = entry->clients.find(client);
if (it != entry->clients.end()) {
entry->clients.erase(it);
int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) {
auto it = client->object_ids.find(entry->object_id);
if (it != client->object_ids.end()) {
client->object_ids.erase(it);
// Decrease refcnt.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Decrease reference count.

entry->refcnt--;

// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
if (entry->clients.size() == 0) {
if (entry->refcnt == 0) {
// Tell the eviction policy that this object is no longer being used.
std::vector<ObjectID> objects_to_evict;
eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
Expand All @@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
// Remove the client from the object's array of clients.
ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1);
ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1);
}

// Check if an object is present.
Expand Down Expand Up @@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
ARROW_CHECK(entry->state != PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
auto it = entry->clients.find(client);
if (it == entry->clients.end()) {
auto it = client->object_ids.find(object_id);
if (it == client->object_ids.end()) {
// If the client requesting the abort is not the creator, do not
// perform the abort.
return 0;
Expand All @@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) {
return PlasmaError_ObjectNotSealed;
}

if (entry->clients.size() != 0) {
if (entry->refcnt != 0) {
// To delete an object, there must be no clients currently using it.
return PlasmaError_ObjectInUse;
}
Expand All @@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
ARROW_CHECK(entry->state == PLASMA_SEALED)
<< "To delete an object it must have been sealed.";
ARROW_CHECK(entry->clients.size() == 0)
ARROW_CHECK(entry->refcnt == 0)
<< "To delete an object, there must be no clients currently using it.";
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
Expand Down Expand Up @@ -533,19 +538,25 @@ void PlasmaStore::disconnect_client(int client_fd) {
// lists.
// TODO(swang): Avoid iteration through the object table.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove the TODO(swang) now

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure:)

auto client = it->second.get();
std::vector<ObjectID> unsealed_objects;
for (const auto& entry : store_info_.objects) {
if (entry.second->state == PLASMA_SEALED) {
remove_client_from_object_clients(entry.second.get(), client);
std::vector<ObjectTableEntry*> sealed_objects;
for (const auto& object_id : client->object_ids) {
auto it = store_info_.objects.find(object_id);
if (it == store_info_.objects.end()) {
continue;
}

if (it->second->state == PLASMA_SEALED) {
// Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table.
sealed_objects.push_back(it->second.get());
} else {
// Add unsealed objects to a temporary list of object IDs. Do not perform
// the abort here, since it potentially modifies the object table.
unsealed_objects.push_back(entry.first);
// Abort unsealed object.
abort_object(it->first, client);
}
}
// If the client was creating any objects, abort them.
for (const auto& entry : unsealed_objects) {
abort_object(entry, client);

for (const auto& entry : sealed_objects) {
remove_from_client_object_ids(entry, client);
}

// Note, the store may still attempt to send a message to the disconnected
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "plasma/common.h"
Expand All @@ -46,6 +47,9 @@ struct Client {

/// The file descriptor used to communicate with the client.
int fd;

/// Object ids that are used by this client.
std::unordered_set<ObjectID, UniqueIDHasher> object_ids;
};

class PlasmaStore {
Expand Down Expand Up @@ -164,13 +168,13 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);

void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);

void return_from_get(GetRequest* get_req);

void update_object_get_requests(const ObjectID& object_id);

int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client);

/// Event loop of the plasma store.
EventLoop* loop_;
Expand Down