diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index d9bb6764181f..ed0955e2a40d 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -60,7 +60,7 @@ std::shared_ptr ExchangeSource::create( const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool) { + memory::MemoryPool* pool) { for (auto& factory : factories()) { auto result = factory(taskId, destination, queue, pool); if (result) { @@ -83,7 +83,7 @@ class LocalExchangeSource : public ExchangeSource { const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool) + memory::MemoryPool* pool) : ExchangeSource(taskId, destination, queue, pool) {} bool shouldRequestLocked() override { @@ -133,16 +133,22 @@ class LocalExchangeSource : public ExchangeSource { } int64_t ackSequence; { - std::lock_guard l(queue_->mutex()); - requestPending_ = false; - for (auto& page : pages) { - queue_->enqueue(std::move(page)); + std::vector promises; + { + std::lock_guard l(queue_->mutex()); + requestPending_ = false; + for (auto& page : pages) { + queue_->enqueueLocked(std::move(page), promises); + } + if (atEnd) { + queue_->enqueueLocked(nullptr, promises); + atEnd_ = true; + } + ackSequence = sequence_ = sequence + pages.size(); } - if (atEnd) { - queue_->enqueue(nullptr); - atEnd_ = true; + for (auto& promise : promises) { + promise.setValue(); } - ackSequence = sequence_ = sequence + pages.size(); } // Outside of queue mutex. if (atEnd_) { @@ -166,7 +172,7 @@ std::unique_ptr createLocalExchangeSource( const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool) { + memory::MemoryPool* pool) { if (strncmp(taskId.c_str(), "local://", 8) == 0) { return std::make_unique( taskId, destination, std::move(queue), pool); @@ -194,7 +200,7 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) { toClose = std::move(source); } else { sources_.push_back(source); - queue_->addSource(); + queue_->addSourceLocked(); if (source->shouldRequestLocked()) { toRequest = source; } @@ -210,7 +216,6 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) { } void ExchangeClient::noMoreRemoteTasks() { - std::lock_guard l(queue_->mutex()); queue_->noMoreSources(); } @@ -229,10 +234,7 @@ void ExchangeClient::close() { for (auto& source : sources) { source->close(); } - { - std::lock_guard l(queue_->mutex()); - queue_->closeLocked(); - } + queue_->close(); } std::unique_ptr ExchangeClient::next( @@ -243,7 +245,7 @@ std::unique_ptr ExchangeClient::next( { std::lock_guard l(queue_->mutex()); *atEnd = false; - page = queue_->dequeue(atEnd, future); + page = queue_->dequeueLocked(atEnd, future); if (*atEnd) { return page; } @@ -278,7 +280,7 @@ std::string ExchangeClient::toString() { return out.str(); } -bool Exchange::getSplits(ContinueFuture* FOLLY_NONNULL future) { +bool Exchange::getSplits(ContinueFuture* future) { if (operatorCtx_->driverCtx()->driverId != 0) { // When there are multiple pipelines, a single operator, the one from // pipeline 0, is responsible for feeding splits into shared ExchangeClient. @@ -313,7 +315,7 @@ bool Exchange::getSplits(ContinueFuture* FOLLY_NONNULL future) { } } -BlockingReason Exchange::isBlocked(ContinueFuture* FOLLY_NONNULL future) { +BlockingReason Exchange::isBlocked(ContinueFuture* future) { if (currentPage_ || atEnd_) { return BlockingReason::kNotBlocked; } diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 12bb9f8bd634..65e77f18774a 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -35,7 +35,7 @@ class SerializedPage { // TODO: consider to enforce setting memory pool if possible. explicit SerializedPage( std::unique_ptr iobuf, - memory::MemoryPool* FOLLY_NULLABLE pool = nullptr, + memory::MemoryPool* pool = nullptr, std::function onDestructionCb = nullptr); ~SerializedPage(); @@ -47,7 +47,7 @@ class SerializedPage { // Makes 'input' ready for deserializing 'this' with // VectorStreamGroup::read(). - void prepareStreamForDeserialize(ByteStream* FOLLY_NONNULL input); + void prepareStreamForDeserialize(ByteStream* input); std::unique_ptr getIOBuf() const { return iobuf_->clone(); @@ -70,7 +70,7 @@ class SerializedPage { // Number of payload bytes in 'iobuf_'. const int64_t iobufBytes_; - memory::MemoryPool* FOLLY_NULLABLE pool_; + memory::MemoryPool* pool_; // Callback that will be called on destruction of the SerializedPage, // primarily used to free externally allocated memory backing folly::IOBuf @@ -87,7 +87,6 @@ class ExchangeQueue { explicit ExchangeQueue(int64_t minBytes) : minBytes_(minBytes) {} ~ExchangeQueue() { - std::lock_guard l(mutex_); clearAllPromises(); } @@ -99,17 +98,23 @@ class ExchangeQueue { return queue_.empty(); } - void enqueue(std::unique_ptr&& page) { - if (!page) { + void enqueueLocked( + std::unique_ptr&& page, + std::vector& promises) { + if (page == nullptr) { ++numCompleted_; - checkComplete(); + auto completedPromises = checkCompleteLocked(); + promises.reserve(promises.size() + completedPromises.size()); + for (auto& promise : completedPromises) { + promises.push_back(std::move(promise)); + } return; } totalBytes_ += page->size(); queue_.push_back(std::move(page)); if (!promises_.empty()) { // Resume one of the waiting drivers. - promises_.back().setValue(); + promises.push_back(std::move(promises_.back())); promises_.pop_back(); } } @@ -117,18 +122,23 @@ class ExchangeQueue { // If data is permanently not available, e.g. the source cannot be // contacted, this registers an error message and causes the reading // Exchanges to throw with the message - void setErrorLocked(const std::string& error) { - if (!error_.empty()) { - return; + void setError(const std::string& error) { + std::vector promises; + { + std::lock_guard l(mutex_); + if (!error_.empty()) { + return; + } + error_ = error; + atEnd_ = true; + promises = clearAllPromisesLocked(); } - error_ = error; - atEnd_ = true; - clearAllPromises(); + clearPromises(promises); } - std::unique_ptr dequeue( - bool* FOLLY_NONNULL atEnd, - ContinueFuture* FOLLY_NONNULL future) { + std::unique_ptr dequeueLocked( + bool* atEnd, + ContinueFuture* future) { VELOX_CHECK(future); if (!error_.empty()) { *atEnd = true; @@ -163,34 +173,61 @@ class ExchangeQueue { return minBytes_; } - void addSource() { + void addSourceLocked() { VELOX_CHECK(!noMoreSources_, "addSource called after noMoreSources"); numSources_++; } void noMoreSources() { - noMoreSources_ = true; - checkComplete(); + std::vector promises; + { + std::lock_guard l(mutex_); + noMoreSources_ = true; + promises = checkCompleteLocked(); + } + clearPromises(promises); } - void closeLocked() { - queue_.clear(); - clearAllPromises(); + void close() { + std::vector promises; + { + std::lock_guard l(mutex_); + promises = closeLocked(); + } + clearPromises(promises); } private: - void checkComplete() { + std::vector closeLocked() { + queue_.clear(); + return clearAllPromisesLocked(); + } + + std::vector checkCompleteLocked() { if (noMoreSources_ && numCompleted_ == numSources_) { atEnd_ = true; - clearAllPromises(); + return clearAllPromisesLocked(); } + return {}; } void clearAllPromises() { - for (auto& promise : promises_) { + std::vector promises; + { + std::lock_guard l(mutex_); + promises = clearAllPromisesLocked(); + } + clearPromises(promises); + } + + std::vector clearAllPromisesLocked() { + return std::move(promises_); + } + + static void clearPromises(std::vector& promises) { + for (auto& promise : promises) { promise.setValue(); } - promises_.clear(); } int numCompleted_ = 0; @@ -217,13 +254,13 @@ class ExchangeSource : public std::enable_shared_from_this { const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool)>; + memory::MemoryPool* pool)>; ExchangeSource( const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool) + memory::MemoryPool* pool) : taskId_(taskId), destination_(destination), queue_(std::move(queue)), @@ -235,7 +272,7 @@ class ExchangeSource : public std::enable_shared_from_this { const std::string& taskId, int destination, std::shared_ptr queue, - memory::MemoryPool* FOLLY_NONNULL pool); + memory::MemoryPool* pool); // Returns true if there is no request to the source pending or if // this should be retried. If true, the caller is expected to call @@ -283,7 +320,7 @@ class ExchangeSource : public std::enable_shared_from_this { bool atEnd_ = false; protected: - memory::MemoryPool* FOLLY_NONNULL pool_; + memory::MemoryPool* pool_; }; struct RemoteConnectorSplit : public connector::ConnectorSplit { @@ -301,7 +338,7 @@ class ExchangeClient { ExchangeClient( int destination, - memory::MemoryPool* FOLLY_NONNULL pool, + memory::MemoryPool* pool, int64_t minSize = kDefaultMinSize) : destination_(destination), pool_(pool), @@ -315,7 +352,7 @@ class ExchangeClient { ~ExchangeClient(); - memory::MemoryPool* FOLLY_NULLABLE pool() const { + memory::MemoryPool* pool() const { return pool_; } @@ -334,15 +371,13 @@ class ExchangeClient { return queue_; } - std::unique_ptr next( - bool* FOLLY_NONNULL atEnd, - ContinueFuture* FOLLY_NONNULL future); + std::unique_ptr next(bool* atEnd, ContinueFuture* future); std::string toString(); private: const int destination_; - memory::MemoryPool* const FOLLY_NONNULL pool_; + memory::MemoryPool* const pool_; std::shared_ptr queue_; std::unordered_set taskIds_; std::vector> sources_; @@ -353,7 +388,7 @@ class Exchange : public SourceOperator { public: Exchange( int32_t operatorId, - DriverCtx* FOLLY_NONNULL ctx, + DriverCtx* ctx, const std::shared_ptr& exchangeNode, std::shared_ptr exchangeClient, const std::string& operatorType = "Exchange") @@ -382,7 +417,7 @@ class Exchange : public SourceOperator { exchangeClient_ = nullptr; } - BlockingReason isBlocked(ContinueFuture* FOLLY_NONNULL future) override; + BlockingReason isBlocked(ContinueFuture* future) override; bool isFinished() override; @@ -397,7 +432,7 @@ class Exchange : public SourceOperator { /// this operator is not the first operator in the pipeline and therefore is /// not responsible for fetching splits and adding them to the /// exchangeClient_. - bool getSplits(ContinueFuture* FOLLY_NONNULL future); + bool getSplits(ContinueFuture* future); const core::PlanNodeId planNodeId_; bool noMoreSplits_ = false; diff --git a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp index becbd57f9708..44cac97bd5cb 100644 --- a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp +++ b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp @@ -367,13 +367,15 @@ TEST_F(PartitionedOutputBufferManagerTest, outOfOrderAcks) { TEST_F(PartitionedOutputBufferManagerTest, errorInQueue) { auto queue = std::make_shared(1 << 20); auto page = std::make_unique(folly::IOBuf::copyBuffer("", 0)); - { - std::lock_guard l(queue->mutex()); - queue->setErrorLocked("error"); + std::vector promises; + { queue->setError("error"); } + for (auto& promise : promises) { + promise.setValue(); } ContinueFuture future; bool atEnd = false; - EXPECT_THROW(auto page = queue->dequeue(&atEnd, &future), std::runtime_error); + EXPECT_THROW( + auto page = queue->dequeueLocked(&atEnd, &future), std::runtime_error); } TEST_F(PartitionedOutputBufferManagerTest, serializedPage) {