diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp index 9d049ec037442..bc85443dea742 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp @@ -14,7 +14,6 @@ #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleExchangeSource.h" #include "velox/common/Casts.h" -#include "velox/exec/Exchange.h" #include "velox/row/CompactRow.h" using namespace facebook::velox::exec; @@ -25,61 +24,30 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) { return obj["id"].asString(); } -namespace { -class ShuffleRead : public Exchange { - public: - ShuffleRead( - int32_t operatorId, - DriverCtx* ctx, - const std::shared_ptr& shuffleReadNode, - std::shared_ptr exchangeClient) - : Exchange( - operatorId, - ctx, - std::make_shared( - shuffleReadNode->id(), - shuffleReadNode->outputType(), - VectorSerde::Kind::kCompactRow), - exchangeClient, - "ShuffleRead") { - initStats(); - } - - RowVectorPtr getOutput() override; - - void close() override; - - protected: - VectorSerde* getSerde() override { - VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde"); - } - - private: - static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"}; - static inline const std::string kShuffleNumBatchesPerRead{ - "shuffleNumBatchesPerRead"}; - static inline const std::string kShuffleNumBatches{"shuffleNumBatches"}; - - void initStats(); - - void resetOutputState(); - - int64_t numBatches_{0}; - std::unordered_map runtimeStats_; - - size_t nextRow_{0}; - size_t nextPage_{0}; - // Reusable buffers. - std::vector rows_; - std::vector pageRows_; -}; +ShuffleRead::ShuffleRead( + int32_t operatorId, + DriverCtx* ctx, + const std::shared_ptr& shuffleReadNode, + std::shared_ptr exchangeClient) + : Exchange( + operatorId, + ctx, + std::make_shared( + shuffleReadNode->id(), + shuffleReadNode->outputType(), + VectorSerde::Kind::kCompactRow), + exchangeClient, + "ShuffleRead") { + initStats(); +} void ShuffleRead::initStats() { VELOX_CHECK(runtimeStats_.empty()); runtimeStats_.insert( std::pair{kShuffleDecodeTime, velox::RuntimeCounter::Unit::kNanos}); runtimeStats_.insert( - std::pair{kShuffleNumBatchesPerRead, velox::RuntimeCounter::Unit::kNone}); + std::pair{ + kShufflePagesPerInputBatch, velox::RuntimeCounter::Unit::kNone}); } void ShuffleRead::resetOutputState() { @@ -122,8 +90,8 @@ RowVectorPtr ShuffleRead::getOutput() { } } if (!currentPages_.empty()) { - runtimeStats_[kShuffleNumBatchesPerRead].addValue(currentPages_.size()); - numBatches_ += currentPages_.size(); + runtimeStats_[kShufflePagesPerInputBatch].addValue(currentPages_.size()); + ++numInputBatches_; } } VELOX_CHECK_LE(nextRow_, rows_.size()); @@ -177,12 +145,11 @@ void ShuffleRead::close() { } lockedStats->runtimeStats[name] = metric; } - if (numBatches_ != 0) { + if (numInputBatches_ != 0) { lockedStats->addRuntimeStat( - kShuffleNumBatches, RuntimeCounter(numBatches_)); + kShuffleInputBatches, RuntimeCounter(numInputBatches_)); } } -} // namespace folly::dynamic ShuffleReadNode::serialize() const { auto obj = PlanNode::serialize(); diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h index 33a410151eb60..fbda62cf2e0e1 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h @@ -14,6 +14,7 @@ #pragma once #include "velox/core/PlanNode.h" +#include "velox/exec/Exchange.h" #include "velox/exec/Operator.h" namespace facebook::presto::operators { @@ -90,6 +91,43 @@ class ShuffleReadNode : public velox::core::PlanNode { const velox::RowTypePtr outputType_; }; +class ShuffleRead : public velox::exec::Exchange { + public: + ShuffleRead( + int32_t operatorId, + velox::exec::DriverCtx* ctx, + const std::shared_ptr& shuffleReadNode, + std::shared_ptr exchangeClient); + + velox::RowVectorPtr getOutput() override; + + void close() override; + + static inline const std::string kShuffleDecodeTime{"shuffleDecodeWallNanos"}; + static inline const std::string kShufflePagesPerInputBatch{ + "shuffleNumPagesPerInputBatch"}; + static inline const std::string kShuffleInputBatches{"shuffleInputBatches"}; + + protected: + velox::VectorSerde* getSerde() override { + VELOX_UNSUPPORTED("ShuffleReadOperator doesn't use serde"); + } + + private: + void initStats(); + + void resetOutputState(); + + int64_t numInputBatches_{0}; + std::unordered_map runtimeStats_; + + size_t nextRow_{0}; + size_t nextPage_{0}; + // Reusable buffers. + std::vector rows_; + std::vector pageRows_; +}; + class ShuffleReadTranslator : public velox::exec::Operator::PlanNodeTranslator { public: std::unique_ptr toOperator( diff --git a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp index b04b940ec6c3e..46b6bb70230bb 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp @@ -1470,21 +1470,23 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) { taskCursor->task()->taskStats().pipelineStats[0].operatorStats[0]; const auto& runtimeStats = operatorStats.runtimeStats; - ASSERT_EQ(runtimeStats.count("shuffleDecodeWallNanos"), 1); - const auto& decodeTimeStat = runtimeStats.at("shuffleDecodeWallNanos"); + ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleDecodeTime), 1); + const auto& decodeTimeStat = + runtimeStats.at(ShuffleRead::kShuffleDecodeTime); ASSERT_GT(decodeTimeStat.count, 0); ASSERT_GT(decodeTimeStat.sum, 0); ASSERT_EQ(velox::RuntimeCounter::Unit::kNanos, decodeTimeStat.unit); - ASSERT_EQ(runtimeStats.count("shuffleNumBatchesPerRead"), 1); + ASSERT_EQ(runtimeStats.count(ShuffleRead::kShufflePagesPerInputBatch), 1); const auto& batchesPerReadStat = - runtimeStats.at("shuffleNumBatchesPerRead"); + runtimeStats.at(ShuffleRead::kShufflePagesPerInputBatch); ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, batchesPerReadStat.unit); ASSERT_GT(batchesPerReadStat.count, 0); ASSERT_GT(batchesPerReadStat.sum, 0); - ASSERT_EQ(runtimeStats.count("shuffleNumBatches"), 1); - const auto& numBatchesStat = runtimeStats.at("shuffleNumBatches"); + ASSERT_EQ(runtimeStats.count(ShuffleRead::kShuffleInputBatches), 1); + const auto& numBatchesStat = + runtimeStats.at(ShuffleRead::kShuffleInputBatches); ASSERT_GT(numBatchesStat.count, 0); ASSERT_GT(numBatchesStat.sum, 0); ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, numBatchesStat.unit);