Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) {
}

Status PlasmaClient::Delete(const ObjectID& object_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In client.h we should probably clarify the behavior of this method. In particular, if a client calls Delete, that does not guarantee that the object will be deleted. In particular, if the object is in use by another client, then the Delete call is a no-op, right? Can you mention this in the documentation for this method in client.h?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't guarantee the object will be deleted for some reasons.

// TODO(rkn): In the future, we can use this method to give hints to the
// eviction policy about when an object will no longer be needed.
return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
RETURN_NOT_OK(FlushReleaseHistory());
// If the object is in used, client can't send the remove message.
if (objects_in_use_.count(object_id) > 0) {
return Status::UnknownError("PlasmaClient::Object is in use.");
} else {
// If we don't already have a reference to the object, we can try to remove the object
RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply, &buffer));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about not having the plasma store reply to the client? The client could just continue on. If it really wants to know whether the object was deleted or not, it can call Contains to check.

Are there use cases where you really want to know if the object was deleted or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Providing a blocked API is the most common thinking. If delete operation failed, client also need to know why the delete operation failed and do the next step according to the reason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you describe the use case?

If you view the command as more of a "hint" that doesn't necessarily do anything but simply gives the plasma store more information that it can make use of, then a non-blocking API is natural.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just update operation. And it can be implemented as: check contains, delete, check contains again to make it happen.
Currently, only two reasons: not sealed object and object is in use will make the DELETE operation failed. We can make it as non-blocking API. But when I look at other APIs such as CREATE, they also will provide the reply to the client. I think DELETE operation should follow the same way as CREATE operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's try out this approach and see if it's useful, though I think we should consider switching to "hint" semantics at some point.

ObjectID object_id2;
DCHECK_GT(buffer.size(), 0);
RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
return Status::OK();
}
}

Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient {
Status Seal(const ObjectID& object_id);

/// Delete an object from the object store. This currently assumes that the
/// object is present and has been sealed.
/// object is present, has been sealed and not used by another client. Otherwise,
/// it is a no operation.
///
/// @todo We may want to allow the deletion of objects that are not present or
/// haven't been sealed.
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/plasma/eviction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID& object_id,
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}

void EvictionPolicy::remove_object(const ObjectID& object_id) {
/* If the object is in the LRU cache, remove it. */
cache_.remove(object_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to update the memory_used_ field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed.


auto entry = store_info_->objects[object_id].get();
int64_t size = entry->info.data_size + entry->info.metadata_size;
ARROW_CHECK(memory_used_ >= size);
memory_used_ -= size;
}

} // namespace plasma
5 changes: 5 additions & 0 deletions cpp/src/plasma/eviction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class EvictionPolicy {
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict);

/// This method will be called when an object is going to be removed
///
/// @param object_id The ID of the object that is now being used.
void remove_object(const ObjectID& object_id);

private:
/// The amount of memory (in bytes) currently being used.
int64_t memory_used_;
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/plasma/format/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ enum PlasmaError:int {
// Trying to access an object that doesn't exist.
ObjectNonexistent,
// Trying to create an object but there isn't enough space in the store.
OutOfMemory
OutOfMemory,
// Trying to delete an object but it's not sealed.
ObjectNotSealed,
// Trying to delete an object but it's in use.
ObjectInUse
}

// Plasma store messages
Expand Down
46 changes: 42 additions & 4 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,39 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
}
}

int PlasmaStore::delete_object(ObjectID& object_id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we already have a delete_objects method, does it make sense to also have a delete_object method? Is the behavior of this method different from calling delete_objects on a vector of one object ID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, they are similar. But in delete_object will issue the error status to client if delete object failed. As for delete_objects used by evict objects function, I don't want to impact it too much.

auto entry = get_object_table_entry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
if (entry == NULL) {
// To delete an object it must be in the object table.
return PlasmaError_ObjectNonexistent;
}

if (entry->state != PLASMA_SEALED) {
// To delete an object it must have been sealed.
return PlasmaError_ObjectNotSealed;
}

if (entry->clients.size() != 0) {
// To delete an object, there must be no clients currently using it.
return PlasmaError_ObjectInUse;
}

eviction_policy_.remove_object(object_id);

dlfree(entry->pointer);
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
ObjectInfoT notification;
notification.object_id = object_id.binary();
notification.is_deletion = true;
push_notification(&notification);

return PlasmaError_OK;
}

void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
Expand Down Expand Up @@ -626,18 +659,23 @@ Status PlasmaStore::process_message(Client* client) {
RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
process_get_request(client, object_ids_to_get, timeout_ms);
} break;
case MessageType_PlasmaReleaseRequest:
case MessageType_PlasmaReleaseRequest: {
RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
release_object(object_id, client);
break;
case MessageType_PlasmaContainsRequest:
} break;
case MessageType_PlasmaDeleteRequest: {
RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
int error_code = delete_object(object_id);
HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code), client->fd);
} break;
case MessageType_PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
if (contains_object(object_id) == OBJECT_FOUND) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
} else {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
break;
} break;
case MessageType_PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ class PlasmaStore {
/// @return 1 if the abort succeeds, else 0.
int abort_object(const ObjectID& object_id, Client* client);

/// Delete an specific object by object_id that have been created in the hash table.
///
/// @param object_id Object ID of the object to be deleted.
/// @return One of the following error codes:
/// - PlasmaError_OK, if the object was delete successfully.
/// - PlasmaError_ObjectNonexistent, if ths object isn't existed.
/// - PlasmaError_ObjectInUse, if the object is in use.
int delete_object(ObjectID& object_id);

/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
///
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/plasma/test/client_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ class TestPlasmaStore : public ::testing::Test {
PlasmaClient client2_;
};

TEST_F(TestPlasmaStore, DeleteTest) {
ObjectID object_id = ObjectID::from_random();

// Test for deleting non-existance object.
Status result = client_.Delete(object_id);
ASSERT_EQ(result.IsPlasmaObjectNonexistent(), true);

// Test for the object being in local Plasma store.
// First create object.
int64_t data_size = 100;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
ARROW_CHECK_OK(client_.Seal(object_id));

// Object is in use, can't be delete.
result = client_.Delete(object_id);
ASSERT_EQ(result.IsUnknownError(), true);

// Avoid race condition of Plasma Manager waiting for notification.
ARROW_CHECK_OK(client_.Release(object_id));
ARROW_CHECK_OK(client_.Delete(object_id));
}

TEST_F(TestPlasmaStore, ContainsTest) {
ObjectID object_id = ObjectID::from_random();

Expand Down