Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ void PrestoExchangeSource::processDataResponse(
contentLength, 0, "next token is not set in non-empty data response");
}

std::unique_ptr<exec::SerializedPage> page;
std::unique_ptr<exec::SerializedPageBase> page;
const bool empty = response->empty();
if (!empty) {
std::vector<std::unique_ptr<folly::IOBuf>> iobufs;
Expand Down Expand Up @@ -351,7 +351,7 @@ void PrestoExchangeSource::processDataResponse(
}

if (enableBufferCopy_) {
page = std::make_unique<exec::SerializedPage>(
page = std::make_unique<exec::PrestoSerializedPage>(
std::move(singleChain), [pool = pool_](folly::IOBuf& iobuf) {
int64_t freedBytes{0};
// Free the backed memory from MemoryAllocator on page dtor
Expand All @@ -365,7 +365,7 @@ void PrestoExchangeSource::processDataResponse(
PrestoExchangeSource::updateMemoryUsage(-freedBytes);
});
} else {
page = std::make_unique<exec::SerializedPage>(
page = std::make_unique<exec::PrestoSerializedPage>(
std::move(singleChain), [totalBytes](folly::IOBuf& iobuf) {
PrestoExchangeSource::updateMemoryUsage(-totalBytes);
});
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "velox/common/base/Counters.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ SessionProperties::SessionProperties() {
"creating tiny SerializedPages. For "
"PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator"
"would buffer up to that number of bytes / number of destinations for "
"each destination before producing a SerializedPage.",
"each destination before producing a SerializedPageBase.",
BIGINT(),
false,
QueryConfig::kMaxPartitionedOutputBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class SessionProperties {
/// creating tiny SerializedPages. For
/// PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator
/// would buffer up to that number of bytes / number of destinations for each
/// destination before producing a SerializedPage.
/// destination before producing a SerializedPageBase.
static constexpr const char* kMaxPartitionedOutputBufferSize =
"native_max_page_partitioning_buffer_size";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ BroadcastExchangeSource::request(

return folly::makeTryWith([&]() -> Response {
int64_t totalBytes = 0;
std::vector<std::unique_ptr<velox::exec::SerializedPage>> pages;
std::vector<std::unique_ptr<velox::exec::SerializedPageBase>> pages;

while (totalBytes < maxBytes && reader_->hasNext()) {
auto buffer = reader_->next();
VELOX_CHECK_NOT_NULL(buffer);

auto ioBuf = folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
auto page = std::make_unique<velox::exec::SerializedPage>(
auto page = std::make_unique<velox::exec::PrestoSerializedPage>(
std::move(ioBuf), [buffer](auto& /*unused*/) {});
pages.push_back(std::move(page));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ class SortedFileInputStream final : public velox::common::FileInputStream,
std::string currentValue_;
};

/// ShuffleSerializedPage produced by the local shuffle reader. It pairs the
/// per-row views with the buffer handed in alongside them and holds a
/// reference to that buffer for the lifetime of the page.
/// NOTE(review): presumably every view in 'rows' points into 'buffer', which
/// is why the buffer is retained here — confirm against the callers.
class LocalShuffleSerializedPage : public ShuffleSerializedPage {
 public:
  /// @param rows Views of the individual rows of this page. Taken by value so
  /// that callers passing an rvalue (std::move(rows)) actually transfer the
  /// vector; the previous 'const&' parameter turned std::move into a silent
  /// copy.
  /// @param buffer Buffer associated with the rows; its size is reported by
  /// size().
  LocalShuffleSerializedPage(
      std::vector<std::string_view> rows,
      velox::BufferPtr buffer)
      : rows_{std::move(rows)}, buffer_{std::move(buffer)} {}

  /// Returns the row views of this page.
  const std::vector<std::string_view>& rows() override {
    return rows_;
  }

  /// Returns the size of the retained buffer as reported by the buffer
  /// itself.
  uint64_t size() const override {
    return buffer_->size();
  }

  /// Returns the number of row views held by this page; always has a value.
  std::optional<int64_t> numRows() const override {
    return rows_.size();
  }

 private:
  const std::vector<std::string_view> rows_;
  const velox::BufferPtr buffer_;
};

std::vector<RowMetadata>
extractRowMetadata(const char* buffer, size_t bufferSize, bool sortedShuffle) {
std::vector<RowMetadata> rows;
Expand Down Expand Up @@ -450,9 +474,9 @@ void LocalShuffleReader::initSortedShuffleRead() {
}
}

std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
uint64_t maxBytes) {
std::vector<std::unique_ptr<ReadBatch>> batches;
std::vector<std::unique_ptr<ShuffleSerializedPage>>
LocalShuffleReader::nextSorted(uint64_t maxBytes) {
std::vector<std::unique_ptr<ShuffleSerializedPage>> batches;

if (merge_ == nullptr) {
return batches;
Expand All @@ -469,7 +493,7 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(
if (bufferUsed + data.size() > maxBytes) {
if (bufferUsed > 0) {
batches.push_back(
std::make_unique<ReadBatch>(
std::make_unique<LocalShuffleSerializedPage>(
std::move(rows), std::move(batchBuffer)));
return batches;
}
Expand All @@ -489,15 +513,16 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextSorted(

if (!rows.empty()) {
batches.push_back(
std::make_unique<ReadBatch>(std::move(rows), std::move(batchBuffer)));
std::make_unique<LocalShuffleSerializedPage>(
std::move(rows), std::move(batchBuffer)));
}

return batches;
}

std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted(
uint64_t maxBytes) {
std::vector<std::unique_ptr<ReadBatch>> batches;
std::vector<std::unique_ptr<ShuffleSerializedPage>>
LocalShuffleReader::nextUnsorted(uint64_t maxBytes) {
std::vector<std::unique_ptr<ShuffleSerializedPage>> batches;
uint64_t totalBytes{0};

while (readPartitionFileIndex_ < readPartitionFiles_.size()) {
Expand Down Expand Up @@ -527,13 +552,14 @@ std::vector<std::unique_ptr<ReadBatch>> LocalShuffleReader::nextUnsorted(

totalBytes += fileSize;
batches.push_back(
std::make_unique<ReadBatch>(std::move(rows), std::move(buffer)));
std::make_unique<LocalShuffleSerializedPage>(
std::move(rows), std::move(buffer)));
}

return batches;
}

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>>
LocalShuffleReader::next(uint64_t maxBytes) {
VELOX_CHECK(
initialized_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class LocalShuffleReader : public ShuffleReader {
/// For sorted shuffle, this opens all shuffle files and prepares k-way merge.
void initialize();

folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>> next(
uint64_t maxBytes) override;

void noMoreData(bool success) override;
Expand All @@ -192,10 +192,12 @@ class LocalShuffleReader : public ShuffleReader {
void initSortedShuffleRead();

// Reads sorted shuffle data using k-way merge with TreeOfLosers.
std::vector<std::unique_ptr<ReadBatch>> nextSorted(uint64_t maxBytes);
std::vector<std::unique_ptr<ShuffleSerializedPage>> nextSorted(
uint64_t maxBytes);

// Reads unsorted shuffle data in batch-based file reading.
std::vector<std::unique_ptr<ReadBatch>> nextUnsorted(uint64_t maxBytes);
std::vector<std::unique_ptr<ShuffleSerializedPage>> nextUnsorted(
uint64_t maxBytes);

const std::string rootPath_;
const std::string queryId_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,29 @@ ShuffleExchangeSource::request(
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this, maxBytes]() {
return std::move(shuffleReader_->next(maxBytes))
.deferValue([this](std::vector<std::unique_ptr<ReadBatch>>&& batches) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): totalBytes is no longer updated and will always be 0 in the Response

In the new lambda, totalBytes is set to 0 but never updated in the loop over batches, whereas previously it was set from batch->data->size(). This means request will always return a Response with totalBytes == 0, which can break byte-based accounting and backpressure. Please restore accumulation of totalBytes (e.g., sum batch->size() for each enqueued page and re-apply the int32_t bounds check if still required).

std::vector<velox::ContinuePromise> promises;
int64_t totalBytes{0};
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (batches.empty()) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
for (auto& batch : batches) {
totalBytes = batch->data->size();
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());
++numBatches_;
queue_->enqueueLocked(
std::make_unique<ShuffleRowBatch>(std::move(batch)),
promises);
.deferValue(
[this](
std::vector<std::unique_ptr<ShuffleSerializedPage>>&& batches) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes{0};
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (batches.empty()) {
atEnd_ = true;
queue_->enqueueLocked(nullptr, promises);
} else {
for (auto& batch : batches) {
++numBatches_;
queue_->enqueueLocked(std::move(batch), promises);
}
}
}
}
}

for (auto& promise : promises) {
promise.setValue();
}
return folly::makeFuture(Response{totalBytes, atEnd_});
})
for (auto& promise : promises) {
promise.setValue();
}
return folly::makeFuture(Response{totalBytes, atEnd_});
})
.deferError(
[](folly::exception_wrapper e) mutable
-> ShuffleExchangeSource::Response {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,11 @@
#pragma once

#include "presto_cpp/main/operators/ShuffleInterface.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/Operator.h"

namespace facebook::presto::operators {

/// Adapts a ReadBatch to the velox::exec::SerializedPage interface while
/// keeping the row views of the batch accessible through rows().
class ShuffleRowBatch : public velox::exec::SerializedPage {
public:
/// Takes ownership of 'rowBatch'. The base SerializedPage is constructed from
/// an IOBuf created by folly::IOBuf::wrapBuffer over the batch's data bytes,
/// a null second argument, and the number of rows in the batch.
/// NOTE(review): wrapBuffer presumably does not copy or own the bytes, so the
/// batch must outlive the IOBuf — which is why it is stored in rowBatch_;
/// confirm against folly's documentation.
explicit ShuffleRowBatch(
std::unique_ptr<ReadBatch> rowBatch)
// The base class is initialized before rowBatch_, so 'rowBatch' is still
// valid when it is dereferenced here; it is moved from only afterwards.
: velox::exec::
SerializedPage{folly::IOBuf::wrapBuffer(
rowBatch->data->as<char>(), rowBatch->data->size()), nullptr, rowBatch->rows.size()},
rowBatch_{std::move(rowBatch)} {}

~ShuffleRowBatch() override {}

/// Returns the row views stored in the wrapped batch.
const std::vector<std::string_view>& rows() const {
return rowBatch_->rows;
}

private:
// Owned batch; holds the row views returned by rows() and the data buffer
// that the base-class IOBuf was created over.
const std::unique_ptr<ReadBatch> rowBatch_;
};

class ShuffleExchangeSource : public velox::exec::ExchangeSource {
public:
ShuffleExchangeSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
#pragma once

#include <fmt/format.h>
#include "velox/exec/Exchange.h"
#include "velox/exec/Operator.h"

namespace facebook::velox {
class ByteInputStream;
}

namespace facebook::presto::operators {

class ShuffleWriter {
Expand Down Expand Up @@ -44,12 +49,21 @@ class ShuffleWriter {
}
};

struct ReadBatch {
std::vector<std::string_view> rows;
velox::BufferPtr data;
class ShuffleSerializedPage : public velox::exec::SerializedPageBase {
public:
ShuffleSerializedPage() = default;
~ShuffleSerializedPage() override = default;

std::unique_ptr<velox::ByteInputStream> prepareStreamForDeserialize()
override {
VELOX_UNSUPPORTED();
}

std::unique_ptr<folly::IOBuf> getIOBuf() const override {
VELOX_UNSUPPORTED();
}

ReadBatch(std::vector<std::string_view>&& _rows, velox::BufferPtr&& _data)
: rows{std::move(_rows)}, data{std::move(_data)} {}
virtual const std::vector<std::string_view>& rows() = 0;
};

class ShuffleReader {
Expand All @@ -58,10 +72,11 @@ class ShuffleReader {

/// Fetch the next batch of rows from the shuffle reader.
/// @param maxBytes Maximum number of bytes to read in this batch.
/// @return A semi-future resolving to a vector of ReadBatch pointers, where
/// each ReadBatch contains rows and associated data buffers.
virtual folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>> next(
uint64_t maxBytes) = 0;
/// @return A semi-future resolving to a vector of ShuffleSerializedPage
/// pointers, where each ShuffleSerializedPage contains rows and associated
/// data buffers.
virtual folly::SemiFuture<std::vector<std::unique_ptr<ShuffleSerializedPage>>>
next(uint64_t maxBytes) = 0;

/// Tell the shuffle system the reader is done. May be called with 'success'
/// true before reading all the data. This happens when a query has a LIMIT or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ RowVectorPtr ShuffleRead::getOutput() {
VELOX_CHECK_EQ(nextRow_, 0);
size_t numRows{0};
for (const auto& page : currentPages_) {
auto* batch = checked_pointer_cast<ShuffleSerializedPage>(page.get());
VELOX_CHECK_LE(batch->size(), std::numeric_limits<int32_t>::max());
rawInputBytes += page->size();
const auto pageRows = page->numRows().value();
pageRows_.emplace_back(
Expand All @@ -83,7 +85,7 @@ RowVectorPtr ShuffleRead::getOutput() {
}
rows_.reserve(numRows);
for (const auto& page : currentPages_) {
auto* batch = checked_pointer_cast<ShuffleRowBatch>(page.get());
auto* batch = checked_pointer_cast<ShuffleSerializedPage>(page.get());
const auto& rows = batch->rows();
for (const auto& row : rows) {
rows_.emplace_back(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ class ShuffleTest : public exec::test::OperatorTestBase {
break;
}
for (auto& batch : batches) {
totalRows += batch->rows.size();
totalRows += batch->rows().size();
}
}
}
Expand Down Expand Up @@ -1643,7 +1643,7 @@ TEST_F(ShuffleTest, shuffleWriterReader) {

++numOutputCalls;
for (const auto& batch : batches) {
for (const auto& row : batch->rows) {
for (const auto& row : batch->rows()) {
readDataValues.emplace_back(row.data(), row.size());
++totalRows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class Producer {
bool receivedDeleteResults_ = false;
};

std::string toString(exec::SerializedPage* page) {
std::string toString(exec::SerializedPageBase* page) {
auto input = page->prepareStreamForDeserialize();

auto numBytes = input->read<int32_t>();
Expand All @@ -421,7 +421,7 @@ std::string toString(exec::SerializedPage* page) {
return std::string(data);
}

std::unique_ptr<exec::SerializedPage> waitForNextPage(
std::unique_ptr<exec::SerializedPageBase> waitForNextPage(
const std::shared_ptr<exec::ExchangeQueue>& queue) {
bool atEnd;
facebook::velox::ContinueFuture future;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 126 files
Loading