From 1a4dd640a4c77849531044e41a0c0819a3d8ac95 Mon Sep 17 00:00:00 2001 From: Patrick Stuedi Date: Sun, 19 Nov 2023 22:16:57 -0800 Subject: [PATCH] [native] Add non-blocking shuffle reader interface Avoid blocking during shuffle read to support suspending of the shuffle operator. --- .../main/operators/LocalPersistentShuffle.cpp | 6 +- .../main/operators/LocalPersistentShuffle.h | 2 +- .../main/operators/ShuffleInterface.h | 2 +- .../operators/UnsafeRowExchangeSource.cpp | 70 ++++++++++--------- .../main/operators/UnsafeRowExchangeSource.h | 4 +- .../operators/tests/UnsafeRowShuffleTest.cpp | 7 +- 6 files changed, 49 insertions(+), 42 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp index 436faaac6db97..c4e067f3f9932 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp @@ -160,13 +160,13 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr); } -BufferPtr LocalPersistentShuffleReader::next() { +folly::SemiFuture LocalPersistentShuffleReader::next() { if (readPartitionFiles_.empty()) { readPartitionFiles_ = getReadPartitionFiles(); } if (readPartitionFileIndex_ >= readPartitionFiles_.size()) { - return nullptr; + return folly::makeSemiFuture(BufferPtr{}); } const auto filename = readPartitionFiles_[readPartitionFileIndex_]; @@ -174,7 +174,7 @@ BufferPtr LocalPersistentShuffleReader::next() { auto buffer = AlignedBuffer::allocate(file->size(), pool_, 0); file->pread(0, file->size(), buffer->asMutable()); ++readPartitionFileIndex_; - return buffer; + return folly::makeSemiFuture(std::move(buffer)); } void LocalPersistentShuffleReader::noMoreData(bool success) { diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h index a5e24c48a44d4..09e91b84975af 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h @@ -128,7 +128,7 @@ class LocalPersistentShuffleReader : public ShuffleReader { std::vector partitionIds_, velox::memory::MemoryPool* FOLLY_NONNULL pool); - velox::BufferPtr next() override; + folly::SemiFuture next() override; void noMoreData(bool success) override; diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h index 51ac096641f95..ccb9022340c57 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h @@ -39,7 +39,7 @@ class ShuffleReader { /// Reads the next block of data. The function returns null if it has read all /// the data. The function throws if run into any error. - virtual velox::BufferPtr next() = 0; + virtual folly::SemiFuture next() = 0; /// Tell the shuffle system the reader is done. May be called with 'success' /// true before reading all the data. This happens when a query has a LIMIT or diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp index 89b95fcdb9bfd..fccf4e1286a0c 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.cpp @@ -32,41 +32,45 @@ folly::SemiFuture UnsafeRowExchangeSource::request( uint32_t /*maxBytes*/, uint32_t /*maxWaitSeconds*/) { - std::vector promises; - int64_t totalBytes = 0; - { - std::lock_guard l(queue_->mutex()); - if (atEnd_) { - return folly::makeFuture(Response{0, true}); - } + auto nextBatch = [this]() { + return std::move(shuffle_->next()) + .deferValue([this](velox::BufferPtr buffer) { + std::vector promises; + int64_t totalBytes = 0; - velox::BufferPtr buffer; - CALL_SHUFFLE(buffer = shuffle_->next(), "next"); - if (buffer == nullptr) { - atEnd_ = true; - queue_->enqueueLocked(nullptr, promises); - } else { - totalBytes = buffer->size(); - - ++numBatches_; - - auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as(), buffer->size()); - // NOTE: SerializedPage's onDestructionCb_ captures one reference on - // 'buffer' to keep its alive until SerializedPage destruction. Also note - // that 'buffer' should have been allocated from memory pool. Hence, we - // don't need to update the memory usage counting for the associated - // 'ioBuf' attached to SerializedPage on destruction. - queue_->enqueueLocked( - std::make_unique( - std::move(ioBuf), [buffer](auto& /*unused*/) {}), - promises); - } - } - for (auto& promise : promises) { - promise.setValue(); - } + { + std::lock_guard l(queue_->mutex()); + if (buffer == nullptr) { + atEnd_ = true; + queue_->enqueueLocked(nullptr, promises); + } else { + totalBytes = buffer->size(); + + ++numBatches_; + + auto ioBuf = + folly::IOBuf::wrapBuffer(buffer->as(), buffer->size()); + queue_->enqueueLocked( + std::make_unique( + std::move(ioBuf), [buffer](auto& /*unused*/) {}), + promises); + } + } + + for (auto& promise : promises) { + promise.setValue(); + } + + return folly::makeFuture(Response{totalBytes, atEnd_}); + }) + .deferError( + [](folly::exception_wrapper e) mutable + -> UnsafeRowExchangeSource::Response { + VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what()); + }); + }; - return folly::makeFuture(Response{totalBytes, atEnd_}); + CALL_SHUFFLE(return nextBatch(), "next"); } folly::F14FastMap UnsafeRowExchangeSource::stats() const { diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h index 72eaff778d3f8..231489672af5f 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h @@ -38,7 +38,9 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { uint32_t maxBytes, uint32_t maxWaitSeconds) override; - void close() override {} + void close() override { + shuffle_->noMoreData(true); + } folly::F14FastMap stats() const override; diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp index b791e1fbbdc12..b8cc46b3e47c3 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsafeRowShuffleTest.cpp @@ -186,16 +186,17 @@ class TestShuffleReader : public ShuffleReader { readyPartitions) : partition_(partition), readyPartitions_(readyPartitions) {} - BufferPtr next() override { + folly::SemiFuture next() override { TestValue::adjust( "facebook::presto::operators::test::TestShuffleReader::next", this); if ((*readyPartitions_)[partition_].empty()) { - return nullptr; + BufferPtr buffer = nullptr; + return folly::makeSemiFuture(std::move(buffer)); } auto buffer = (*readyPartitions_)[partition_].back(); (*readyPartitions_)[partition_].pop_back(); - return buffer; + return folly::makeSemiFuture(std::move(buffer)); } void noMoreData(bool success) override {