diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index 60b7c3f63cbd..d98cbb978f90 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -30,7 +30,7 @@ extern "C" { void dlfree(void* mem); } -ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {} +ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {} ObjectTableEntry::~ObjectTableEntry() { dlfree(pointer); diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 7a513ea3ce25..86730365fdbb 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -134,8 +134,9 @@ struct ObjectTableEntry { /// IPC GPU handle to share with clients. std::shared_ptr ipc_handle; #endif - /// Set of clients currently using this object. - std::unordered_set clients; + /// Number of clients currently using this object. + int ref_count; + /// The state of the object, e.g., whether it is open or sealed. object_state state; /// The digest of the object. Used to see if two objects are the same. diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 6253afbb8831..430b088c01a2 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info // If this client is not already using the object, add the client to the // object's list of clients, otherwise do nothing. -void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) { +void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) { // Check if this client is already using the object. - if (entry->clients.find(client) != entry->clients.end()) { + if (client->object_ids.find(entry->object_id) != client->object_ids.end()) { return; } // If there are no other clients using this object, notify the eviction policy // that the object is being used. - if (entry->clients.size() == 0) { + if (entry->ref_count == 0) { // Tell the eviction policy that this object is being used. std::vector objects_to_evict; eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict); delete_objects(objects_to_evict); } - // Add the client pointer to the list of clients using this object. - entry->clients.insert(client); + // Increase reference count. + entry->ref_count++; + + // Add object id to the list of object ids that this client is using. + client->object_ids.insert(entry->object_id); } // Create a new object buffer in the hash table. @@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size, result->metadata_size = metadata_size; result->device_num = device_num; // Notify the eviction policy that this object was created. This must be done - // immediately before the call to add_client_to_object_clients so that the + // immediately before the call to add_to_client_object_ids so that the // eviction policy does not have an opportunity to evict the object. eviction_policy_.object_created(object_id); // Record that this client is using this object. - add_client_to_object_clients(store_info_.objects[object_id].get(), client); + add_to_client_object_ids(store_info_.objects[object_id].get(), client); return PlasmaError_OK; } @@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) { get_req->num_satisfied += 1; // Record the fact that this client will be using this object and will // be responsible for releasing this object. - add_client_to_object_clients(entry, get_req->client); + add_to_client_object_ids(entry, get_req->client); // If this get request is done, reply to the client. if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { @@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client, get_req->num_satisfied += 1; // If necessary, record that this client is using this object. In the case // where entry == NULL, this will be called from seal_object. - add_client_to_object_clients(entry, client); + add_to_client_object_ids(entry, client); } else { // Add a placeholder plasma object to the get request to indicate that the // object is not present. This will be parsed by the client. We set the @@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client, } } -int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry, - Client* client) { - auto it = entry->clients.find(client); - if (it != entry->clients.end()) { - entry->clients.erase(it); +int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) { + auto it = client->object_ids.find(entry->object_id); + if (it != client->object_ids.end()) { + client->object_ids.erase(it); + // Decrease reference count. + entry->ref_count--; + // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. - if (entry->clients.size() == 0) { + if (entry->ref_count == 0) { // Tell the eviction policy that this object is no longer being used. std::vector objects_to_evict; eviction_policy_.end_object_access(entry->object_id, &objects_to_evict); @@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) { auto entry = get_object_table_entry(&store_info_, object_id); ARROW_CHECK(entry != NULL); // Remove the client from the object's array of clients. - ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1); + ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1); } // Check if an object is present. @@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) { ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table."; ARROW_CHECK(entry->state != PLASMA_SEALED) << "To abort an object it must not have been sealed."; - auto it = entry->clients.find(client); - if (it == entry->clients.end()) { + auto it = client->object_ids.find(object_id); + if (it == client->object_ids.end()) { // If the client requesting the abort is not the creator, do not // perform the abort. return 0; @@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) { return PlasmaError_ObjectNotSealed; } - if (entry->clients.size() != 0) { + if (entry->ref_count != 0) { // To delete an object, there must be no clients currently using it. return PlasmaError_ObjectInUse; } @@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector& object_ids) { ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table."; ARROW_CHECK(entry->state == PLASMA_SEALED) << "To delete an object it must have been sealed."; - ARROW_CHECK(entry->clients.size() == 0) + ARROW_CHECK(entry->ref_count == 0) << "To delete an object, there must be no clients currently using it."; store_info_.objects.erase(object_id); // Inform all subscribers that the object has been deleted. @@ -529,23 +534,27 @@ void PlasmaStore::disconnect_client(int client_fd) { // Close the socket. close(client_fd); ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd; - // If this client was using any objects, remove it from the appropriate - // lists. - // TODO(swang): Avoid iteration through the object table. + // Release all the objects that the client was using. auto client = it->second.get(); - std::vector unsealed_objects; - for (const auto& entry : store_info_.objects) { - if (entry.second->state == PLASMA_SEALED) { - remove_client_from_object_clients(entry.second.get(), client); + std::vector sealed_objects; + for (const auto& object_id : client->object_ids) { + auto it = store_info_.objects.find(object_id); + if (it == store_info_.objects.end()) { + continue; + } + + if (it->second->state == PLASMA_SEALED) { + // Add sealed objects to a temporary list of object IDs. Do not perform + // the remove here, since it potentially modifies the object_ids table. + sealed_objects.push_back(it->second.get()); } else { - // Add unsealed objects to a temporary list of object IDs. Do not perform - // the abort here, since it potentially modifies the object table. - unsealed_objects.push_back(entry.first); + // Abort unsealed object. + abort_object(it->first, client); } } - // If the client was creating any objects, abort them. - for (const auto& entry : unsealed_objects) { - abort_object(entry, client); + + for (const auto& entry : sealed_objects) { + remove_from_client_object_ids(entry, client); } // Note, the store may still attempt to send a message to the disconnected diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index ac6b2c4c50cc..fd077f98430a 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "plasma/common.h" @@ -46,6 +47,9 @@ struct Client { /// The file descriptor used to communicate with the client. int fd; + + /// Object ids that are used by this client. + std::unordered_set object_ids; }; class PlasmaStore { @@ -164,13 +168,13 @@ class PlasmaStore { private: void push_notification(ObjectInfoT* object_notification); - void add_client_to_object_clients(ObjectTableEntry* entry, Client* client); + void add_to_client_object_ids(ObjectTableEntry* entry, Client* client); void return_from_get(GetRequest* get_req); void update_object_get_requests(const ObjectID& object_id); - int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client); + int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client); /// Event loop of the plasma store. EventLoop* loop_;