diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 0dd1c44d71c..d74c0f412d9 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) { } Status PlasmaClient::Delete(const ObjectID& object_id) { - // TODO(rkn): In the future, we can use this method to give hints to the - // eviction policy about when an object will no longer be needed. - return Status::NotImplemented("PlasmaClient::Delete is not implemented."); + RETURN_NOT_OK(FlushReleaseHistory()); + // If the object is in used, client can't send the remove message. + if (objects_in_use_.count(object_id) > 0) { + return Status::UnknownError("PlasmaClient::Object is in use."); + } else { + // If we don't already have a reference to the object, we can try to remove the object + RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id)); + std::vector buffer; + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply, &buffer)); + ObjectID object_id2; + DCHECK_GT(buffer.size(), 0); + RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2)); + return Status::OK(); + } } Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 78793f1a73a..35182f84032 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient { Status Seal(const ObjectID& object_id); /// Delete an object from the object store. This currently assumes that the - /// object is present and has been sealed. + /// object is present, has been sealed and not used by another client. Otherwise, + /// it is a no operation. /// /// @todo We may want to allow the deletion of objects that are not present or /// haven't been sealed. diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index a7758fd2c0e..66a3b2ea298 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID& object_id, cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } +void EvictionPolicy::remove_object(const ObjectID& object_id) { + /* If the object is in the LRU cache, remove it. */ + cache_.remove(object_id); + + auto entry = store_info_->objects[object_id].get(); + int64_t size = entry->info.data_size + entry->info.metadata_size; + ARROW_CHECK(memory_used_ >= size); + memory_used_ -= size; +} + } // namespace plasma diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index cebf35b1c1b..b0763095529 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -120,6 +120,11 @@ class EvictionPolicy { int64_t choose_objects_to_evict(int64_t num_bytes_required, std::vector* objects_to_evict); + /// This method will be called when an object is going to be removed + /// + /// @param object_id The ID of the object that is now being used. + void remove_object(const ObjectID& object_id); + private: /// The amount of memory (in bytes) currently being used. int64_t memory_used_; diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs index b6d03b8a3c1..ea6dc8bb98d 100644 --- a/cpp/src/plasma/format/plasma.fbs +++ b/cpp/src/plasma/format/plasma.fbs @@ -76,7 +76,11 @@ enum PlasmaError:int { // Trying to access an object that doesn't exist. ObjectNonexistent, // Trying to create an object but there isn't enough space in the store. - OutOfMemory + OutOfMemory, + // Trying to delete an object but it's not sealed. + ObjectNotSealed, + // Trying to delete an object but it's in use. + ObjectInUse } // Plasma store messages diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c6a19a54718..dde7f9cdfa8 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -411,6 +411,39 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) { } } +int PlasmaStore::delete_object(ObjectID& object_id) { + auto entry = get_object_table_entry(&store_info_, object_id); + // TODO(rkn): This should probably not fail, but should instead throw an + // error. Maybe we should also support deleting objects that have been + // created but not sealed. + if (entry == NULL) { + // To delete an object it must be in the object table. + return PlasmaError_ObjectNonexistent; + } + + if (entry->state != PLASMA_SEALED) { + // To delete an object it must have been sealed. + return PlasmaError_ObjectNotSealed; + } + + if (entry->clients.size() != 0) { + // To delete an object, there must be no clients currently using it. + return PlasmaError_ObjectInUse; + } + + eviction_policy_.remove_object(object_id); + + dlfree(entry->pointer); + store_info_.objects.erase(object_id); + // Inform all subscribers that the object has been deleted. + ObjectInfoT notification; + notification.object_id = object_id.binary(); + notification.is_deletion = true; + push_notification(¬ification); + + return PlasmaError_OK; +} + void PlasmaStore::delete_objects(const std::vector& object_ids) { for (const auto& object_id : object_ids) { ARROW_LOG(DEBUG) << "deleting object " << object_id.hex(); @@ -626,18 +659,23 @@ Status PlasmaStore::process_message(Client* client) { RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms)); process_get_request(client, object_ids_to_get, timeout_ms); } break; - case MessageType_PlasmaReleaseRequest: + case MessageType_PlasmaReleaseRequest: { RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id)); release_object(object_id, client); - break; - case MessageType_PlasmaContainsRequest: + } break; + case MessageType_PlasmaDeleteRequest: { + RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id)); + int error_code = delete_object(object_id); + HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd); + } break; + case MessageType_PlasmaContainsRequest: { RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); if (contains_object(object_id) == OBJECT_FOUND) { HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd); } else { HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd); } - break; + } break; case MessageType_PlasmaSealRequest: { unsigned char digest[kDigestSize]; RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0])); diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index a72c6259a9c..7eada5a1269 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -83,6 +83,15 @@ class PlasmaStore { /// @return 1 if the abort succeeds, else 0. int abort_object(const ObjectID& object_id, Client* client); + /// Delete an specific object by object_id that have been created in the hash table. + /// + /// @param object_id Object ID of the object to be deleted. + /// @return One of the following error codes: + /// - PlasmaError_OK, if the object was delete successfully. + /// - PlasmaError_ObjectNonexistent, if ths object isn't existed. + /// - PlasmaError_ObjectInUse, if the object is in use. + int delete_object(ObjectID& object_id); + /// Delete objects that have been created in the hash table. This should only /// be called on objects that are returned by the eviction policy to evict. /// diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 5cd3063bb43..f19c2bfbdb3 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -58,6 +58,31 @@ class TestPlasmaStore : public ::testing::Test { PlasmaClient client2_; }; +TEST_F(TestPlasmaStore, DeleteTest) { + ObjectID object_id = ObjectID::from_random(); + + // Test for deleting non-existance object. + Status result = client_.Delete(object_id); + ASSERT_EQ(result.IsPlasmaObjectNonexistent(), true); + + // Test for the object being in local Plasma store. + // First create object. + int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + std::shared_ptr data; + ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); + ARROW_CHECK_OK(client_.Seal(object_id)); + + // Object is in use, can't be delete. + result = client_.Delete(object_id); + ASSERT_EQ(result.IsUnknownError(), true); + + // Avoid race condition of Plasma Manager waiting for notification. + ARROW_CHECK_OK(client_.Release(object_id)); + ARROW_CHECK_OK(client_.Delete(object_id)); +} + TEST_F(TestPlasmaStore, ContainsTest) { ObjectID object_id = ObjectID::from_random();