From 24beb2775be98bb84456227edef072b6b1ace0ad Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 14:35:56 -0800 Subject: [PATCH 01/12] working version --- cpp/src/plasma/client.cc | 37 +++++++++++++++++++------------------ cpp/src/plasma/store.cc | 9 +++++++-- cpp/src/plasma/store.h | 3 +++ 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 99cf00cab80..b5bf6db978e 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -152,6 +152,8 @@ struct PlasmaClientConfig { }; struct ClientMmapTableEntry { + /// The associated file descriptor on the client + int fd; /// The result of mmap for this file descriptor. uint8_t* pointer; /// The length of the memory-mapped file. @@ -322,6 +324,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ close(fd); // Closing this fd has an effect on performance. ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; + entry.fd = fd; entry.pointer = result; entry.length = map_size; entry.count = 0; @@ -397,8 +400,15 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, // If the CreateReply included an error, then the store will not send a file // descriptor. if (device_num == 0) { - int fd = recv_fd(store_conn_); - ARROW_CHECK(fd >= 0) << "recv not successful"; + int fd = -1; + auto entry = mmap_table_.find(store_fd); + if (entry == mmap_table_.end()) { + ARROW_LOG(WARNING) << "trying to receive store_fd = " << store_fd; + fd = recv_fd(store_conn_); + ARROW_CHECK(fd >= 0) << "recv not successful"; + } else { + fd = entry->second.fd; + } ARROW_CHECK(object.data_size == data_size); ARROW_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. @@ -535,7 +545,13 @@ Status PlasmaClient::Impl::GetBuffers( // in the subsequent loop based on just the store file descriptor and without // having to know the relevant file descriptor received from recv_fd. for (size_t i = 0; i < store_fds.size(); i++) { - int fd = recv_fd(store_conn_); + int fd = -1; + auto entry = mmap_table_.find(store_fds[i]); + if (entry == mmap_table_.end()) { + fd = recv_fd(store_conn_); + } else { + fd = entry->second.fd; + } ARROW_CHECK(fd >= 0); LookupOrMmap(fd, store_fds[i], mmap_sizes[i]); } @@ -626,21 +642,6 @@ Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) { int fd = object_entry->second->object.store_fd; auto entry = mmap_table_.find(fd); ARROW_CHECK(entry != mmap_table_.end()); - ARROW_CHECK(entry->second.count >= 1); - if (entry->second.count == 1) { - // If no other objects are being used, then unmap the file. - // We subtract kMmapRegionsGap from the length that was added - // in fake_mmap in malloc.h, to make the size page-aligned again. - int err = munmap(entry->second.pointer, entry->second.length - kMmapRegionsGap); - if (err == -1) { - return Status::IOError("Error during munmap"); - } - // Remove the corresponding entry from the hash table. - mmap_table_.erase(fd); - } else { - // If there are other objects being used, decrement the reference count. - entry->second.count -= 1; - } // Update the in_use_object_bytes_. in_use_object_bytes_ -= (object_entry->second->object.data_size + object_entry->second->object.metadata_size); diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index ae658d757c1..2952dc36740 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -327,7 +327,10 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) { if (s.ok()) { // Send all of the file descriptors for the present objects. for (int store_fd : store_fds) { - WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); + if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) { + WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); + get_req->client->used_fds.emplace(store_fd); + } } } @@ -783,8 +786,10 @@ Status PlasmaStore::ProcessMessage(Client* client) { HANDLE_SIGPIPE( SendCreateReply(client->fd, object_id, &object, error_code, mmap_size), client->fd); - if (error_code == PlasmaError::OK && device_num == 0) { + if (error_code == PlasmaError::OK && device_num == 0 && client->used_fds.find(object.store_fd) == client->used_fds.end()) { + ARROW_LOG(WARNING) << "sending fd = " << object.store_fd; WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); + client->used_fds.emplace(object.store_fd); } } break; case fb::MessageType::PlasmaCreateAndSealRequest: { diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 8d3facd733f..0e0eb8323f3 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -54,6 +54,9 @@ struct Client { /// Object ids that are used by this client. std::unordered_set object_ids; + /// File descriptors that are used by this client. + std::unordered_set used_fds; + /// The file descriptor used to push notifications to client. This is only valid /// if client subscribes to plasma store. -1 indicates invalid. int notification_fd; From 5f0919934668f009d934408cdecde2b53539df37 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 14:47:54 -0800 Subject: [PATCH 02/12] introduce method --- cpp/src/plasma/client.cc | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index b5bf6db978e..dd9cc703e43 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -228,6 +228,13 @@ class PlasmaClient::Impl : public std::enable_shared_from_this= 0) << "recv not successful"; + return fd; + } else { + return entry->second.fd; + } +} + void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object, bool is_sealed) { // Increment the count of the object to track the fact that it is being used. @@ -400,15 +418,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, // If the CreateReply included an error, then the store will not send a file // descriptor. if (device_num == 0) { - int fd = -1; - auto entry = mmap_table_.find(store_fd); - if (entry == mmap_table_.end()) { - ARROW_LOG(WARNING) << "trying to receive store_fd = " << store_fd; - fd = recv_fd(store_conn_); - ARROW_CHECK(fd >= 0) << "recv not successful"; - } else { - fd = entry->second.fd; - } + int fd = GetStoreFd(store_fd); ARROW_CHECK(object.data_size == data_size); ARROW_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. @@ -545,14 +555,7 @@ Status PlasmaClient::Impl::GetBuffers( // in the subsequent loop based on just the store file descriptor and without // having to know the relevant file descriptor received from recv_fd. for (size_t i = 0; i < store_fds.size(); i++) { - int fd = -1; - auto entry = mmap_table_.find(store_fds[i]); - if (entry == mmap_table_.end()) { - fd = recv_fd(store_conn_); - } else { - fd = entry->second.fd; - } - ARROW_CHECK(fd >= 0); + int fd = GetStoreFd(store_fds[i]); LookupOrMmap(fd, store_fds[i], mmap_sizes[i]); } From e5ccbbacc34eea65f6ec12242bdc6809ff9590df Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 14:54:12 -0800 Subject: [PATCH 03/12] fixes --- cpp/src/plasma/client.cc | 2 +- cpp/src/plasma/store.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index dd9cc703e43..b96cfb40452 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -152,7 +152,7 @@ struct PlasmaClientConfig { }; struct ClientMmapTableEntry { - /// The associated file descriptor on the client + /// The associated file descriptor on the client. int fd; /// The result of mmap for this file descriptor. uint8_t* pointer; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 2952dc36740..c515c85ad68 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -786,8 +786,8 @@ Status PlasmaStore::ProcessMessage(Client* client) { HANDLE_SIGPIPE( SendCreateReply(client->fd, object_id, &object, error_code, mmap_size), client->fd); - if (error_code == PlasmaError::OK && device_num == 0 && client->used_fds.find(object.store_fd) == client->used_fds.end()) { - ARROW_LOG(WARNING) << "sending fd = " << object.store_fd; + if (error_code == PlasmaError::OK && device_num == 0 && + client->used_fds.find(object.store_fd) == client->used_fds.end()) { WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); client->used_fds.emplace(object.store_fd); } From cfff7e32c7d67be8c0a44d9407a7c947431e6c05 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 15:53:47 -0800 Subject: [PATCH 04/12] lint --- cpp/src/plasma/store.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index c515c85ad68..eedb4eaa2ea 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -787,7 +787,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { SendCreateReply(client->fd, object_id, &object, error_code, mmap_size), client->fd); if (error_code == PlasmaError::OK && device_num == 0 && - client->used_fds.find(object.store_fd) == client->used_fds.end()) { + client->used_fds.find(object.store_fd) == client->used_fds.end()) { WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); client->used_fds.emplace(object.store_fd); } From 2887b170940f8f47eaa273dcd63b04e6c3071c1f Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 16:39:42 -0800 Subject: [PATCH 05/12] clean up some code --- cpp/src/plasma/client.cc | 124 +++------------------------- cpp/src/plasma/client.h | 14 +--- cpp/src/plasma/test/client_tests.cc | 8 -- 3 files changed, 15 insertions(+), 131 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index b96cfb40452..29bd9667558 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -83,9 +83,6 @@ typedef struct XXH64_state_s XXH64_state_t; constexpr int64_t kHashingConcurrency = 8; constexpr int64_t kBytesInMB = 1 << 20; -// Use 100MB as an overestimate of the L3 cache size. -constexpr int64_t kL3CacheSizeBytes = 100000000; - // ---------------------------------------------------------------------- // GPU support @@ -143,14 +140,6 @@ struct ObjectInUseEntry { bool is_sealed; }; -/// Configuration options for the plasma client. -struct PlasmaClientConfig { - /// Number of release calls we wait until the object is actually released. - /// This allows us to avoid invalidating the cpu cache on workers if objects - /// are reused accross tasks. - size_t release_delay; -}; - struct ClientMmapTableEntry { /// The associated file descriptor on the client. int fd; @@ -158,9 +147,6 @@ struct ClientMmapTableEntry { uint8_t* pointer; /// The length of the memory-mapped file. size_t length; - /// The number of objects in this memory-mapped file that are currently being - /// used by the client. When this count reaches zeros, we unmap the file. - int count; }; class PlasmaClient::Impl : public std::enable_shared_from_this { @@ -171,7 +157,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this> objects_in_use_; - /// Object IDs of the last few release calls. This is a deque and - /// is used to delay releasing objects to see if they can be reused by - /// subsequent tasks so we do not unneccessarily invalidate cpu caches. - /// TODO(pcm): replace this with a proper lru cache using the size of the L3 - /// cache. - std::deque release_history_; /// The number of bytes in the combined objects that are held in the release /// history doubly-linked list. If this is too large then the client starts /// releasing objects. int64_t in_use_object_bytes_; - /// Configuration options for the plasma client. - PlasmaClientConfig config_; /// The amount of memory available to the Plasma store. The client needs this /// information to make sure that it does not delay in releasing so much /// memory that the store is unable to evict enough objects to free up space. @@ -334,7 +307,6 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ entry.fd = fd; entry.pointer = result; entry.length = map_size; - entry.count = 0; return result; } } @@ -379,16 +351,9 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, objects_in_use_[object_id]->is_sealed = is_sealed; object_entry = objects_in_use_[object_id].get(); if (object->device_num == 0) { - // Increment the count of the number of objects in the memory-mapped file - // that are being used. The corresponding decrement should happen in - // PlasmaClient::Release. - auto entry = mmap_table_.find(object->store_fd); - ARROW_CHECK(entry != mmap_table_.end()); - ARROW_CHECK(entry->second.count >= 0); // Update the in_use_object_bytes_. in_use_object_bytes_ += (object_entry->object.data_size + object_entry->object.metadata_size); - entry->second.count += 1; } } else { object_entry = elem->second.get(); @@ -634,39 +599,26 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects, return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out); } -Status PlasmaClient::Impl::UnmapObject(const ObjectID& object_id) { +Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) { auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()); ARROW_CHECK(object_entry->second->count == 0); - // Decrement the count of the number of objects in this memory-mapped file - // that the client is using. The corresponding increment should have - // happened in plasma_get. - int fd = object_entry->second->object.store_fd; - auto entry = mmap_table_.find(fd); - ARROW_CHECK(entry != mmap_table_.end()); // Update the in_use_object_bytes_. in_use_object_bytes_ -= (object_entry->second->object.data_size + object_entry->second->object.metadata_size); DCHECK_GE(in_use_object_bytes_, 0); + // Remove the entry from the hash table of objects currently in use. objects_in_use_.erase(object_id); return Status::OK(); } -/// This is a helper method for implementing plasma_release. We maintain a -/// buffer -/// of release calls and only perform them once the buffer becomes full (as -/// judged by the aggregate sizes of the objects). There may be multiple release -/// calls for the same object ID in the buffer. In this case, the first release -/// calls will not do anything. The client will only send a message to the store -/// releasing the object when the client is truly done with the object. -/// -/// @param object_id The object ID to attempt to release. -Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) { - // Decrement the count of the number of instances of this object that are - // being used by this client. The corresponding increment should have happened - // in PlasmaClient::Get. +Status PlasmaClient::Impl::Release(const ObjectID& object_id) { + // If the client is already disconnected, ignore release requests. + if (store_conn_ < 0) { + return Status::OK(); + } auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()); object_entry->second->count -= 1; @@ -674,7 +626,7 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) { // Check if the client is no longer using this object. if (object_entry->second->count == 0) { // Tell the store that the client no longer needs the object. - RETURN_NOT_OK(UnmapObject(object_id)); + RETURN_NOT_OK(MarkObjectUnused(object_id)); RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id)); auto iter = deletion_cache_.find(object_id); if (iter != deletion_cache_.end()) { @@ -685,50 +637,6 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& object_id) { return Status::OK(); } -Status PlasmaClient::Impl::Release(const ObjectID& object_id) { - // If the client is already disconnected, ignore release requests. - if (store_conn_ < 0) { - return Status::OK(); - } - // If an object is in the deletion cache, handle it directly without waiting. - auto iter = deletion_cache_.find(object_id); - if (iter != deletion_cache_.end()) { - RETURN_NOT_OK(PerformRelease(object_id)); - return Status::OK(); - } - // Add the new object to the release history. - release_history_.push_front(object_id); - // If there are too many bytes in use by the client or if there are too many - // pending release calls, and there are at least some pending release calls in - // the release_history list, then release some objects. - - // TODO(wap): Eviction policy only works on host memory, and thus objects on - // the GPU cannot be released currently. - while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) || - release_history_.size() > config_.release_delay) && - release_history_.size() > 0) { - // Perform a release for the object ID for the first pending release. - RETURN_NOT_OK(PerformRelease(release_history_.back())); - // Remove the last entry from the release history. - release_history_.pop_back(); - } - return Status::OK(); -} - -Status PlasmaClient::Impl::FlushReleaseHistory() { - // If the client is already disconnected, ignore the flush. - if (store_conn_ < 0) { - return Status::OK(); - } - while (release_history_.size() > 0) { - // Perform a release for the object ID for the first pending release. - RETURN_NOT_OK(PerformRelease(release_history_.back())); - // Remove the last entry from the release history. - release_history_.pop_back(); - } - return Status::OK(); -} - // This method is used to query whether the plasma store contains an object. Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) { // Check if we already have a reference to the object. @@ -859,8 +767,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { ARROW_CHECK(!object_entry->second->is_sealed) << "Plasma client called abort on a sealed object"; - // Flush the release history. - RETURN_NOT_OK(FlushReleaseHistory()); // Make sure that the Plasma client only has one reference to the object. If // it has more, then the client needs to release the buffer before calling // abort. @@ -872,7 +778,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id)); // Decrease the reference count to zero, then remove the object. object_entry->second->count--; - RETURN_NOT_OK(UnmapObject(object_id)); + RETURN_NOT_OK(MarkObjectUnused(object_id)); std::vector buffer; ObjectID id; @@ -882,7 +788,6 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { } Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { - RETURN_NOT_OK(FlushReleaseHistory()); std::vector not_in_use_ids; for (auto& object_id : object_ids) { // If the object is in used, skip it. @@ -985,7 +890,6 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, } else { manager_conn_ = -1; } - config_.release_delay = release_delay; in_use_object_bytes_ = 0; // Send a ConnectRequest to the store to get its memory capacity. RETURN_NOT_OK(SendConnectRequest(store_conn_)); @@ -1179,8 +1083,6 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { int PlasmaClient::get_manager_fd() const { return impl_->get_manager_fd(); } -Status PlasmaClient::FlushReleaseHistory() { return impl_->FlushReleaseHistory(); } - bool PlasmaClient::IsInUse(const ObjectID& object_id) { return impl_->IsInUse(object_id); } diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 9e080b7760d..148faa36643 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -34,11 +34,6 @@ using arrow::Status; namespace plasma { -/// We keep a queue of unreleased objects cached in the client until we start -/// sending release requests to the store. This is to avoid frequently mapping -/// and unmapping objects and evicting data from processor caches. -constexpr int64_t kPlasmaDefaultReleaseDelay = 64; - /// Object buffer data structure. struct ObjectBuffer { /// The data buffer. @@ -62,13 +57,12 @@ class ARROW_EXPORT PlasmaClient { /// \param manager_socket_name The name of the UNIX domain socket to use to /// connect to the local Plasma manager. If this is "", then this /// function will not connect to a manager. - /// \param release_delay Number of released objects that are kept around - /// and not evicted to avoid too many munmaps. + /// \param release_delay Deprecated (not used). /// \param num_retries number of attempts to connect to IPC socket, default 50 /// \return The return status. Status Connect(const std::string& store_socket_name, const std::string& manager_socket_name, - int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1); + int release_delay = 0, int num_retries = -1); /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -354,10 +348,6 @@ class ARROW_EXPORT PlasmaClient { FRIEND_TEST(TestPlasmaStore, LegacyGetTest); FRIEND_TEST(TestPlasmaStore, AbortTest); - /// This is a helper method that flushes all pending release calls to the - /// store. - Status FlushReleaseHistory(); - bool IsInUse(const ObjectID& object_id); class ARROW_NO_EXPORT Impl; diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index f820303aba4..2b58ff0c857 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -277,7 +277,6 @@ TEST_F(TestPlasmaStore, GetTest) { // First create object. std::vector data = {3, 5, 6, 7, 9}; CreateObject(client_, object_id, {42}, data); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_FALSE(client_.IsInUse(object_id)); object_buffers.clear(); @@ -291,11 +290,9 @@ TEST_F(TestPlasmaStore, GetTest) { auto metadata = object_buffers[0].metadata; object_buffers.clear(); ::arrow::AssertBufferEqual(*metadata, std::string{42}); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_TRUE(client_.IsInUse(object_id)); } // Object is automatically released - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_FALSE(client_.IsInUse(object_id)); } @@ -314,17 +311,14 @@ TEST_F(TestPlasmaStore, LegacyGetTest) { // First create object. std::vector data = {3, 5, 6, 7, 9}; CreateObject(client_, object_id, {42}, data); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_FALSE(client_.IsInUse(object_id)); ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer)); AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9}); } // Object needs releasing manually - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_TRUE(client_.IsInUse(object_id)); ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_FALSE(client_.IsInUse(object_id)); } @@ -377,11 +371,9 @@ TEST_F(TestPlasmaStore, AbortTest) { ASSERT_TRUE(status.IsInvalid()); // Release, then abort. ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_TRUE(client_.IsInUse(object_id)); ARROW_CHECK_OK(client_.Abort(object_id)); - ARROW_CHECK_OK(client_.FlushReleaseHistory()); EXPECT_FALSE(client_.IsInUse(object_id)); // Test for object non-existence after the abort. From 502aeda4a5258f1fcb27084e529c8f9393f026c8 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 16:58:20 -0800 Subject: [PATCH 06/12] linting --- cpp/src/plasma/client.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 148faa36643..514d2bd0d6d 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -61,8 +61,8 @@ class ARROW_EXPORT PlasmaClient { /// \param num_retries number of attempts to connect to IPC socket, default 50 /// \return The return status. Status Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, - int release_delay = 0, int num_retries = -1); + const std::string& manager_socket_name, int release_delay = 0, + int num_retries = -1); /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. From f60dcbedc3ad6ec4fbd496a17126d3472322f34e Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 17:44:34 -0800 Subject: [PATCH 07/12] fix tests --- cpp/src/plasma/test/client_tests.cc | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 2b58ff0c857..1f130dce34f 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -82,7 +82,8 @@ class TestPlasmaStore : public ::testing::Test { void CreateObject(PlasmaClient& client, const ObjectID& object_id, const std::vector& metadata, - const std::vector& data) { + const std::vector& data, + bool release = true) { std::shared_ptr data_buffer; ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], metadata.size(), &data_buffer)); @@ -90,7 +91,9 @@ class TestPlasmaStore : public ::testing::Test { data_buffer->mutable_data()[i] = data[i]; } ARROW_CHECK_OK(client.Seal(object_id)); - ARROW_CHECK_OK(client.Release(object_id)); + if (release) { + ARROW_CHECK_OK(client.Release(object_id)); + } } const std::string& GetStoreSocketName() const { return store_socket_name_; } @@ -155,11 +158,12 @@ TEST_F(TestPlasmaStore, SealErrorsTest) { // Create object. std::vector data(100, 0); - CreateObject(client_, object_id, {42}, data); + CreateObject(client_, object_id, {42}, data, false); // Trying to seal it again. result = client_.Seal(object_id); ASSERT_TRUE(result.IsPlasmaObjectAlreadySealed()); + ARROW_CHECK_OK(client_.Release(object_id)); } TEST_F(TestPlasmaStore, DeleteTest) { @@ -228,13 +232,7 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) { // client2_ won't send the release request immediately because the trigger // condition is not reached. The release is only added to release cache. object_buffers.clear(); - // The reference count went to zero, but the objects are still in the release - // cache. - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); - ASSERT_TRUE(has_object); - // The Delete call will flush release cache and send the Delete request. + // Delete the objects. result = client2_.Delete(std::vector{object_id1, object_id2}); ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); ASSERT_FALSE(has_object); @@ -386,7 +384,6 @@ TEST_F(TestPlasmaStore, AbortTest) { // Test that we can get the object. ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5}); - ARROW_CHECK_OK(client_.Release(object_id)); } TEST_F(TestPlasmaStore, MultipleClientTest) { From 0d572823542670b5d516a82d063efc4a2063662f Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 7 Dec 2018 17:59:13 -0800 Subject: [PATCH 08/12] linting --- cpp/src/plasma/test/client_tests.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 1f130dce34f..65a9b71b7f2 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -82,8 +82,7 @@ class TestPlasmaStore : public ::testing::Test { void CreateObject(PlasmaClient& client, const ObjectID& object_id, const std::vector& metadata, - const std::vector& data, - bool release = true) { + const std::vector& data, bool release = true) { std::shared_ptr data_buffer; ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], metadata.size(), &data_buffer)); From 71c4c5c1b7987552b79172cc9c31a26ef8ec7ce8 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 12 Dec 2018 18:25:21 -0800 Subject: [PATCH 09/12] don't close fd twice --- cpp/src/plasma/client.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 29bd9667558..0ba031a3494 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -290,7 +290,6 @@ PlasmaClient::Impl::~Impl() {} uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) { auto entry = mmap_table_.find(store_fd_val); if (entry != mmap_table_.end()) { - close(fd); return entry->second.pointer; } else { // We subtract kMmapRegionsGap from the length that was added From af150c145d8a8548c0bbbe24d95e0f31f205f17b Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 13 Dec 2018 12:25:11 -0800 Subject: [PATCH 10/12] comments and fixes --- cpp/src/plasma/client.cc | 29 ++++++++--------------- cpp/src/plasma/store.cc | 8 +++++-- docs/source/python/plasma.rst | 10 ++++---- python/pyarrow/_plasma.pyx | 9 ++++--- python/pyarrow/tensorflow/plasma_op.cc | 4 ++-- python/pyarrow/tests/test_plasma.py | 8 +++---- python/pyarrow/tests/test_plasma_tf_op.py | 2 +- 7 files changed, 34 insertions(+), 36 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 0ba031a3494..36dc73f873b 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -213,15 +213,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this> objects_in_use_; - /// The number of bytes in the combined objects that are held in the release - /// history doubly-linked list. If this is too large then the client starts - /// releasing objects. - int64_t in_use_object_bytes_; /// The amount of memory available to the Plasma store. The client needs this /// information to make sure that it does not delay in releasing so much /// memory that the store is unable to evict enough objects to free up space. @@ -349,11 +347,6 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, objects_in_use_[object_id]->count = 0; objects_in_use_[object_id]->is_sealed = is_sealed; object_entry = objects_in_use_[object_id].get(); - if (object->device_num == 0) { - // Update the in_use_object_bytes_. - in_use_object_bytes_ += - (object_entry->object.data_size + object_entry->object.metadata_size); - } } else { object_entry = elem->second.get(); ARROW_CHECK(object_entry->count > 0); @@ -603,11 +596,6 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) { ARROW_CHECK(object_entry != objects_in_use_.end()); ARROW_CHECK(object_entry->second->count == 0); - // Update the in_use_object_bytes_. - in_use_object_bytes_ -= (object_entry->second->object.data_size + - object_entry->second->object.metadata_size); - DCHECK_GE(in_use_object_bytes_, 0); - // Remove the entry from the hash table of objects currently in use. objects_in_use_.erase(object_id); return Status::OK(); @@ -889,7 +877,10 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, } else { manager_conn_ = -1; } - in_use_object_bytes_ = 0; + if (release_delay != 0) { + ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect " + << "is deprecated"; + } // Send a ConnectRequest to the store to get its memory capacity. RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index eedb4eaa2ea..f6326ccf588 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -327,9 +327,11 @@ void PlasmaStore::ReturnFromGet(GetRequest* get_req) { if (s.ok()) { // Send all of the file descriptors for the present objects. for (int store_fd : store_fds) { + // Only send the file descriptor if it hasn't been sent (see analogous + // logic in GetStoreFd in client.cc). if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) { WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); - get_req->client->used_fds.emplace(store_fd); + get_req->client->used_fds.insert(store_fd); } } } @@ -786,10 +788,12 @@ Status PlasmaStore::ProcessMessage(Client* client) { HANDLE_SIGPIPE( SendCreateReply(client->fd, object_id, &object, error_code, mmap_size), client->fd); + // Only send the file descriptor if it hasn't been sent (see analogous + // logic in GetStoreFd in client.cc). Similar in ReturnFromGet. if (error_code == PlasmaError::OK && device_num == 0 && client->used_fds.find(object.store_fd) == client->used_fds.end()) { WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); - client->used_fds.emplace(object.store_fd); + client->used_fds.insert(object.store_fd); } } break; case fb::MessageType::PlasmaCreateAndSealRequest: { diff --git a/docs/source/python/plasma.rst b/docs/source/python/plasma.rst index 09837cf6e9e..3df68eff59e 100644 --- a/docs/source/python/plasma.rst +++ b/docs/source/python/plasma.rst @@ -60,7 +60,7 @@ socket name: .. code-block:: python import pyarrow.plasma as plasma - client = plasma.connect("/tmp/plasma", "", 0) + client = plasma.connect("/tmp/plasma", "") If the following error occurs from running the above Python code, that means that either the socket given is incorrect, or the ``./plasma_store`` is @@ -68,7 +68,7 @@ not currently running. Check to see if the Plasma store is still running. .. code-block:: shell - >>> client = plasma.connect("/tmp/plasma", "", 0) + >>> client = plasma.connect("/tmp/plasma", "") Connection to socket failed for pathname /tmp/plasma Could not connect to socket /tmp/plasma @@ -179,7 +179,7 @@ the object buffer. # Create a different client. Note that this second client could be # created in the same or in a separate, concurrent Python session. - client2 = plasma.connect("/tmp/plasma", "", 0) + client2 = plasma.connect("/tmp/plasma", "") # Get the object in the second client. This blocks until the object has been sealed. object_id2 = plasma.ObjectID(20 * b"a") @@ -221,7 +221,7 @@ of the object info might change in the future): import pyarrow.plasma as plasma import time - client = plasma.connect("/tmp/plasma", "", 0) + client = plasma.connect("/tmp/plasma", "") client.put("hello, world") # Sleep a little so we get different creation times @@ -452,7 +452,7 @@ You can test this with the following script: import pyarrow.plasma as plasma import time - client = plasma.connect("/tmp/plasma", "", 0) + client = plasma.connect("/tmp/plasma", "") data = np.random.randn(100000000) tensor = pa.Tensor.from_numpy(data) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 2fad09c0549..3aae14c123d 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -32,6 +32,7 @@ from cpython.pycapsule cimport * import collections import pyarrow import random +import warnings import socket from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer @@ -872,7 +873,7 @@ cdef class PlasmaClient: return result -def connect(store_socket_name, manager_socket_name, int release_delay, +def connect(store_socket_name, manager_socket_name, int release_delay=0, int num_retries=-1): """ Return a new PlasmaClient that is connected a plasma store and @@ -885,8 +886,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay, manager_socket_name : str Name of the socket the plasma manager is listening at. release_delay : int - The maximum number of objects that the client will keep and - delay releasing (for caching reasons). + This parameter is deprecated and has no effect. num_retries : int, default -1 Number of times to try to connect to plasma store. Default value of -1 uses the default (50) @@ -894,6 +894,9 @@ def connect(store_socket_name, manager_socket_name, int release_delay, cdef PlasmaClient result = PlasmaClient() result.store_socket_name = store_socket_name.encode() result.manager_socket_name = manager_socket_name.encode() + if release_delay != 0: + warnings.warn("release_delay in PlasmaClient.connect is deprecated", + FutureWarning) with nogil: check_status(result.client.get() .Connect(result.store_socket_name, diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc index a341d5a5398..4e6449adfc8 100644 --- a/python/pyarrow/tensorflow/plasma_op.cc +++ b/python/pyarrow/tensorflow/plasma_op.cc @@ -77,7 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel { if (!connected_) { VLOG(1) << "Connecting to Plasma..."; ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_, - plasma_manager_socket_name_, 0)); + plasma_manager_socket_name_)); VLOG(1) << "Connected!"; connected_ = true; } @@ -249,7 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel { if (!connected_) { VLOG(1) << "Connecting to Plasma..."; ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_, - plasma_manager_socket_name_, 0)); + plasma_manager_socket_name_)); VLOG(1) << "Connected!"; connected_ = true; } diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index e3d31b7de19..66449e6dba9 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -121,8 +121,8 @@ def setup_method(self, test_method): use_one_memory_mapped_file=use_one_memory_mapped_file) self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__() # Connect to Plasma. - self.plasma_client = plasma.connect(self.plasma_store_name, "", 64) - self.plasma_client2 = plasma.connect(self.plasma_store_name, "", 0) + self.plasma_client = plasma.connect(self.plasma_store_name, "") + self.plasma_client2 = plasma.connect(self.plasma_store_name, "") def teardown_method(self, test_method): try: @@ -948,7 +948,7 @@ def test_use_huge_pages(): plasma_store_memory=2*10**9, plasma_directory="/mnt/hugepages", use_hugepages=True) as (plasma_store_name, p): - plasma_client = plasma.connect(plasma_store_name, "", 64) + plasma_client = plasma.connect(plasma_store_name, "") create_object(plasma_client, 10**8) @@ -962,7 +962,7 @@ def test_plasma_client_sharing(): with plasma.start_plasma_store( plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \ as (plasma_store_name, p): - plasma_client = plasma.connect(plasma_store_name, "", 64) + plasma_client = plasma.connect(plasma_store_name, "") object_id = plasma_client.put(np.zeros(3)) buf = plasma_client.get(object_id) del plasma_client diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py index d9bf915d663..51e8b283e0a 100644 --- a/python/pyarrow/tests/test_plasma_tf_op.py +++ b/python/pyarrow/tests/test_plasma_tf_op.py @@ -94,7 +94,7 @@ def test_plasma_tf_op(use_gpu=False): pytest.skip("TensorFlow Op not found") with plasma.start_plasma_store(10**8) as (plasma_store_name, p): - client = plasma.connect(plasma_store_name, "", 0) + client = plasma.connect(plasma_store_name, "") for dtype in [np.float32, np.float64, np.int8, np.int16, np.int32, np.int64]: run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name, From a038404053e06a4e5e2db5ff56ca039973a05124 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 13 Dec 2018 12:27:58 -0800 Subject: [PATCH 11/12] Update _plasma.pyx --- python/pyarrow/_plasma.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 3aae14c123d..f7db3b4e0fe 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -30,11 +30,11 @@ from cython.operator cimport dereference as deref, preincrement as inc from cpython.pycapsule cimport * import collections -import pyarrow import random -import warnings import socket +import warnings +import pyarrow from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, CFixedSizeBufferWriter, CStatus) From f899f4597b322414586f15f3e597a0b1676c36c1 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 13 Dec 2018 12:57:37 -0800 Subject: [PATCH 12/12] Update client.cc --- cpp/src/plasma/client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 36dc73f873b..2dbe2b41478 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -223,7 +223,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this