diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp index c5d6f5886be29..158583aa677a5 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.cpp @@ -161,43 +161,51 @@ LocalPersistentShuffleReader::LocalPersistentShuffleReader( fileSystem_ = velox::filesystems::getFileSystem(rootPath_, nullptr); } -folly::SemiFuture> LocalPersistentShuffleReader::next() { +folly::SemiFuture>> +LocalPersistentShuffleReader::next(size_t numBatches) { using TRowSize = uint32_t; if (readPartitionFiles_.empty()) { readPartitionFiles_ = getReadPartitionFiles(); } - if (readPartitionFileIndex_ >= readPartitionFiles_.size()) { - return folly::makeSemiFuture(std::unique_ptr{}); - } + std::vector> batches; + batches.reserve(numBatches); + + for (size_t i = 0; i < numBatches; ++i) { + if (readPartitionFileIndex_ >= readPartitionFiles_.size()) { + break; + } + + const auto filename = readPartitionFiles_[readPartitionFileIndex_]; + auto file = fileSystem_->openFileForRead(filename); + auto buffer = AlignedBuffer::allocate(file->size(), pool_, 0); + file->pread(0, file->size(), buffer->asMutable()); + ++readPartitionFileIndex_; + + // Parse the buffer to extract individual rows. + // Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) | + std::vector rows; + const char* data = buffer->as(); + size_t offset = 0; + const size_t totalSize = buffer->size(); + + while (offset + sizeof(TRowSize) <= totalSize) { + // Read row size (stored in big endian). + const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); + offset += sizeof(TRowSize); + + VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size"); + // Create a Row with empty key and the row data as value. + rows.emplace_back(std::string_view{data + offset, rowSize}); + offset += rowSize; + } - const auto filename = readPartitionFiles_[readPartitionFileIndex_]; - auto file = fileSystem_->openFileForRead(filename); - auto buffer = AlignedBuffer::allocate(file->size(), pool_, 0); - file->pread(0, file->size(), buffer->asMutable()); - ++readPartitionFileIndex_; - - // Parse the buffer to extract individual rows. - // Each row is stored as: | row-size (4 bytes) | row-data (row-size bytes) | - std::vector rows; - const char* data = buffer->as(); - size_t offset = 0; - const size_t totalSize = buffer->size(); - - while (offset + sizeof(TRowSize) <= totalSize) { - // Read row size (stored in big endian). - const TRowSize rowSize = folly::Endian::big(*(TRowSize*)(data + offset)); - offset += sizeof(TRowSize); - - VELOX_CHECK_LE(offset + rowSize, totalSize, "Invalid row data: row size"); - // Create a Row with empty key and the row data as value. - rows.emplace_back(std::string_view{data + offset, rowSize}); - offset += rowSize; + batches.push_back( + std::make_unique(std::move(rows), std::move(buffer))); } - return folly::makeSemiFuture>( - std::make_unique(std::move(rows), std::move(buffer))); + return folly::makeSemiFuture(std::move(batches)); } void LocalPersistentShuffleReader::noMoreData(bool success) { diff --git a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h index 1dc2492a73ad4..a5182a5f5e975 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalPersistentShuffle.h @@ -131,7 +131,7 @@ class LocalPersistentShuffleReader : public ShuffleReader { std::vector partitionIds, velox::memory::MemoryPool* pool); - folly::SemiFuture> next() override; + folly::SemiFuture>> next(size_t numBatches) override; void noMoreData(bool success) override; diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp index cdd86eec79853..e3246f875f729 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp @@ -34,22 +34,24 @@ ShuffleExchangeSource::request( uint32_t /*maxBytes*/, std::chrono::microseconds /*maxWait*/) { auto nextBatch = [this]() { - return std::move(shuffleReader_->next()) - .deferValue([this](std::unique_ptr batch) { + return std::move(shuffleReader_->next(1)) + .deferValue([this](std::vector>&& batches) { std::vector promises; int64_t totalBytes{0}; { std::lock_guard l(queue_->mutex()); - if (batch == nullptr) { + if (batches.empty()) { atEnd_ = true; queue_->enqueueLocked(nullptr, promises); } else { - totalBytes = batch->data->size(); + for (auto& batch : batches) { + totalBytes = batch->data->size(); VELOX_CHECK_LE(totalBytes, std::numeric_limits::max()); ++numBatches_; queue_->enqueueLocked( std::make_unique(std::move(batch)), promises); + } } } diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h index 79b4081b8f5ca..a499ae8bd1a48 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h @@ -46,9 +46,8 @@ class ShuffleReader { public: virtual ~ShuffleReader() = default; - /// Reads the next block of data. The function returns null if it has read all - /// the data. The function throws if run into any error. - virtual folly::SemiFuture> next() = 0; + virtual folly::SemiFuture>> next( + size_t numBatches) = 0; /// Tell the shuffle system the reader is done. May be called with 'success' /// true before reading all the data. This happens when a query has a LIMIT or diff --git a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index ade4100b6e4de..2dd95b3c18016 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -234,16 +234,19 @@ class TestShuffleReader : public ShuffleReader { readyPartitions) : partition_(partition), readyPartitions_(readyPartitions) {} - folly::SemiFuture> next() override { + folly::SemiFuture>> next( + size_t numBatches) override { TestValue::adjust( "facebook::presto::operators::test::TestShuffleReader::next", this); - if ((*readyPartitions_)[partition_].empty()) { - return folly::makeSemiFuture(std::unique_ptr(nullptr)); + std::vector> result; + auto& partitionBatches = (*readyPartitions_)[partition_]; + + for (size_t i = 0; i < numBatches && !partitionBatches.empty(); ++i) { + result.push_back(std::move(partitionBatches.back())); + partitionBatches.pop_back(); } - auto readBatch = std::move((*readyPartitions_)[partition_].back()); - (*readyPartitions_)[partition_].pop_back(); - return folly::makeSemiFuture>( - std::move(readBatch)); + + return folly::makeSemiFuture(std::move(result)); } void noMoreData(bool success) override {