Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
#include "presto_cpp/main/operators/BroadcastWrite.h"
#include "presto_cpp/main/operators/LocalPersistentShuffle.h"
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/CompactRowExchangeSource.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "velox/common/base/Counters.h"
Expand Down Expand Up @@ -451,7 +451,7 @@ void PrestoServer::run() {
});

velox::exec::ExchangeSource::registerFactory(
operators::CompactRowExchangeSource::createExchangeSource);
operators::ShuffleExchangeSource::createExchangeSource);

// Batch broadcast exchange source.
velox::exec::ExchangeSource::registerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
add_library(
presto_operators
PartitionAndSerialize.cpp
ShuffleExchangeSource.cpp
ShuffleRead.cpp
ShuffleWrite.cpp
CompactRowExchangeSource
LocalPersistentShuffle.cpp
BroadcastWrite.cpp
BroadcastFactory.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <folly/Uri.h>

#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/CompactRowExchangeSource.h"
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "velox/serializers/RowSerializer.h"

namespace facebook::presto::operators {
Expand All @@ -29,8 +29,8 @@ namespace facebook::presto::operators {
VELOX_FAIL("ShuffleReader::{} failed: {}", methodName, e.what()); \
}

folly::SemiFuture<CompactRowExchangeSource::Response>
CompactRowExchangeSource::request(
folly::SemiFuture<ShuffleExchangeSource::Response>
ShuffleExchangeSource::request(
uint32_t /*maxBytes*/,
std::chrono::microseconds /*maxWait*/) {
auto nextBatch = [this]() {
Expand All @@ -48,8 +48,7 @@ CompactRowExchangeSource::request(
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());
++numBatches_;
queue_->enqueueLocked(
std::make_unique<CompactRowBatch>(
std::move(batch)),
std::make_unique<ShuffleRowBatch>(std::move(batch)),
promises);
}
}
Expand All @@ -61,17 +60,16 @@ CompactRowExchangeSource::request(
})
.deferError(
[](folly::exception_wrapper e) mutable
-> CompactRowExchangeSource::Response {
-> ShuffleExchangeSource::Response {
VELOX_FAIL("ShuffleReader::{} failed: {}", "next", e.what());
});
};

CALL_SHUFFLE(return nextBatch(), "next");
}

folly::SemiFuture<CompactRowExchangeSource::Response>
CompactRowExchangeSource::requestDataSizes(
std::chrono::microseconds /*maxWait*/) {
folly::SemiFuture<ShuffleExchangeSource::Response>
ShuffleExchangeSource::requestDataSizes(std::chrono::microseconds /*maxWait*/) {
std::vector<int64_t> remainingBytes;
if (!atEnd_) {
// Use default value of ExchangeClient::getAveragePageSize() for now.
Expand All @@ -82,7 +80,7 @@ CompactRowExchangeSource::requestDataSizes(
return folly::makeSemiFuture(Response{0, atEnd_, std::move(remainingBytes)});
}

folly::F14FastMap<std::string, int64_t> CompactRowExchangeSource::stats() const {
folly::F14FastMap<std::string, int64_t> ShuffleExchangeSource::stats() const {
return shuffleReader_->stats();
}

Expand All @@ -101,7 +99,7 @@ std::optional<std::string> getSerializedShuffleInfo(folly::Uri& uri) {

// static
std::shared_ptr<velox::exec::ExchangeSource>
CompactRowExchangeSource::createExchangeSource(
ShuffleExchangeSource::createExchangeSource(
const std::string& url,
int32_t destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
Expand All @@ -123,7 +121,7 @@ CompactRowExchangeSource::createExchangeSource(
"shuffle.name is not provided in config.properties to create a shuffle "
"interface.");
auto shuffleFactory = ShuffleInterfaceFactory::factory(shuffleName);
return std::make_shared<CompactRowExchangeSource>(
return std::make_shared<ShuffleExchangeSource>(
uri.host(),
destination,
queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@

namespace facebook::presto::operators {

class CompactRowBatch : public velox::exec::SerializedPage {
class ShuffleRowBatch : public velox::exec::SerializedPage {
public:
explicit CompactRowBatch(
explicit ShuffleRowBatch(
std::unique_ptr<ReadBatch> rowBatch)
: velox::exec::
SerializedPage{folly::IOBuf::wrapBuffer(
rowBatch->data->as<char>(), rowBatch->data->size()), nullptr, rowBatch->rows.size()},
rowBatch_{std::move(rowBatch)} {}

~CompactRowBatch() override {}
~ShuffleRowBatch() override {}

const std::vector<std::string_view>& rows() const {
return rowBatch_->rows;
Expand All @@ -40,9 +40,9 @@ class CompactRowBatch : public velox::exec::SerializedPage {
const std::unique_ptr<ReadBatch> rowBatch_;
};

class CompactRowExchangeSource : public velox::exec::ExchangeSource {
class ShuffleExchangeSource : public velox::exec::ExchangeSource {
public:
CompactRowExchangeSource(
ShuffleExchangeSource(
const std::string& taskId,
int destination,
const std::shared_ptr<velox::exec::ExchangeQueue>& queue,
Expand Down
148 changes: 73 additions & 75 deletions presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,23 @@
* limitations under the License.
*/
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "velox/common/Casts.h"
#include "velox/exec/Exchange.h"
#include "velox/row/CompactRow.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/RowSerializer.h"

using namespace facebook::velox::exec;
using namespace facebook::velox;
using facebook::velox::serializer::RowIteratorImpl;

namespace facebook::presto::operators {
// Extracts the plan node id from a serialized plan node: looks up the "id"
// entry of 'obj' and converts it with asString().
velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
  const auto& idField = obj["id"];
  return idField.asString();
}

namespace {
// Row-iterator factory handed to RowDeserializer: wraps 'source' in a
// RowIteratorImpl, passing source->size() as the second constructor argument.
// The serde options argument is ignored.
std::unique_ptr<RowIterator> shuffleRowIteratorFactory(
    ByteInputStream* source,
    const VectorSerde::Options* /*unused*/) {
  const auto sourceSize = source->size();
  return std::make_unique<RowIteratorImpl>(source, sourceSize);
}

class ShuffleVectorSerde : public VectorSerde {
class ShuffleRead : public Exchange {
public:
ShuffleVectorSerde() : VectorSerde(VectorSerde::Kind::kCompactRow) {}

// Serialized-size estimation hook overridden from VectorSerde. All three
// arguments are ignored; the body only invokes VELOX_UNREACHABLE(), i.e. this
// serde is not expected to be asked for size estimates.
void estimateSerializedSize(
const BaseVector* /* vector */,
const folly::Range<const IndexRange*>& /* ranges */,
vector_size_t** /* sizes */) override {
// Not used.
VELOX_UNREACHABLE();
}

// Serializer-creation hook overridden from VectorSerde. All arguments are
// ignored and no serializer is ever returned: the body only invokes
// VELOX_UNREACHABLE(), i.e. this serde is not expected to be used for writing.
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* /* streamArena */,
const Options* /* options */) override {
// Not used.
VELOX_UNREACHABLE();
}

// Deserialize overload without a row iterator. This serde rejects it: none of
// the arguments are read and the body unconditionally invokes
// VELOX_UNSUPPORTED with a fixed message. Callers are expected to use the
// overload that takes a RowIterator instead.
void deserialize(
ByteInputStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result,
const Options* /* options */) override {
VELOX_UNSUPPORTED("ShuffleVectorSerde::deserialize is not supported");
}

// Deserializes up to 'maxRows' rows from 'source' into '*result'.
//
// Row extraction is delegated to RowDeserializer<std::string_view>, which is
// given 'sourceRowIterator' and shuffleRowIteratorFactory. The collected row
// views are then decoded with row::CompactRow::deserialize. When no rows were
// collected, '*result' is set to an empty RowVector of 'type' instead.
void deserialize(
    ByteInputStream* source,
    std::unique_ptr<RowIterator>& sourceRowIterator,
    uint64_t maxRows,
    RowTypePtr type,
    RowVectorPtr* result,
    velox::memory::MemoryPool* pool,
    const Options* options) override {
  // 'backingBuffers' is an output of RowDeserializer declared alongside the
  // row views; both live until this function returns.
  std::vector<std::string_view> rowViews;
  std::vector<std::unique_ptr<std::string>> backingBuffers;
  velox::serializer::RowDeserializer<std::string_view>::deserialize(
      source,
      maxRows,
      sourceRowIterator,
      rowViews,
      backingBuffers,
      shuffleRowIteratorFactory,
      options);

  if (!rowViews.empty()) {
    *result = row::CompactRow::deserialize(rowViews, type, pool);
  } else {
    *result = BaseVector::create<RowVector>(type, 0, pool);
  }
}
};

class ShuffleReadOperator : public Exchange {
public:
ShuffleReadOperator(
ShuffleRead(
int32_t operatorId,
DriverCtx* ctx,
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
Expand All @@ -104,19 +39,82 @@ class ShuffleReadOperator : public Exchange {
std::make_shared<core::ExchangeNode>(
shuffleReadNode->id(),
shuffleReadNode->outputType(),
velox::VectorSerde::Kind::kCompactRow),
VectorSerde::Kind::kCompactRow),
exchangeClient,
"ShuffleRead"),
serde_(std::make_unique<ShuffleVectorSerde>()) {}
"ShuffleRead") {}

RowVectorPtr getOutput() override;

protected:
VectorSerde* getSerde() override {
return serde_.get();
VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde");
}

private:
std::unique_ptr<ShuffleVectorSerde> serde_;
size_t nextRow_{0};
// Reusable buffers.
std::vector<std::string_view> rows_;
};

// Produces the next output batch from the pages held in 'currentPages_'.
//
// On the first call for a set of pages, the row views of every
// ShuffleRowBatch page are flattened into 'rows_'. Each call then decodes a
// slice of 'rows_' starting at 'nextRow_' with row::CompactRow::deserialize.
// The slice length starts at kInitialOutputRows and, once a row-size estimate
// exists, is derived from preferredOutputBatchBytes_ / estimatedRowSize_.
//
// Returns nullptr when there are no pages or the pages contain no rows.
RowVectorPtr ShuffleRead::getOutput() {
  if (currentPages_.empty()) {
    return nullptr;
  }

  // Runs on every exit path below: once all buffered rows have been emitted
  // (or there were none), drop the pages and reset the cursor.
  SCOPE_EXIT {
    if (nextRow_ == rows_.size()) {
      currentPages_.clear();
      rows_.clear();
      nextRow_ = 0;
    }
  };

  // Only accumulated on the call that unpacks the pages; later calls over the
  // same pages record 0 raw input bytes.
  uint64_t rawInputBytes{0};
  if (rows_.empty()) {
    VELOX_CHECK_EQ(nextRow_, 0);

    // First pass: total the byte and row counts so 'rows_' is sized once.
    size_t totalRows{0};
    for (const auto& page : currentPages_) {
      rawInputBytes += page->size();
      totalRows += page->numRows().value();
    }
    rows_.reserve(totalRows);

    // Second pass: append each page's row views in page order.
    for (const auto& page : currentPages_) {
      const auto& pageRows =
          checked_pointer_cast<ShuffleRowBatch>(page.get())->rows();
      rows_.insert(rows_.end(), pageRows.begin(), pageRows.end());
    }
  }
  VELOX_CHECK_LE(nextRow_, rows_.size());
  if (rows_.empty()) {
    return nullptr;
  }

  // Choose the batch size, capped by the number of rows still pending.
  auto numOutputRows = estimatedRowSize_.has_value()
      ? std::max(
            (preferredOutputBatchBytes_ / estimatedRowSize_.value()),
            kInitialOutputRows)
      : kInitialOutputRows;
  numOutputRows = std::min(numOutputRows, rows_.size() - nextRow_);

  const bool emitEverything = (numOutputRows == rows_.size());
  if (!emitEverything) {
    // Copy the views in [nextRow_, nextRow_ + numOutputRows) into their own
    // vector for deserialization.
    const auto sliceBegin = rows_.begin() + nextRow_;
    std::vector<std::string_view> outputRows(
        sliceBegin, sliceBegin + numOutputRows);
    result_ = row::CompactRow::deserialize(outputRows, outputType_, pool());
  } else {
    // The whole buffer goes out in one batch; no copy of the views is needed.
    result_ = row::CompactRow::deserialize(rows_, outputType_, pool());
  }
  nextRow_ += numOutputRows;

  // The row-size estimate never decreases: keep the larger of the previous
  // estimate (or 1 when there is none) and this batch's per-row flat size.
  estimatedRowSize_ = std::max(
      result_->estimateFlatSize() / numOutputRows,
      estimatedRowSize_.value_or(1L));
  recordInputStats(rawInputBytes);
  return result_;
}
} // namespace

folly::dynamic ShuffleReadNode::serialize() const {
Expand All @@ -140,7 +138,7 @@ std::unique_ptr<Operator> ShuffleReadTranslator::toOperator(
std::shared_ptr<ExchangeClient> exchangeClient) {
if (auto shuffleReadNode =
std::dynamic_pointer_cast<const ShuffleReadNode>(node)) {
return std::make_unique<ShuffleReadOperator>(
return std::make_unique<ShuffleRead>(
id, ctx, shuffleReadNode, exchangeClient);
}
return nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
VELOX_FAIL("ShuffleWriter::{} failed: {}", methodName, e.what()); \
}

class ShuffleWriteOperator : public Operator {
class ShuffleWrite : public Operator {
public:
ShuffleWriteOperator(
ShuffleWrite(
int32_t operatorId,
DriverCtx* FOLLY_NONNULL ctx,
const std::shared_ptr<const ShuffleWriteNode>& planNode)
Expand Down Expand Up @@ -73,9 +73,10 @@ class ShuffleWriteOperator : public Operator {
constexpr int kReplicateNullsAndAny = 3;

checkCreateShuffleWriter();
auto partitions = input->childAt(kPartition)->as<SimpleVector<int32_t>>();
auto serializedKeys = input->childAt(kKey)->as<SimpleVector<StringView>>();
auto serializedData = input->childAt(kData)->as<SimpleVector<StringView>>();
auto* partitions = input->childAt(kPartition)->as<SimpleVector<int32_t>>();
auto* serializedKeys = input->childAt(kKey)->as<SimpleVector<StringView>>();
auto* serializedData =
input->childAt(kData)->as<SimpleVector<StringView>>();
SimpleVector<bool>* replicate = nullptr;
if (input->type()->size() == 4) {
replicate =
Expand Down Expand Up @@ -194,7 +195,7 @@ std::unique_ptr<Operator> ShuffleWriteTranslator::toOperator(
const core::PlanNodePtr& node) {
if (auto shuffleWriteNode =
std::dynamic_pointer_cast<const ShuffleWriteNode>(node)) {
return std::make_unique<ShuffleWriteOperator>(id, ctx, shuffleWriteNode);
return std::make_unique<ShuffleWrite>(id, ctx, shuffleWriteNode);
}
return nullptr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ target_link_libraries(presto_operators_plan_builder velox_core)

add_executable(
presto_operators_test
PlanNodeSerdeTest.cpp CompactRowShuffleTest.cpp BroadcastTest.cpp
PlanNodeSerdeTest.cpp ShuffleTest.cpp BroadcastTest.cpp
BinarySortableSerializerTest.cpp PlanNodeBuilderTest.cpp)

add_test(presto_operators_test presto_operators_test)
Expand Down
Loading
Loading