Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 22 additions & 55 deletions presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "velox/common/Casts.h"
#include "velox/exec/Exchange.h"
#include "velox/row/CompactRow.h"

using namespace facebook::velox::exec;
Expand All @@ -25,61 +24,30 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
return obj["id"].asString();
}

namespace {
class ShuffleRead : public Exchange {
public:
ShuffleRead(
int32_t operatorId,
DriverCtx* ctx,
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
std::shared_ptr<ExchangeClient> exchangeClient)
: Exchange(
operatorId,
ctx,
std::make_shared<core::ExchangeNode>(
shuffleReadNode->id(),
shuffleReadNode->outputType(),
VectorSerde::Kind::kCompactRow),
exchangeClient,
"ShuffleRead") {
initStats();
}

RowVectorPtr getOutput() override;

void close() override;

protected:
VectorSerde* getSerde() override {
VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde");
}

private:
static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"};
static inline const std::string kShuffleNumBatchesPerRead{
"shuffleNumBatchesPerRead"};
static inline const std::string kShuffleNumBatches{"shuffleNumBatches"};

void initStats();

void resetOutputState();

int64_t numBatches_{0};
std::unordered_map<std::string, velox::RuntimeMetric> runtimeStats_;

size_t nextRow_{0};
size_t nextPage_{0};
// Reusable buffers.
std::vector<std::string_view> rows_;
std::vector<size_t> pageRows_;
};
ShuffleRead::ShuffleRead(
int32_t operatorId,
DriverCtx* ctx,
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
std::shared_ptr<ExchangeClient> exchangeClient)
: Exchange(
operatorId,
ctx,
std::make_shared<core::ExchangeNode>(
shuffleReadNode->id(),
shuffleReadNode->outputType(),
VectorSerde::Kind::kCompactRow),
exchangeClient,
"ShuffleRead") {
initStats();
}

void ShuffleRead::initStats() {
VELOX_CHECK(runtimeStats_.empty());
runtimeStats_.insert(
std::pair{kShuffleDecodeTime, velox::RuntimeCounter::Unit::kNanos});
runtimeStats_.insert(
std::pair{kShuffleNumBatchesPerRead, velox::RuntimeCounter::Unit::kNone});
std::pair{
kShufflePagesPerInputBatch, velox::RuntimeCounter::Unit::kNone});
}

void ShuffleRead::resetOutputState() {
Expand Down Expand Up @@ -122,8 +90,8 @@ RowVectorPtr ShuffleRead::getOutput() {
}
}
if (!currentPages_.empty()) {
runtimeStats_[kShuffleNumBatchesPerRead].addValue(currentPages_.size());
numBatches_ += currentPages_.size();
runtimeStats_[kShufflePagesPerInputBatch].addValue(currentPages_.size());
++numInputBatches_;
}
}
VELOX_CHECK_LE(nextRow_, rows_.size());
Expand Down Expand Up @@ -177,12 +145,11 @@ void ShuffleRead::close() {
}
lockedStats->runtimeStats[name] = metric;
}
if (numBatches_ != 0) {
if (numInputBatches_ != 0) {
lockedStats->addRuntimeStat(
kShuffleNumBatches, RuntimeCounter(numBatches_));
kShuffleInputBatches, RuntimeCounter(numInputBatches_));
}
}
} // namespace

folly::dynamic ShuffleReadNode::serialize() const {
auto obj = PlanNode::serialize();
Expand Down
38 changes: 38 additions & 0 deletions presto-native-execution/presto_cpp/main/operators/ShuffleRead.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#pragma once

#include "velox/core/PlanNode.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/Operator.h"

namespace facebook::presto::operators {
Expand Down Expand Up @@ -90,6 +91,43 @@ class ShuffleReadNode : public velox::core::PlanNode {
const velox::RowTypePtr outputType_;
};

class ShuffleRead : public velox::exec::Exchange {
public:
ShuffleRead(
int32_t operatorId,
velox::exec::DriverCtx* ctx,
const std::shared_ptr<const ShuffleReadNode>& shuffleReadNode,
std::shared_ptr<velox::exec::ExchangeClient> exchangeClient);

velox::RowVectorPtr getOutput() override;

void close() override;

static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"};
static inline const std::string kShufflePagesPerInputBatch{
"shuffleNumPagesPerInputBatch"};
static inline const std::string kShuffleInputBatches{"shuffleInputBatches"};

protected:
velox::VectorSerde* getSerde() override {
VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde");
}

private:
void initStats();

void resetOutputState();

int64_t numInputBatches_{0};
std::unordered_map<std::string, velox::RuntimeMetric> runtimeStats_;

size_t nextRow_{0};
size_t nextPage_{0};
// Reusable buffers.
std::vector<std::string_view> rows_;
std::vector<size_t> pageRows_;
};

class ShuffleReadTranslator : public velox::exec::Operator::PlanNodeTranslator {
public:
std::unique_ptr<velox::exec::Operator> toOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,21 +1470,23 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) {
taskCursor->task()->taskStats().pipelineStats[0].operatorStats[0];
const auto& runtimeStats = operatorStats.runtimeStats;

ASSERT_EQ(runtimeStats.count("shuffleDecodeWallNanos"), 1);
const auto& decodeTimeStat = runtimeStats.at("shuffleDecodeWallNanos");
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleDecodeTime), 1);
const auto& decodeTimeStat =
runtimeStats.at(ShuffleRead::kShuffleDecodeTime);
ASSERT_GT(decodeTimeStat.count, 0);
ASSERT_GT(decodeTimeStat.sum, 0);
ASSERT_EQ(velox::RuntimeCounter::Unit::kNanos, decodeTimeStat.unit);

ASSERT_EQ(runtimeStats.count("shuffleNumBatchesPerRead"), 1);
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShufflePagesPerInputBatch), 1);
const auto& batchesPerReadStat =
runtimeStats.at("shuffleNumBatchesPerRead");
runtimeStats.at(ShuffleRead::kShufflePagesPerInputBatch);
ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, batchesPerReadStat.unit);
ASSERT_GT(batchesPerReadStat.count, 0);
ASSERT_GT(batchesPerReadStat.sum, 0);

ASSERT_EQ(runtimeStats.count("shuffleNumBatches"), 1);
const auto& numBatchesStat = runtimeStats.at("shuffleNumBatches");
ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleInputBatches), 1);
const auto& numBatchesStat =
runtimeStats.at(ShuffleRead::kShuffleInputBatches);
ASSERT_GT(numBatchesStat.count, 0);
ASSERT_GT(numBatchesStat.sum, 0);
ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, numBatchesStat.unit);
Expand Down
Loading