From a047572583e17263352d8b5e39e93e8f15293561 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 9 May 2018 15:08:22 +0800 Subject: [PATCH 1/3] [Plasma] use list-of-object-ids in client instead of list-of-clients in object --- cpp/src/plasma/plasma.cc | 2 +- cpp/src/plasma/plasma.h | 5 +-- cpp/src/plasma/store.cc | 71 +++++++++++++++++++++++----------------- cpp/src/plasma/store.h | 8 +++-- 4 files changed, 51 insertions(+), 35 deletions(-) diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index 60b7c3f63cb..dd3d5653180 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -30,7 +30,7 @@ extern "C" { void dlfree(void* mem); } -ObjectTableEntry::ObjectTableEntry() : pointer(nullptr) {} +ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), refcnt(0) {} ObjectTableEntry::~ObjectTableEntry() { dlfree(pointer); diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 7a513ea3ce2..748824e376f 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -134,8 +134,9 @@ struct ObjectTableEntry { /// IPC GPU handle to share with clients. std::shared_ptr ipc_handle; #endif - /// Set of clients currently using this object. - std::unordered_set clients; + /// Reference count. + int refcnt; + /// The state of the object, e.g., whether it is open or sealed. object_state state; /// The digest of the object. Used to see if two objects are the same. diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 6253afbb883..2e0526dcec3 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -124,21 +124,24 @@ const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info // If this client is not already using the object, add the client to the // object's list of clients, otherwise do nothing. -void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) { +void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) { // Check if this client is already using the object. - if (entry->clients.find(client) != entry->clients.end()) { + if (client->object_ids.find(entry->object_id) != client->object_ids.end()) { return; } // If there are no other clients using this object, notify the eviction policy // that the object is being used. - if (entry->clients.size() == 0) { + if (entry->refcnt == 0) { // Tell the eviction policy that this object is being used. std::vector objects_to_evict; eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict); delete_objects(objects_to_evict); } - // Add the client pointer to the list of clients using this object. - entry->clients.insert(client); + // Increase refcnt. + entry->refcnt++; + + // Add object id to the list of object ids that this client is using. + client->object_ids.insert(entry->object_id); } // Create a new object buffer in the hash table. @@ -225,11 +228,11 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size, result->metadata_size = metadata_size; result->device_num = device_num; // Notify the eviction policy that this object was created. This must be done - // immediately before the call to add_client_to_object_clients so that the + // immediately before the call to add_to_client_object_ids so that the // eviction policy does not have an opportunity to evict the object. eviction_policy_.object_created(object_id); // Record that this client is using this object. - add_client_to_object_clients(store_info_.objects[object_id].get(), client); + add_to_client_object_ids(store_info_.objects[object_id].get(), client); return PlasmaError_OK; } @@ -324,7 +327,7 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) { get_req->num_satisfied += 1; // Record the fact that this client will be using this object and will // be responsible for releasing this object. - add_client_to_object_clients(entry, get_req->client); + add_to_client_object_ids(entry, get_req->client); // If this get request is done, reply to the client. if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { @@ -358,7 +361,7 @@ void PlasmaStore::process_get_request(Client* client, get_req->num_satisfied += 1; // If necessary, record that this client is using this object. In the case // where entry == NULL, this will be called from seal_object. - add_client_to_object_clients(entry, client); + add_to_client_object_ids(entry, client); } else { // Add a placeholder plasma object to the get request to indicate that the // object is not present. This will be parsed by the client. We set the @@ -383,14 +386,16 @@ void PlasmaStore::process_get_request(Client* client, } } -int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry, - Client* client) { - auto it = entry->clients.find(client); - if (it != entry->clients.end()) { - entry->clients.erase(it); +int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) { + auto it = client->object_ids.find(entry->object_id); + if (it != client->object_ids.end()) { + client->object_ids.erase(it); + // Decrease refcnt. + entry->refcnt--; + // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. - if (entry->clients.size() == 0) { + if (entry->refcnt == 0) { // Tell the eviction policy that this object is no longer being used. std::vector objects_to_evict; eviction_policy_.end_object_access(entry->object_id, &objects_to_evict); @@ -408,7 +413,7 @@ void PlasmaStore::release_object(const ObjectID& object_id, Client* client) { auto entry = get_object_table_entry(&store_info_, object_id); ARROW_CHECK(entry != NULL); // Remove the client from the object's array of clients. - ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1); + ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1); } // Check if an object is present. @@ -439,8 +444,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) { ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table."; ARROW_CHECK(entry->state != PLASMA_SEALED) << "To abort an object it must not have been sealed."; - auto it = entry->clients.find(client); - if (it == entry->clients.end()) { + auto it = client->object_ids.find(object_id); + if (it == client->object_ids.end()) { // If the client requesting the abort is not the creator, do not // perform the abort. return 0; @@ -466,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) { return PlasmaError_ObjectNotSealed; } - if (entry->clients.size() != 0) { + if (entry->refcnt != 0) { // To delete an object, there must be no clients currently using it. return PlasmaError_ObjectInUse; } @@ -493,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector& object_ids) { ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table."; ARROW_CHECK(entry->state == PLASMA_SEALED) << "To delete an object it must have been sealed."; - ARROW_CHECK(entry->clients.size() == 0) + ARROW_CHECK(entry->refcnt == 0) << "To delete an object, there must be no clients currently using it."; store_info_.objects.erase(object_id); // Inform all subscribers that the object has been deleted. @@ -533,19 +538,25 @@ void PlasmaStore::disconnect_client(int client_fd) { // lists. // TODO(swang): Avoid iteration through the object table. auto client = it->second.get(); - std::vector unsealed_objects; - for (const auto& entry : store_info_.objects) { - if (entry.second->state == PLASMA_SEALED) { - remove_client_from_object_clients(entry.second.get(), client); + std::vector sealed_objects; + for (const auto& object_id : client->object_ids) { + auto it = store_info_.objects.find(object_id); + if (it == store_info_.objects.end()) { + continue; + } + + if (it->second->state == PLASMA_SEALED) { + // Add sealed objects to a temporary list of object IDs. Do not perform + // the remove here, since it potentially modifies the object_ids table. + sealed_objects.push_back(it->second.get()); } else { - // Add unsealed objects to a temporary list of object IDs. Do not perform - // the abort here, since it potentially modifies the object table. - unsealed_objects.push_back(entry.first); + // Abort unsealed object. + abort_object(it->first, client); } } - // If the client was creating any objects, abort them. - for (const auto& entry : unsealed_objects) { - abort_object(entry, client); + + for (const auto& entry : sealed_objects) { + remove_from_client_object_ids(entry, client); } // Note, the store may still attempt to send a message to the disconnected diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index ac6b2c4c50c..fd077f98430 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "plasma/common.h" @@ -46,6 +47,9 @@ struct Client { /// The file descriptor used to communicate with the client. int fd; + + /// Object ids that are used by this client. + std::unordered_set object_ids; }; class PlasmaStore { @@ -164,13 +168,13 @@ class PlasmaStore { private: void push_notification(ObjectInfoT* object_notification); - void add_client_to_object_clients(ObjectTableEntry* entry, Client* client); + void add_to_client_object_ids(ObjectTableEntry* entry, Client* client); void return_from_get(GetRequest* get_req); void update_object_get_requests(const ObjectID& object_id); - int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client); + int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client); /// Event loop of the plasma store. EventLoop* loop_; From 8a439e882120a55b918b428fbc0710b2d3bee038 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Wed, 9 May 2018 16:04:34 +0800 Subject: [PATCH 2/3] Trigger From d8db8f7542dc3a3635434f3aaace8f0ae5fc31bf Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Thu, 10 May 2018 10:28:55 +0800 Subject: [PATCH 3/3] Address comments from pcmoritz --- cpp/src/plasma/plasma.cc | 2 +- cpp/src/plasma/plasma.h | 4 ++-- cpp/src/plasma/store.cc | 20 +++++++++----------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index dd3d5653180..d98cbb978f9 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -30,7 +30,7 @@ extern "C" { void dlfree(void* mem); } -ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), refcnt(0) {} +ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {} ObjectTableEntry::~ObjectTableEntry() { dlfree(pointer); diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 748824e376f..86730365fdb 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -134,8 +134,8 @@ struct ObjectTableEntry { /// IPC GPU handle to share with clients. std::shared_ptr ipc_handle; #endif - /// Reference count. - int refcnt; + /// Number of clients currently using this object. + int ref_count; /// The state of the object, e.g., whether it is open or sealed. object_state state; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 2e0526dcec3..430b088c01a 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -131,14 +131,14 @@ void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* clie } // If there are no other clients using this object, notify the eviction policy // that the object is being used. - if (entry->refcnt == 0) { + if (entry->ref_count == 0) { // Tell the eviction policy that this object is being used. std::vector objects_to_evict; eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict); delete_objects(objects_to_evict); } - // Increase refcnt. - entry->refcnt++; + // Increase reference count. + entry->ref_count++; // Add object id to the list of object ids that this client is using. client->object_ids.insert(entry->object_id); @@ -390,12 +390,12 @@ int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* auto it = client->object_ids.find(entry->object_id); if (it != client->object_ids.end()) { client->object_ids.erase(it); - // Decrease refcnt. - entry->refcnt--; + // Decrease reference count. + entry->ref_count--; // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. - if (entry->refcnt == 0) { + if (entry->ref_count == 0) { // Tell the eviction policy that this object is no longer being used. std::vector objects_to_evict; eviction_policy_.end_object_access(entry->object_id, &objects_to_evict); @@ -471,7 +471,7 @@ int PlasmaStore::delete_object(ObjectID& object_id) { return PlasmaError_ObjectNotSealed; } - if (entry->refcnt != 0) { + if (entry->ref_count != 0) { // To delete an object, there must be no clients currently using it. return PlasmaError_ObjectInUse; } @@ -498,7 +498,7 @@ void PlasmaStore::delete_objects(const std::vector& object_ids) { ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table."; ARROW_CHECK(entry->state == PLASMA_SEALED) << "To delete an object it must have been sealed."; - ARROW_CHECK(entry->refcnt == 0) + ARROW_CHECK(entry->ref_count == 0) << "To delete an object, there must be no clients currently using it."; store_info_.objects.erase(object_id); // Inform all subscribers that the object has been deleted. @@ -534,9 +534,7 @@ void PlasmaStore::disconnect_client(int client_fd) { // Close the socket. close(client_fd); ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd; - // If this client was using any objects, remove it from the appropriate - // lists. - // TODO(swang): Avoid iteration through the object table. + // Release all the objects that the client was using. auto client = it->second.get(); std::vector sealed_objects; for (const auto& object_id : client->object_ids) {