From 3b564d9ec6bb5417f6ad775b3bdb6ea818291717 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Wed, 10 Jan 2024 16:52:25 -0800 Subject: [PATCH] [native]Add to handle empty data case in ShuffleReader::next Add to handle empty data case in ShuffleReader::next and update UnsafeRowExchangeSource to remove the use of hasNext(). The followup is to deprecate hasNext. --- .../main/operators/LocalPersistentShuffle.cpp | 15 ++++++--------- .../main/operators/LocalPersistentShuffle.h | 2 -- .../main/operators/ShuffleInterface.h | 9 ++++++--- .../main/operators/UnsafeRowExchangeSource.cpp | 9 +++------ .../operators/tests/UnsafeRowShuffleTest.cpp | 18 +++--------------- 5 files changed, 18 insertions(+), 35 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp index 77c5cada478f4..436faaac6db97 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp @@ -160,16 +160,16 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr); } -bool LocalPersistentShuffleReader::hasNext() { +BufferPtr LocalPersistentShuffleReader::next() { if (readPartitionFiles_.empty()) { readPartitionFiles_ = getReadPartitionFiles(); } - return readPartitionFileIndex_ < readPartitionFiles_.size(); -} + if (readPartitionFileIndex_ >= readPartitionFiles_.size()) { + return nullptr; + } -BufferPtr LocalPersistentShuffleReader::next() { - auto filename = readPartitionFiles_[readPartitionFileIndex_]; + const auto filename = readPartitionFiles_[readPartitionFileIndex_]; auto file = fileSystem_->openFileForRead(filename); auto buffer = AlignedBuffer::allocate(file->size(), pool_, 0); file->pread(0, file->size(), buffer->asMutable()); @@ -246,10 +246,7 @@ std::shared_ptr LocalPersistentShuffleFactory::createReader( const operators::LocalShuffleReadInfo readInfo = operators::LocalShuffleReadInfo::deserialize(serializedStr); return std::make_shared( - readInfo.rootPath, - readInfo.queryId, - readInfo.partitionIds, - pool); + readInfo.rootPath, readInfo.queryId, readInfo.partitionIds, pool); } std::shared_ptr LocalPersistentShuffleFactory::createWriter( diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h index 1a94b1adf96ab..a5e24c48a44d4 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h @@ -128,8 +128,6 @@ class LocalPersistentShuffleReader : public ShuffleReader { std::vector partitionIds_, velox::memory::MemoryPool* FOLLY_NONNULL pool); - bool hasNext() override; - velox::BufferPtr next() override; void noMoreData(bool success) override; diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h index 44ce8e5aa509d..3dbfc86c45464 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h @@ -37,10 +37,13 @@ class ShuffleReader { public: virtual ~ShuffleReader() = default; - /// Check by the reader to see if more blocks are available - virtual bool hasNext() = 0; + /// Deprecate, do not use! + virtual bool hasNext() { + return true; + } - /// Read the next block of data. + /// Reads the next block of data. The function returns null if it has read all + /// the data. The function throws if run into any error. virtual velox::BufferPtr next() = 0; /// Tell the shuffle system the reader is done. May be called with 'success' diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp index 234daddf439a5..89b95fcdb9bfd 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp @@ -40,15 +40,12 @@ UnsafeRowExchangeSource::request( return folly::makeFuture(Response{0, true}); } - bool hasNext; - CALL_SHUFFLE(hasNext = shuffle_->hasNext(), "hasNext"); - - if (!hasNext) { + velox::BufferPtr buffer; + CALL_SHUFFLE(buffer = shuffle_->next(), "next"); + if (buffer == nullptr) { atEnd_ = true; queue_->enqueueLocked(nullptr, promises); } else { - velox::BufferPtr buffer; - CALL_SHUFFLE(buffer = shuffle_->next(), "next"); totalBytes = buffer->size(); ++numBatches_; diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp index 049898cd59131..b791e1fbbdc12 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp @@ -186,16 +186,12 @@ class TestShuffleReader : public ShuffleReader { readyPartitions) : partition_(partition), readyPartitions_(readyPartitions) {} - bool hasNext() override { - TestValue::adjust( - "facebook::presto::operators::test::TestShuffleReader::hasNext", this); - return !(*readyPartitions_)[partition_].empty(); - } - BufferPtr next() override { TestValue::adjust( "facebook::presto::operators::test::TestShuffleReader::next", this); - VELOX_CHECK(!(*readyPartitions_)[partition_].empty()); + if ((*readyPartitions_)[partition_].empty()) { + return nullptr; + } auto buffer = (*readyPartitions_)[partition_].back(); (*readyPartitions_)[partition_].pop_back(); @@ -811,14 +807,6 @@ TEST_F(UnsafeRowShuffleTest, shuffleReaderExceptions) { params.planNode = exec::test::PlanBuilder() .addNode(addShuffleReadNode(asRowType(data->type()))) .planNode(); - { - SCOPED_TESTVALUE_SET( - "facebook::presto::operators::test::TestShuffleReader::hasNext", - injectFailure); - - VELOX_ASSERT_THROW( - runShuffleReadTask(params, info), "ShuffleReader::hasNext failed"); - } { SCOPED_TESTVALUE_SET(