diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 171f062225e..69a02dc0f20 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -103,7 +103,7 @@ GetRequest::GetRequest(Client* client, const std::vector& object_ids) num_objects_to_wait_for = unique_ids.size(); } -Client::Client(int fd) : fd(fd) {} +Client::Client(int fd) : fd(fd), notification_fd(-1) {} PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory, bool hugepages_enabled) @@ -559,10 +559,18 @@ void PlasmaStore::disconnect_client(int client_fd) { remove_from_client_object_ids(entry, client); } - // Note, the store may still attempt to send a message to the disconnected - // client (for example, when an object ID that the client was waiting for - // is ready). In these cases, the attempt to send the message will fail, but - // the store should just ignore the failure. + if (client->notification_fd > 0) { + // This client has subscribed for notifications. + auto notify_fd = client->notification_fd; + loop_->RemoveFileEvent(notify_fd); + // Close socket. + close(notify_fd); + // Remove notification queue for this fd from global map. + pending_notifications_.erase(notify_fd); + // Reset fd. + client->notification_fd = -1; + } + connected_clients_.erase(it); } @@ -642,9 +650,23 @@ void PlasmaStore::push_notification(ObjectInfoT* object_info) { } } +void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) { + auto it = pending_notifications_.find(client_fd); + if (it != pending_notifications_.end()) { + auto notification = create_object_info_buffer(object_info); + it->second.object_notifications.emplace_back(std::move(notification)); + send_notifications(it); + } +} + // Subscribe to notifications about sealed objects. void PlasmaStore::subscribe_to_updates(Client* client) { ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; + if (client->notification_fd > 0) { + // This client has already subscribed. Return. + return; + } + // TODO(rkn): The store could block here if the client doesn't send a file // descriptor. int fd = recv_fd(client->fd); @@ -657,12 +679,14 @@ void PlasmaStore::subscribe_to_updates(Client* client) { // Add this fd to global map, which is needed for this client to receive notifications. pending_notifications_[fd]; + client->notification_fd = fd; - // Push notifications to the new subscriber about existing objects. + // Push notifications to the new subscriber about existing sealed objects. for (const auto& entry : store_info_.objects) { - push_notification(&entry.second->info); + if (entry.second->state == PLASMA_SEALED) { + push_notification(&entry.second->info, fd); + } } - send_notifications(pending_notifications_.find(fd)); } Status PlasmaStore::process_message(Client* client) { diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 9b3850b5a1f..e5ef9172eec 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -50,6 +50,10 @@ struct Client { /// Object ids that are used by this client. std::unordered_set object_ids; + + /// The file descriptor used to push notifications to client. This is only valid + /// if client subscribes to plasma store. -1 indicates invalid. + int notification_fd; }; class PlasmaStore { @@ -170,6 +174,8 @@ class PlasmaStore { private: void push_notification(ObjectInfoT* object_notification); + void push_notification(ObjectInfoT* object_notification, int client_fd); + void add_to_client_object_ids(ObjectTableEntry* entry, Client* client); void return_from_get(GetRequest* get_req);