From 4391bcfbe9b0663e9975e5d3f52a0a363190f990 Mon Sep 17 00:00:00 2001 From: Karteek Murthy Samba Murthy Date: Mon, 12 Dec 2022 20:31:51 -0800 Subject: [PATCH 1/4] PartitionedOutputBufferManager::getData, handle exceptions and notify caller --- velox/exec/PartitionedOutputBufferManager.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/velox/exec/PartitionedOutputBufferManager.cpp b/velox/exec/PartitionedOutputBufferManager.cpp index 43138e650c7..b0d743e3da9 100644 --- a/velox/exec/PartitionedOutputBufferManager.cpp +++ b/velox/exec/PartitionedOutputBufferManager.cpp @@ -544,7 +544,14 @@ void PartitionedOutputBufferManager::getData( uint64_t maxBytes, int64_t sequence, DataAvailableCallback notify) { - getBuffer(taskId)->getData(destination, maxBytes, sequence, notify); + std::shared_ptr buffer; + try { + buffer = getBuffer(taskId); + } catch (const VeloxException& e) { + notify({}, sequence); + return; + } + buffer->getData(destination, maxBytes, sequence, notify); } void PartitionedOutputBufferManager::initializeTask( From 9fd12d49b0cbc66ed3c17c67c222493ba98ae9ae Mon Sep 17 00:00:00 2001 From: Karteek Murthy Samba Murthy Date: Wed, 14 Dec 2022 18:45:08 -0800 Subject: [PATCH 2/4] Addressed comments --- velox/exec/Exchange.cpp | 19 +++---- velox/exec/PartitionedOutputBufferManager.cpp | 26 +++++++--- velox/exec/PartitionedOutputBufferManager.h | 36 +++++++++++--- .../PartitionedOutputBufferManagerTest.cpp | 49 +++++++++++++------ 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index d56bb18ac26..0b423f18c7c 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -107,20 +107,21 @@ class LocalExchangeSource : public ExchangeSource { // Since this lambda may outlive 'this', we need to capture a // shared_ptr to the current object (self). 
[self, requestedSequence, buffers, this]( - std::vector> data, int64_t sequence) { - if (requestedSequence > sequence) { + DataAvailableCallbackParamUniqPtr param) { + if (requestedSequence > param->sequence) { VLOG(2) << "Receives earlier sequence than requested: task " << taskId_ << ", destination " << destination_ - << ", requested " << sequence << ", received " + << ", requested " << param->sequence << ", received " << requestedSequence; - int64_t nExtra = requestedSequence - sequence; - VELOX_CHECK(nExtra < data.size()); - data.erase(data.begin(), data.begin() + nExtra); - sequence = requestedSequence; + int64_t nExtra = requestedSequence - param->sequence; + VELOX_CHECK(nExtra < param->pages.size()); + param->pages.erase( + param->pages.begin(), param->pages.begin() + nExtra); + param->sequence = requestedSequence; } std::vector> pages; bool atEnd = false; - for (auto& inputPage : data) { + for (auto& inputPage : param->pages) { if (!inputPage) { atEnd = true; // Keep looping, there could be extra end markers. @@ -142,7 +143,7 @@ class LocalExchangeSource : public ExchangeSource { queue_->enqueue(nullptr); atEnd_ = true; } - ackSequence = sequence_ = sequence + pages.size(); + ackSequence = sequence_ = param->sequence + pages.size(); } // Outside of queue mutex. 
if (atEnd_) { diff --git a/velox/exec/PartitionedOutputBufferManager.cpp b/velox/exec/PartitionedOutputBufferManager.cpp index b0d743e3da9..bb0730b90c2 100644 --- a/velox/exec/PartitionedOutputBufferManager.cpp +++ b/velox/exec/PartitionedOutputBufferManager.cpp @@ -434,7 +434,9 @@ void PartitionedOutputBuffer::getData( } releaseAfterAcknowledge(freed, promises); if (!data.empty()) { - notify(std::move(data), sequence); + auto param = std::make_unique( + std::move(data), sequence, ERR_BUFFER_NOT_FOUND); + notify(std::move(param)); } } @@ -483,6 +485,14 @@ PartitionedOutputBufferManager::getBuffer(const std::string& taskId) { }); } +std::shared_ptr +PartitionedOutputBufferManager::getBufferIfExists(const std::string& taskId) { + return buffers_.withLock([&](auto& buffers) { + auto it = buffers.find(taskId); + return (it == buffers.end()) ? nullptr : it->second; + }); +} + uint64_t PartitionedOutputBufferManager::numBuffers() const { return buffers_.lock()->size(); } @@ -544,14 +554,14 @@ void PartitionedOutputBufferManager::getData( uint64_t maxBytes, int64_t sequence, DataAvailableCallback notify) { - std::shared_ptr buffer; - try { - buffer = getBuffer(taskId); - } catch (const VeloxException& e) { - notify({}, sequence); - return; + auto buffer = getBufferIfExists(taskId); + if (buffer) { + buffer->getData(destination, maxBytes, sequence, notify); } - buffer->getData(destination, maxBytes, sequence, notify); + std::vector> pages; + auto param = std::make_unique( + std::move(pages), sequence, ERR_BUFFER_NOT_FOUND); + notify(std::move(param)); } void PartitionedOutputBufferManager::initializeTask( diff --git a/velox/exec/PartitionedOutputBufferManager.h b/velox/exec/PartitionedOutputBufferManager.h index 8f7dfd188b3..a6d56e0fa01 100644 --- a/velox/exec/PartitionedOutputBufferManager.h +++ b/velox/exec/PartitionedOutputBufferManager.h @@ -21,12 +21,29 @@ namespace facebook::velox::exec { -// nullptr in pages indicates that there is no more data. 
-// sequence is the same as specified in BufferManager::getData call. The caller -// is expected to advance sequence by the number of entries in groups and call -// BufferManager::acknowledge. -using DataAvailableCallback = std::function< - void(std::vector> pages, int64_t sequence)>; +enum GetDataStatus { SUCCESS, ERR_BUFFER_NOT_FOUND }; + +class DataAvailableCallbackParam { + public: + DataAvailableCallbackParam( + std::vector> pages, + int64_t sequence, + GetDataStatus status) + : pages(std::move(pages)), sequence(sequence), status(status) {} + std::vector> pages; + int64_t sequence; + GetDataStatus status; +}; + +using DataAvailableCallbackParamUniqPtr = + std::unique_ptr; + +// A callback function a caller must pass to +// PartitionedOutputBufferManager::getData(). This function is invoked when the +// data is available. If the pages is empty and the status is +// GetDataStatus::SUCCESS indicates that there was no data avaialbe. +using DataAvailableCallback = + std::function param)>; struct DataAvailable { DataAvailableCallback callback; @@ -35,7 +52,9 @@ struct DataAvailable { void notify() { if (callback) { - callback(std::move(data), sequence); + auto param = std::make_unique( + std::move(data), sequence, GetDataStatus::SUCCESS); + callback(std::move(param)); } } }; @@ -265,7 +284,8 @@ class PartitionedOutputBufferManager { private: // Retrieves the set of buffers for a query. 
std::shared_ptr getBuffer(const std::string& taskId); - + std::shared_ptr getBufferIfExists( + const std::string& taskId); folly::Synchronized< std::unordered_map>, std::mutex> diff --git a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp index a7fd810f704..cfb730d5b48 100644 --- a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp +++ b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp @@ -114,15 +114,16 @@ class PartitionedOutputBufferManagerTest : public testing::Test { maxBytes, sequence, [destination, sequence, expectedGroups, &receivedData]( - std::vector> pages, - int64_t inSequence) { + DataAvailableCallbackParamUniqPtr param) { EXPECT_FALSE(receivedData) << "for destination " << destination; - EXPECT_EQ(pages.size(), expectedGroups) + EXPECT_EQ(param->pages.size(), expectedGroups) << "for destination " << destination; - for (const auto& page : pages) { + for (const auto& page : param->pages) { EXPECT_TRUE(page != nullptr) << "for destination " << destination; } - EXPECT_EQ(inSequence, sequence) << "for destination " << destination; + EXPECT_EQ(param->sequence, sequence) + << "for destination " << destination; + EXPECT_EQ(param->status, GetDataStatus::SUCCESS); receivedData = true; }); EXPECT_TRUE(receivedData) << "for destination " << destination; @@ -150,12 +151,12 @@ class PartitionedOutputBufferManagerTest : public testing::Test { DataAvailableCallback receiveEndMarker(int destination, int64_t sequence, bool& receivedEndMarker) { return [destination, sequence, &receivedEndMarker]( - std::vector> pages, - int64_t inSequence) { + DataAvailableCallbackParamUniqPtr param) { EXPECT_FALSE(receivedEndMarker) << "for destination " << destination; - EXPECT_EQ(pages.size(), 1) << "for destination " << destination; - EXPECT_TRUE(pages[0] == nullptr) << "for destination " << destination; - EXPECT_EQ(inSequence, sequence) << "for destination " << destination; + EXPECT_EQ(param->pages.size(), 
1) << "for destination " << destination; + EXPECT_TRUE(param->pages[0] == nullptr) + << "for destination " << destination; + EXPECT_EQ(param->sequence, sequence) << "for destination " << destination; receivedEndMarker = true; }; } @@ -199,15 +200,15 @@ class PartitionedOutputBufferManagerTest : public testing::Test { bool& receivedData) { receivedData = false; return [destination, sequence, expectedGroups, &receivedData]( - std::vector> pages, - int64_t inSequence) { + DataAvailableCallbackParamUniqPtr param) { EXPECT_FALSE(receivedData) << "for destination " << destination; - EXPECT_EQ(pages.size(), expectedGroups) + EXPECT_EQ(param->pages.size(), expectedGroups) << "for destination " << destination; for (int i = 0; i < expectedGroups; i++) { - EXPECT_TRUE(pages[i] != nullptr) << "for destination " << destination; + EXPECT_TRUE(param->pages[i] != nullptr) + << "for destination " << destination; } - EXPECT_EQ(inSequence, sequence) << "for destination " << destination; + EXPECT_EQ(param->sequence, sequence) << "for destination " << destination; receivedData = true; }; } @@ -420,3 +421,21 @@ TEST_F(PartitionedOutputBufferManagerTest, serializedPage) { EXPECT_EQ(mappedMemory->allocateBytesStats().totalSmall, 0); } } + +TEST_F(PartitionedOutputBufferManagerTest, getDataOnTaskWithNoBuffer) { + // Fetching data on a task with no entry in buffer manager shouldn't throw + // any exception. 
+ bool notified = false; + bufferManager_->getData( + "test.0.1", + 1, + 100, + 123, + [&notified](DataAvailableCallbackParamUniqPtr param) { + ASSERT_TRUE(param->pages.empty()); + ASSERT_EQ(param->sequence, 123); + ASSERT_EQ(param->status, GetDataStatus::ERR_BUFFER_NOT_FOUND); + notified = true; + }); + ASSERT_TRUE(notified); +} \ No newline at end of file From 28f295c5e04e63dbb5dee488a78e2f09321a876b Mon Sep 17 00:00:00 2001 From: Karteek Murthy Samba Murthy Date: Fri, 16 Dec 2022 09:54:19 -0800 Subject: [PATCH 3/4] Reverted callback changes, retained older interface --- velox/exec/Exchange.cpp | 19 +++---- velox/exec/PartitionedOutputBufferManager.cpp | 23 ++------ velox/exec/PartitionedOutputBufferManager.h | 49 ++++++----------- .../PartitionedOutputBufferManagerTest.cpp | 55 +++++++++---------- 4 files changed, 60 insertions(+), 86 deletions(-) diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 0b423f18c7c..d56bb18ac26 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -107,21 +107,20 @@ class LocalExchangeSource : public ExchangeSource { // Since this lambda may outlive 'this', we need to capture a // shared_ptr to the current object (self). 
[self, requestedSequence, buffers, this]( - DataAvailableCallbackParamUniqPtr param) { - if (requestedSequence > param->sequence) { + std::vector> data, int64_t sequence) { + if (requestedSequence > sequence) { VLOG(2) << "Receives earlier sequence than requested: task " << taskId_ << ", destination " << destination_ - << ", requested " << param->sequence << ", received " + << ", requested " << sequence << ", received " << requestedSequence; - int64_t nExtra = requestedSequence - param->sequence; - VELOX_CHECK(nExtra < param->pages.size()); - param->pages.erase( - param->pages.begin(), param->pages.begin() + nExtra); - param->sequence = requestedSequence; + int64_t nExtra = requestedSequence - sequence; + VELOX_CHECK(nExtra < data.size()); + data.erase(data.begin(), data.begin() + nExtra); + sequence = requestedSequence; } std::vector> pages; bool atEnd = false; - for (auto& inputPage : param->pages) { + for (auto& inputPage : data) { if (!inputPage) { atEnd = true; // Keep looping, there could be extra end markers. @@ -143,7 +142,7 @@ class LocalExchangeSource : public ExchangeSource { queue_->enqueue(nullptr); atEnd_ = true; } - ackSequence = sequence_ = param->sequence + pages.size(); + ackSequence = sequence_ = sequence + pages.size(); } // Outside of queue mutex. 
if (atEnd_) { diff --git a/velox/exec/PartitionedOutputBufferManager.cpp b/velox/exec/PartitionedOutputBufferManager.cpp index bb0730b90c2..262a90ebcac 100644 --- a/velox/exec/PartitionedOutputBufferManager.cpp +++ b/velox/exec/PartitionedOutputBufferManager.cpp @@ -434,9 +434,7 @@ void PartitionedOutputBuffer::getData( } releaseAfterAcknowledge(freed, promises); if (!data.empty()) { - auto param = std::make_unique( - std::move(data), sequence, ERR_BUFFER_NOT_FOUND); - notify(std::move(param)); + notify(std::move(data), sequence); } } @@ -489,7 +487,7 @@ std::shared_ptr PartitionedOutputBufferManager::getBufferIfExists(const std::string& taskId) { return buffers_.withLock([&](auto& buffers) { auto it = buffers.find(taskId); - return (it == buffers.end()) ? nullptr : it->second; + return it == buffers.end() ? nullptr : it->second; }); } @@ -535,20 +533,13 @@ void PartitionedOutputBufferManager::acknowledge( void PartitionedOutputBufferManager::deleteResults( const std::string& taskId, int destination) { - auto buffer = buffers_.withLock( - [&](auto& buffers) -> std::shared_ptr { - auto it = buffers.find(taskId); - if (it == buffers.end()) { - return nullptr; - } - return it->second; - }); + auto buffer = getBufferIfExists(taskId); if (buffer) { buffer->deleteResults(destination); } } -void PartitionedOutputBufferManager::getData( +bool PartitionedOutputBufferManager::getData( const std::string& taskId, int destination, uint64_t maxBytes, @@ -557,11 +548,9 @@ void PartitionedOutputBufferManager::getData( auto buffer = getBufferIfExists(taskId); if (buffer) { buffer->getData(destination, maxBytes, sequence, notify); + return true; } - std::vector> pages; - auto param = std::make_unique( - std::move(pages), sequence, ERR_BUFFER_NOT_FOUND); - notify(std::move(param)); + return false; } void PartitionedOutputBufferManager::initializeTask( diff --git a/velox/exec/PartitionedOutputBufferManager.h b/velox/exec/PartitionedOutputBufferManager.h index a6d56e0fa01..837f5ebe77a 
100644 --- a/velox/exec/PartitionedOutputBufferManager.h +++ b/velox/exec/PartitionedOutputBufferManager.h @@ -21,29 +21,12 @@ namespace facebook::velox::exec { -enum GetDataStatus { SUCCESS, ERR_BUFFER_NOT_FOUND }; - -class DataAvailableCallbackParam { - public: - DataAvailableCallbackParam( - std::vector> pages, - int64_t sequence, - GetDataStatus status) - : pages(std::move(pages)), sequence(sequence), status(status) {} - std::vector> pages; - int64_t sequence; - GetDataStatus status; -}; - -using DataAvailableCallbackParamUniqPtr = - std::unique_ptr; - -// A callback function a caller must pass to -// PartitionedOutputBufferManager::getData(). This function is invoked when the -// data is available. If the pages is empty and the status is -// GetDataStatus::SUCCESS indicates that there was no data avaialbe. -using DataAvailableCallback = - std::function param)>; +// nullptr in pages indicates that there is no more data. +// sequence is the same as specified in BufferManager::getData call. The caller +// is expected to advance sequence by the number of entries in groups and call +// BufferManager::acknowledge. +using DataAvailableCallback = std::function< + void(std::vector> pages, int64_t sequence)>; struct DataAvailable { DataAvailableCallback callback; @@ -52,9 +35,7 @@ struct DataAvailable { void notify() { if (callback) { - auto param = std::make_unique( - std::move(data), sequence, GetDataStatus::SUCCESS); - callback(std::move(param)); + callback(std::move(data), sequence); } } }; @@ -244,17 +225,19 @@ class PartitionedOutputBufferManager { void deleteResults(const std::string& taskId, int destination); // Adds up to 'maxBytes' bytes worth of data for 'destination' from - // 'taskId'. The sequence number of the data must be >= - // 'sequence'. If there is no data, 'notify' will be registered and - // called when there is data or the source is at end. Existing data - // with a sequence number < sequence is deleted. The caller is + // 'taskId'. 
The sequence number of the data must be >= 'sequence'. + // If there is no buffer associated with the given taskId, returns false. + // If there is no data, 'notify' will be registered and + // called when there is data or the source is at end, the function returns + // true. + // Existing data with a sequence number < sequence is deleted. The caller is // expected to increment the sequence number between calls by the // number of items received. In this way the next call implicitly // acknowledges receipt of the results from the previous. The // acknowledge method is offered for an early ack, so that the // producer can continue before the consumer is done processing the // received data. - void getData( + bool getData( const std::string& taskId, int destination, uint64_t maxBytes, @@ -284,8 +267,12 @@ class PartitionedOutputBufferManager { private: // Retrieves the set of buffers for a query. std::shared_ptr getBuffer(const std::string& taskId); + + // Retrieves the set of buffers for a query if exists. If taskId is not found, + // returns NULL. 
std::shared_ptr getBufferIfExists( const std::string& taskId); + folly::Synchronized< std::unordered_map>, std::mutex> diff --git a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp index cfb730d5b48..1cd04b23bbe 100644 --- a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp +++ b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp @@ -114,16 +114,15 @@ class PartitionedOutputBufferManagerTest : public testing::Test { maxBytes, sequence, [destination, sequence, expectedGroups, &receivedData]( - DataAvailableCallbackParamUniqPtr param) { + std::vector> pages, + int64_t inSequence) { EXPECT_FALSE(receivedData) << "for destination " << destination; - EXPECT_EQ(param->pages.size(), expectedGroups) + EXPECT_EQ(pages.size(), expectedGroups) << "for destination " << destination; - for (const auto& page : param->pages) { + for (const auto& page : pages) { EXPECT_TRUE(page != nullptr) << "for destination " << destination; } - EXPECT_EQ(param->sequence, sequence) - << "for destination " << destination; - EXPECT_EQ(param->status, GetDataStatus::SUCCESS); + EXPECT_EQ(inSequence, sequence) << "for destination " << destination; receivedData = true; }); EXPECT_TRUE(receivedData) << "for destination " << destination; @@ -151,12 +150,12 @@ class PartitionedOutputBufferManagerTest : public testing::Test { DataAvailableCallback receiveEndMarker(int destination, int64_t sequence, bool& receivedEndMarker) { return [destination, sequence, &receivedEndMarker]( - DataAvailableCallbackParamUniqPtr param) { + std::vector> pages, + int64_t inSequence) { EXPECT_FALSE(receivedEndMarker) << "for destination " << destination; - EXPECT_EQ(param->pages.size(), 1) << "for destination " << destination; - EXPECT_TRUE(param->pages[0] == nullptr) - << "for destination " << destination; - EXPECT_EQ(param->sequence, sequence) << "for destination " << destination; + EXPECT_EQ(pages.size(), 1) << "for destination " << destination; + 
EXPECT_TRUE(pages[0] == nullptr) << "for destination " << destination; + EXPECT_EQ(inSequence, sequence) << "for destination " << destination; receivedEndMarker = true; }; } @@ -200,15 +199,15 @@ class PartitionedOutputBufferManagerTest : public testing::Test { bool& receivedData) { receivedData = false; return [destination, sequence, expectedGroups, &receivedData]( - DataAvailableCallbackParamUniqPtr param) { + std::vector> pages, + int64_t inSequence) { EXPECT_FALSE(receivedData) << "for destination " << destination; - EXPECT_EQ(param->pages.size(), expectedGroups) + EXPECT_EQ(pages.size(), expectedGroups) << "for destination " << destination; for (int i = 0; i < expectedGroups; i++) { - EXPECT_TRUE(param->pages[i] != nullptr) - << "for destination " << destination; + EXPECT_TRUE(pages[i] != nullptr) << "for destination " << destination; } - EXPECT_EQ(param->sequence, sequence) << "for destination " << destination; + EXPECT_EQ(inSequence, sequence) << "for destination " << destination; receivedData = true; }; } @@ -422,20 +421,20 @@ TEST_F(PartitionedOutputBufferManagerTest, serializedPage) { } } -TEST_F(PartitionedOutputBufferManagerTest, getDataOnTaskWithNoBuffer) { - // Fetching data on a task with no entry in buffer manager shouldn't throw - // any exception. +TEST_F(PartitionedOutputBufferManagerTest, getDataOnFailedTask) { + // Fetching data on a task which was either never initialized in the buffer + // manager or was removed by a parallel thread must return false. The `notify` + // callback must not be registered. 
bool notified = false; - bufferManager_->getData( + auto ret = bufferManager_->getData( "test.0.1", 1, - 100, - 123, - [&notified](DataAvailableCallbackParamUniqPtr param) { - ASSERT_TRUE(param->pages.empty()); - ASSERT_EQ(param->sequence, 123); - ASSERT_EQ(param->status, GetDataStatus::ERR_BUFFER_NOT_FOUND); + 10, + 1, + [&notified]( + std::vector> pages, int64_t sequence) { notified = true; }); - ASSERT_TRUE(notified); -} \ No newline at end of file + ASSERT_TRUE(!notified); + ASSERT_FALSE(ret); +} From 29b779fe2e6f91f200ad0d621b31216cb5b59729 Mon Sep 17 00:00:00 2001 From: Karteek Murthy Samba Murthy Date: Fri, 16 Dec 2022 11:18:19 -0800 Subject: [PATCH 4/4] Addressed comments --- velox/exec/PartitionedOutputBufferManager.cpp | 6 ++-- velox/exec/PartitionedOutputBufferManager.h | 5 ++-- .../PartitionedOutputBufferManagerTest.cpp | 28 ++++++++----------- 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/velox/exec/PartitionedOutputBufferManager.cpp b/velox/exec/PartitionedOutputBufferManager.cpp index 262a90ebcac..783cfa6edc7 100644 --- a/velox/exec/PartitionedOutputBufferManager.cpp +++ b/velox/exec/PartitionedOutputBufferManager.cpp @@ -533,8 +533,7 @@ void PartitionedOutputBufferManager::acknowledge( void PartitionedOutputBufferManager::deleteResults( const std::string& taskId, int destination) { - auto buffer = getBufferIfExists(taskId); - if (buffer) { + if (auto buffer = getBufferIfExists(taskId)) { buffer->deleteResults(destination); } } @@ -545,8 +544,7 @@ bool PartitionedOutputBufferManager::getData( uint64_t maxBytes, int64_t sequence, DataAvailableCallback notify) { - auto buffer = getBufferIfExists(taskId); - if (buffer) { + if (auto buffer = getBufferIfExists(taskId)) { buffer->getData(destination, maxBytes, sequence, notify); return true; } diff --git a/velox/exec/PartitionedOutputBufferManager.h b/velox/exec/PartitionedOutputBufferManager.h index 837f5ebe77a..bc3bfb3ee9b 100644 --- a/velox/exec/PartitionedOutputBufferManager.h +++ 
b/velox/exec/PartitionedOutputBufferManager.h @@ -266,10 +266,11 @@ class PartitionedOutputBufferManager { private: // Retrieves the set of buffers for a query. + // Throws an exception if buffer doesn't exist. std::shared_ptr getBuffer(const std::string& taskId); - // Retrieves the set of buffers for a query if exists. If taskId is not found, - // returns NULL. + // Retrieves the set of buffers for a query if exists. + // Returns NULL if task not found. std::shared_ptr getBufferIfExists( const std::string& taskId); diff --git a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp index 1cd04b23bbe..ac2a1415fa7 100644 --- a/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp +++ b/velox/exec/tests/PartitionedOutputBufferManagerTest.cpp @@ -108,7 +108,7 @@ class PartitionedOutputBufferManagerTest : public testing::Test { uint64_t maxBytes = 1024, int expectedGroups = 1) { bool receivedData = false; - bufferManager_->getData( + ASSERT_TRUE(bufferManager_->getData( taskId, destination, maxBytes, @@ -124,7 +124,7 @@ class PartitionedOutputBufferManagerTest : public testing::Test { } EXPECT_EQ(inSequence, sequence) << "for destination " << destination; receivedData = true; - }); + })); EXPECT_TRUE(receivedData) << "for destination " << destination; } @@ -163,12 +163,12 @@ class PartitionedOutputBufferManagerTest : public testing::Test { void fetchEndMarker(const std::string& taskId, int destination, int64_t sequence) { bool receivedData = false; - bufferManager_->getData( + ASSERT_TRUE(bufferManager_->getData( taskId, destination, std::numeric_limits::max(), sequence, - receiveEndMarker(destination, sequence, receivedData)); + receiveEndMarker(destination, sequence, receivedData))); EXPECT_TRUE(receivedData) << "for destination " << destination; bufferManager_->deleteResults(taskId, destination); } @@ -183,12 +183,12 @@ class PartitionedOutputBufferManagerTest : public testing::Test { int64_t sequence, 
bool& receivedEndMarker) { receivedEndMarker = false; - bufferManager_->getData( + ASSERT_TRUE(bufferManager_->getData( taskId, destination, std::numeric_limits::max(), sequence, - receiveEndMarker(destination, 1, receivedEndMarker)); + receiveEndMarker(destination, 1, receivedEndMarker))); EXPECT_FALSE(receivedEndMarker) << "for destination " << destination; } @@ -219,12 +219,12 @@ class PartitionedOutputBufferManagerTest : public testing::Test { int expectedGroups, bool& receivedData) { receivedData = false; - bufferManager_->getData( + ASSERT_TRUE(bufferManager_->getData( taskId, destination, 1024, sequence, - receiveData(destination, sequence, expectedGroups, receivedData)); + receiveData(destination, sequence, expectedGroups, receivedData))); EXPECT_FALSE(receivedData) << "for destination " << destination; } @@ -425,16 +425,12 @@ TEST_F(PartitionedOutputBufferManagerTest, getDataOnFailedTask) { // Fetching data on a task which was either never initialized in the buffer // manager or was removed by a parallel thread must return false. The `notify` // callback must not be registered. - bool notified = false; - auto ret = bufferManager_->getData( + ASSERT_FALSE(bufferManager_->getData( "test.0.1", 1, 10, 1, - [&notified]( - std::vector> pages, int64_t sequence) { - notified = true; - }); - ASSERT_TRUE(!notified); - ASSERT_FALSE(ret); + [](std::vector> pages, int64_t sequence) { + VELOX_UNREACHABLE(); + })); }