diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index 19c1931b420eb..0b6709975aa4e 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -323,7 +323,7 @@ void PrestoExchangeSource::processDataResponse( contentLength, 0, "next token is not set in non-empty data response"); } - std::unique_ptr page; + std::unique_ptr page; const bool empty = response->empty(); if (!empty) { std::vector> iobufs; @@ -351,7 +351,7 @@ void PrestoExchangeSource::processDataResponse( } if (enableBufferCopy_) { - page = std::make_unique( + page = std::make_unique( std::move(singleChain), [pool = pool_](folly::IOBuf& iobuf) { int64_t freedBytes{0}; // Free the backed memory from MemoryAllocator on page dtor @@ -365,7 +365,7 @@ void PrestoExchangeSource::processDataResponse( PrestoExchangeSource::updateMemoryUsage(-freedBytes); }); } else { - page = std::make_unique( + page = std::make_unique( std::move(singleChain), [totalBytes](folly::IOBuf& iobuf) { PrestoExchangeSource::updateMemoryUsage(-totalBytes); }); diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 1043f0ef64884..c638aa811b88f 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -41,6 +41,7 @@ #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "presto_cpp/main/operators/ShuffleRead.h" +#include "presto_cpp/main/operators/ShuffleWrite.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" #include "presto_cpp/main/types/VeloxPlanConversion.h" #include "velox/common/base/Counters.h" diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp index 48511a5ab0c28..063ed97cde423 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.cpp +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -345,7 +345,7 @@ SessionProperties::SessionProperties() { "creating tiny SerializedPages. For " "PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator" "would buffer up to that number of bytes / number of destinations for " - "each destination before producing a SerializedPage.", + "each destination before producing a SerializedPageBase.", BIGINT(), false, QueryConfig::kMaxPartitionedOutputBufferSize, diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h index 02299ccfd875b..f8d9fd0b2295b 100644 --- a/presto-native-execution/presto_cpp/main/SessionProperties.h +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -268,7 +268,7 @@ class SessionProperties { /// creating tiny SerializedPages. For /// PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator /// would buffer up to that number of bytes / number of destinations for each - /// destination before producing a SerializedPage. + /// destination before producing a SerializedPageBase. static constexpr const char* kMaxPartitionedOutputBufferSize = "native_max_page_partitioning_buffer_size"; diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp index b87c50e841404..b649540c9d678 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp @@ -42,14 +42,14 @@ BroadcastExchangeSource::request( return folly::makeTryWith([&]() -> Response { int64_t totalBytes = 0; - std::vector> pages; + std::vector> pages; while (totalBytes < maxBytes && reader_->hasNext()) { auto buffer = reader_->next(); VELOX_CHECK_NOT_NULL(buffer); auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as(), buffer->size()); - auto page = std::make_unique( + auto page = std::make_unique( std::move(ioBuf), [buffer](auto& /*unused*/) {}); pages.push_back(std::move(page)); diff --git a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp index 16a8a972d7851..a253bb2a70a65 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp @@ -105,6 +105,30 @@ class SortedFileInputStream final : public velox::common::FileInputStream, std::string currentValue_; }; +class LocalShuffleSerializedPage : public ShuffleSerializedPage { + public: + LocalShuffleSerializedPage( + const std::vector& rows, + velox::BufferPtr buffer) + : rows_{std::move(rows)}, buffer_{std::move(buffer)} {} + + const std::vector& rows() override { + return rows_; + } + + uint64_t size() const override { + return buffer_->size(); + } + + std::optional numRows() const override { + return rows_.size(); + } + + private: + const std::vector rows_; + const velox::BufferPtr buffer_; +}; + std::vector extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) { std::vector rows; @@ -450,9 +474,9 @@ void LocalShuffleReader::initSortedShuffleRead() { } } -std::vector> LocalShuffleReader::nextSorted( - uint64_t maxBytes) { - std::vector> batches; +std::vector> +LocalShuffleReader::nextSorted(uint64_t maxBytes) { + std::vector> batches; if (merge_ == nullptr) { return batches; @@ -469,7 +493,7 @@ std::vector> LocalShuffleReader::nextSorted( if (bufferUsed + data.size() > maxBytes) { if (bufferUsed > 0) { batches.push_back( - std::make_unique( + std::make_unique( std::move(rows), std::move(batchBuffer))); return batches; } @@ -489,15 +513,16 @@ std::vector> LocalShuffleReader::nextSorted( if (!rows.empty()) { batches.push_back( - std::make_unique(std::move(rows), std::move(batchBuffer))); + std::make_unique( + std::move(rows), std::move(batchBuffer))); } return batches; } -std::vector> LocalShuffleReader::nextUnsorted( - uint64_t maxBytes) { - std::vector> batches; +std::vector> +LocalShuffleReader::nextUnsorted(uint64_t maxBytes) { + std::vector> batches; uint64_t totalBytes{0}; while (readPartitionFileIndex_ < readPartitionFiles_.size()) { @@ -527,13 +552,14 @@ std::vector> LocalShuffleReader::nextUnsorted( totalBytes += fileSize; batches.push_back( - std::make_unique(std::move(rows), std::move(buffer))); + std::make_unique( + std::move(rows), std::move(buffer))); } return batches; } -folly::SemiFuture>> +folly::SemiFuture>> LocalShuffleReader::next(uint64_t maxBytes) { VELOX_CHECK( initialized_, diff --git a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h index 8d362c8d8c77c..fb2314f5b3acb 100644 --- a/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h +++ b/presto-native-execution/presto_cpp/main/operators/LocalShuffle.h @@ -173,7 +173,7 @@ class LocalShuffleReader : public ShuffleReader { /// For sorted shuffle, this opens all shuffle files and prepares k-way merge. void initialize(); - folly::SemiFuture>> next( + folly::SemiFuture>> next( uint64_t maxBytes) override; void noMoreData(bool success) override; @@ -192,10 +192,12 @@ class LocalShuffleReader : public ShuffleReader { void initSortedShuffleRead(); // Reads sorted shuffle data using k-way merge with TreeOfLosers. - std::vector> nextSorted(uint64_t maxBytes); + std::vector> nextSorted( + uint64_t maxBytes); // Reads unsorted shuffle data in batch-based file reading. - std::vector> nextUnsorted(uint64_t maxBytes); + std::vector> nextUnsorted( + uint64_t maxBytes); const std::string rootPath_; const std::string queryId_; diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp index 0506d04a20294..45f3675f3e958 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.cpp @@ -35,31 +35,29 @@ ShuffleExchangeSource::request( std::chrono::microseconds /*maxWait*/) { auto nextBatch = [this, maxBytes]() { return std::move(shuffleReader_->next(maxBytes)) - .deferValue([this](std::vector>&& batches) { - std::vector promises; - int64_t totalBytes{0}; - { - std::lock_guard l(queue_->mutex()); - if (batches.empty()) { - atEnd_ = true; - queue_->enqueueLocked(nullptr, promises); - } else { - for (auto& batch : batches) { - totalBytes = batch->data->size(); - VELOX_CHECK_LE(totalBytes, std::numeric_limits::max()); - ++numBatches_; - queue_->enqueueLocked( - std::make_unique(std::move(batch)), - promises); + .deferValue( + [this]( + std::vector>&& batches) { + std::vector promises; + int64_t totalBytes{0}; + { + std::lock_guard l(queue_->mutex()); + if (batches.empty()) { + atEnd_ = true; + queue_->enqueueLocked(nullptr, promises); + } else { + for (auto& batch : batches) { + ++numBatches_; + queue_->enqueueLocked(std::move(batch), promises); + } + } } - } - } - for (auto& promise : promises) { - promise.setValue(); - } - return folly::makeFuture(Response{totalBytes, atEnd_}); - }) + for (auto& promise : promises) { + promise.setValue(); + } + return folly::makeFuture(Response{totalBytes, atEnd_}); + }) .deferError( [](folly::exception_wrapper e) mutable -> ShuffleExchangeSource::Response { diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h index 5365331faa3b5..e3379a1fb0c36 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleExchangeSource.h @@ -14,32 +14,11 @@ #pragma once #include "presto_cpp/main/operators/ShuffleInterface.h" -#include "presto_cpp/main/operators/ShuffleWrite.h" #include "velox/core/PlanNode.h" -#include "velox/exec/Exchange.h" #include "velox/exec/Operator.h" namespace facebook::presto::operators { -class ShuffleRowBatch : public velox::exec::SerializedPage { - public: - explicit ShuffleRowBatch( - std::unique_ptr rowBatch) - : velox::exec:: - SerializedPage{folly::IOBuf::wrapBuffer( - rowBatch->data->as(), rowBatch->data->size()), nullptr, rowBatch->rows.size()}, - rowBatch_{std::move(rowBatch)} {} - - ~ShuffleRowBatch() override {} - - const std::vector& rows() const { - return rowBatch_->rows; - } - - private: - const std::unique_ptr rowBatch_; -}; - class ShuffleExchangeSource : public velox::exec::ExchangeSource { public: ShuffleExchangeSource( diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h index 944bcf84d4728..456dcaf27099f 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleInterface.h @@ -14,8 +14,13 @@ #pragma once #include +#include "velox/exec/Exchange.h" #include "velox/exec/Operator.h" +namespace facebook::velox { +class ByteInputStream; +} + namespace facebook::presto::operators { class ShuffleWriter { @@ -44,12 +49,21 @@ class ShuffleWriter { } }; -struct ReadBatch { - std::vector rows; - velox::BufferPtr data; +class ShuffleSerializedPage : public velox::exec::SerializedPageBase { + public: + ShuffleSerializedPage() = default; + ~ShuffleSerializedPage() override = default; + + std::unique_ptr prepareStreamForDeserialize() + override { + VELOX_UNSUPPORTED(); + } + + std::unique_ptr getIOBuf() const override { + VELOX_UNSUPPORTED(); + } - ReadBatch(std::vector&& _rows, velox::BufferPtr&& _data) - : rows{std::move(_rows)}, data{std::move(_data)} {} + virtual const std::vector& rows() = 0; }; class ShuffleReader { @@ -58,10 +72,11 @@ class ShuffleReader { /// Fetch the next batch of rows from the shuffle reader. /// @param bytes Maximum number of bytes to read in this batch. - /// @return A semi-future resolving to a vector of ReadBatch pointers, where - /// each ReadBatch contains rows and associated data buffers. - virtual folly::SemiFuture>> next( - uint64_t maxBytes) = 0; + /// @return A semi-future resolving to a vector of ShuffleSerializedPage + /// pointers, where each ShuffleSerializedPage contains rows and associated + /// data buffers. + virtual folly::SemiFuture>> + next(uint64_t maxBytes) = 0; /// Tell the shuffle system the reader is done. May be called with 'success' /// true before reading all the data. This happens when a query has a LIMIT or diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp index bc85443dea742..3e08f5369036e 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp @@ -75,6 +75,8 @@ RowVectorPtr ShuffleRead::getOutput() { VELOX_CHECK_EQ(nextRow_, 0); size_t numRows{0}; for (const auto& page : currentPages_) { + auto* batch = checked_pointer_cast(page.get()); + VELOX_CHECK_LE(batch->size(), std::numeric_limits::max()); rawInputBytes += page->size(); const auto pageRows = page->numRows().value(); pageRows_.emplace_back( @@ -83,7 +85,7 @@ RowVectorPtr ShuffleRead::getOutput() { } rows_.reserve(numRows); for (const auto& page : currentPages_) { - auto* batch = checked_pointer_cast(page.get()); + auto* batch = checked_pointer_cast(page.get()); const auto& rows = batch->rows(); for (const auto& row : rows) { rows_.emplace_back(row); diff --git a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index 9ea336bd08c1f..4d877f346c891 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -607,7 +607,7 @@ class ShuffleTest : public exec::test::OperatorTestBase { break; } for (auto& batch : batches) { - totalRows += batch->rows.size(); + totalRows += batch->rows().size(); } } } @@ -1643,7 +1643,7 @@ TEST_F(ShuffleTest, shuffleWriterReader) { ++numOutputCalls; for (const auto& batch : batches) { - for (const auto& row : batch->rows) { + for (const auto& row : batch->rows()) { readDataValues.emplace_back(row.data(), row.size()); ++totalRows; } diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index d48d3fb4fb626..77a3892b36d5e 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -411,7 +411,7 @@ class Producer { bool receivedDeleteResults_ = false; }; -std::string toString(exec::SerializedPage* page) { +std::string toString(exec::SerializedPageBase* page) { auto input = page->prepareStreamForDeserialize(); auto numBytes = input->read(); @@ -421,7 +421,7 @@ std::string toString(exec::SerializedPage* page) { return std::string(data); } -std::unique_ptr waitForNextPage( +std::unique_ptr waitForNextPage( const std::shared_ptr& queue) { bool atEnd; facebook::velox::ContinueFuture future; diff --git a/presto-native-execution/velox b/presto-native-execution/velox index a2c77748eab7d..ba2d78eee973b 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit a2c77748eab7dab8c0728afe3373a6bff59a05da +Subproject commit ba2d78eee973be24c0cb595d9f7f7c9a8e13baa2