Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ std::string prestoTaskStateString(PrestoTaskState state) {

namespace {

// Splits operator stats for operators that represent multiple plan nodes in
// the Presto plan. Currently only IndexLookupJoin needs expansion because it
// embeds IndexSource as a separate logical plan node. FilterProject is
// intentionally not expanded here because Presto's PlanPrinter displays it as
// a single combined node.
std::vector<exec::OperatorStats> splitOperatorStats(
const std::vector<exec::OperatorStats>& operatorStats) {
std::vector<exec::OperatorStats> expanded;
expanded.reserve(operatorStats.size());
for (const auto& opStats : operatorStats) {
if (opStats.operatorType == "IndexLookupJoin" &&
opStats.statsSplitter.has_value()) {
auto splitStats = opStats.statsSplitter.value()(opStats);
for (auto& s : splitStats) {
expanded.push_back(std::move(s));
}
} else {
expanded.push_back(opStats);
}
}
return expanded;
}

#define TASK_STATS_SUM(taskStats, statsName, taskStatusSum) \
do { \
for (int i = 0; i < taskStats.pipelineStats.size(); ++i) { \
Expand Down Expand Up @@ -356,8 +379,13 @@ void updatePipelineStats(
prestoPipelineStats.lastStartTimeInMillis = prestoTaskStats.endTimeInMillis;
prestoPipelineStats.lastEndTimeInMillis = prestoTaskStats.endTimeInMillis;

prestoPipelineStats.operatorSummaries.resize(
veloxPipelineStats.operatorStats.size());
// Split operator stats for operators that represent multiple plan nodes
// in the Presto plan (e.g., IndexLookupJoin -> IndexLookupJoin +
// IndexSource).
const auto expandedOperatorStats =
splitOperatorStats(veloxPipelineStats.operatorStats);

prestoPipelineStats.operatorSummaries.resize(expandedOperatorStats.size());
prestoPipelineStats.totalScheduledTimeInNanos = {};
prestoPipelineStats.totalCpuTimeInNanos = {};
prestoPipelineStats.totalBlockedTimeInNanos = {};
Expand All @@ -367,9 +395,9 @@ void updatePipelineStats(

// tasks may fail before any operators are created;
// collect stats only when we have operators
if (!veloxPipelineStats.operatorStats.empty()) {
const auto& firstVeloxOpStats = veloxPipelineStats.operatorStats[0];
const auto& lastVeloxOpStats = veloxPipelineStats.operatorStats.back();
if (!expandedOperatorStats.empty()) {
const auto& firstVeloxOpStats = expandedOperatorStats[0];
const auto& lastVeloxOpStats = expandedOperatorStats.back();

prestoPipelineStats.pipelineId = firstVeloxOpStats.pipelineId;
prestoPipelineStats.totalDrivers = firstVeloxOpStats.numDrivers;
Expand All @@ -384,9 +412,9 @@ void updatePipelineStats(
prestoPipelineStats.outputDataSizeInBytes = lastVeloxOpStats.outputBytes;
}

for (auto j = 0; j < veloxPipelineStats.operatorStats.size(); ++j) {
for (auto j = 0; j < expandedOperatorStats.size(); ++j) {
auto& prestoOp = prestoPipelineStats.operatorSummaries[j];
auto& veloxOp = veloxPipelineStats.operatorStats[j];
const auto& veloxOp = expandedOperatorStats[j];

prestoOp.stageId = taskId.stageId();
prestoOp.stageExecutionId = taskId.stageExecutionId();
Expand Down
Loading