diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 80cf7188cc1..e88c5cadff0 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -105,7 +105,7 @@ static std::mutex gpu_mutex; // PlasmaBuffer /// A Buffer class that automatically releases the backing plasma object -/// when it goes out of scope. +/// when it goes out of scope. This is returned by Get. class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { public: ~PlasmaBuffer(); @@ -123,6 +123,19 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { ObjectID object_id_; }; +/// A mutable Buffer class that keeps the backing data alive by keeping a +/// PlasmaClient shared pointer. This is returned by Create. Release will +/// be called in the associated Seal call. +class ARROW_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer { + public: + PlasmaMutableBuffer(std::shared_ptr client, uint8_t* mutable_data, + int64_t data_size) + : MutableBuffer(mutable_data, data_size), client_(client) {} + + private: + std::shared_ptr client_; +}; + // ---------------------------------------------------------------------- // PlasmaClient::Impl @@ -140,13 +153,47 @@ struct ObjectInUseEntry { bool is_sealed; }; -struct ClientMmapTableEntry { +class ClientMmapTableEntry { + public: + ClientMmapTableEntry(int fd, int64_t map_size) + : fd_(fd), pointer_(nullptr), length_(0) { + // We subtract kMmapRegionsGap from the length that was added + // in fake_mmap in malloc.h, to make map_size page-aligned again. + length_ = map_size - kMmapRegionsGap; + pointer_ = reinterpret_cast( + mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + // TODO(pcm): Don't fail here, instead return a Status. + if (pointer_ == MAP_FAILED) { + ARROW_LOG(FATAL) << "mmap failed"; + } + close(fd); // Closing this fd has an effect on performance. + } + + ~ClientMmapTableEntry() { + // At this point it is safe to unmap the memory, as the PlasmaBuffer + // keeps the PlasmaClient (and therefore the ClientMmapTableEntry) + // alive until it is destroyed. + // We don't need to close the associated file, since it has + // already been closed in the constructor. + int r = munmap(pointer_, length_); + if (r != 0) { + ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; + } + } + + uint8_t* pointer() { return pointer_; } + + int fd() { return fd_; } + + private: /// The associated file descriptor on the client. - int fd; + int fd_; /// The result of mmap for this file descriptor. - uint8_t* pointer; + uint8_t* pointer_; /// The length of the memory-mapped file. - size_t length; + size_t length_; + + ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry); }; class PlasmaClient::Impl : public std::enable_shared_from_this { @@ -244,7 +291,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this mmap_table_; + std::unordered_map> mmap_table_; /// A hash table of the object IDs that are currently being used by this /// client. std::unordered_map> objects_in_use_; @@ -277,23 +324,11 @@ PlasmaClient::Impl::~Impl() {} uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) { auto entry = mmap_table_.find(store_fd_val); if (entry != mmap_table_.end()) { - return entry->second.pointer; + return entry->second->pointer(); } else { - // We subtract kMmapRegionsGap from the length that was added - // in fake_mmap in malloc.h, to make map_size page-aligned again. - uint8_t* result = reinterpret_cast(mmap( - NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); - // TODO(pcm): Don't fail here, instead return a Status. - if (result == MAP_FAILED) { - ARROW_LOG(FATAL) << "mmap failed"; - } - close(fd); // Closing this fd has an effect on performance. - - ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; - entry.fd = fd; - entry.pointer = result; - entry.length = map_size; - return result; + mmap_table_[store_fd_val] = + std::unique_ptr(new ClientMmapTableEntry(fd, map_size)); + return mmap_table_[store_fd_val]->pointer(); } } @@ -302,7 +337,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { auto entry = mmap_table_.find(store_fd_val); ARROW_CHECK(entry != mmap_table_.end()); - return entry->second.pointer; + return entry->second->pointer(); } bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { @@ -317,7 +352,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) { ARROW_CHECK(fd >= 0) << "recv not successful"; return fd; } else { - return entry->second.fd; + return entry->second->fd(); } } @@ -369,8 +404,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, ARROW_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. ARROW_CHECK(object.metadata_offset == object.data_offset + data_size); - *data = std::make_shared( - LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size); + *data = std::make_shared( + shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, + data_size); // If plasma_create is being called from a transfer, then we will not copy the // metadata here. The metadata will be written along with the data streamed // from the transfer. diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index 8d1d6a7a574..facfd37ca78 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -257,6 +257,7 @@ class ARROW_EXPORT PlasmaClient { private: friend class PlasmaBuffer; + friend class PlasmaMutableBuffer; FRIEND_TEST(TestPlasmaStore, GetTest); FRIEND_TEST(TestPlasmaStore, LegacyGetTest); FRIEND_TEST(TestPlasmaStore, AbortTest);