diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp
index afdf9f4f82dc5..7e616f5266938 100644
--- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp
+++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp
@@ -45,6 +45,36 @@ std::string prestoTaskStateString(PrestoTaskState state) {
 
 namespace {
 
+// Expands Velox operator stats into one entry per Presto plan node.
+//
+// Some Velox operators represent several plan nodes in the Presto plan.
+// Currently only IndexLookupJoin is expanded, because it embeds IndexSource
+// as a separate logical plan node. FilterProject is intentionally not
+// expanded here because Presto's PlanPrinter displays it as a single
+// combined node.
+//
+// An IndexLookupJoin entry that carries a 'statsSplitter' is replaced, in
+// place, by whatever the splitter returns for it; every other entry is copied
+// through unchanged, so the relative operator order is preserved.
+std::vector<facebook::velox::exec::OperatorStats> splitOperatorStats(
+    const std::vector<facebook::velox::exec::OperatorStats>& operatorStats) {
+  std::vector<facebook::velox::exec::OperatorStats> expanded;
+  // Initial estimate only; a split may change the final number of entries.
+  expanded.reserve(operatorStats.size());
+  for (const auto& opStats : operatorStats) {
+    if (opStats.operatorType == "IndexLookupJoin" &&
+        opStats.statsSplitter.has_value()) {
+      auto splitStats = opStats.statsSplitter.value()(opStats);
+      for (auto& s : splitStats) {
+        expanded.push_back(std::move(s));
+      }
+    } else {
+      expanded.push_back(opStats);
+    }
+  }
+  return expanded;
+}
+
 #define TASK_STATS_SUM(taskStats, statsName, taskStatusSum) \
   do { \
     for (int i = 0; i < taskStats.pipelineStats.size(); ++i) { \
@@ -356,8 +386,13 @@ void updatePipelineStats(
   prestoPipelineStats.lastStartTimeInMillis = prestoTaskStats.endTimeInMillis;
   prestoPipelineStats.lastEndTimeInMillis = prestoTaskStats.endTimeInMillis;
 
-  prestoPipelineStats.operatorSummaries.resize(
-      veloxPipelineStats.operatorStats.size());
+  // Split operator stats for operators that represent multiple plan nodes
+  // in the Presto plan (e.g., IndexLookupJoin -> IndexLookupJoin +
+  // IndexSource).
+  const auto expandedOperatorStats =
+      splitOperatorStats(veloxPipelineStats.operatorStats);
+
+  prestoPipelineStats.operatorSummaries.resize(expandedOperatorStats.size());
   prestoPipelineStats.totalScheduledTimeInNanos = {};
   prestoPipelineStats.totalCpuTimeInNanos = {};
   prestoPipelineStats.totalBlockedTimeInNanos = {};
@@ -367,9 +402,9 @@ void updatePipelineStats(
 
   // tasks may fail before any operators are created;
   // collect stats only when we have operators
-  if (!veloxPipelineStats.operatorStats.empty()) {
-    const auto& firstVeloxOpStats = veloxPipelineStats.operatorStats[0];
-    const auto& lastVeloxOpStats = veloxPipelineStats.operatorStats.back();
+  if (!expandedOperatorStats.empty()) {
+    const auto& firstVeloxOpStats = expandedOperatorStats[0];
+    const auto& lastVeloxOpStats = expandedOperatorStats.back();
 
     prestoPipelineStats.pipelineId = firstVeloxOpStats.pipelineId;
     prestoPipelineStats.totalDrivers = firstVeloxOpStats.numDrivers;
@@ -384,9 +419,9 @@ void updatePipelineStats(
     prestoPipelineStats.outputDataSizeInBytes = lastVeloxOpStats.outputBytes;
   }
 
-  for (auto j = 0; j < veloxPipelineStats.operatorStats.size(); ++j) {
+  for (auto j = 0; j < expandedOperatorStats.size(); ++j) {
     auto& prestoOp = prestoPipelineStats.operatorSummaries[j];
-    auto& veloxOp = veloxPipelineStats.operatorStats[j];
+    const auto& veloxOp = expandedOperatorStats[j];
 
     prestoOp.stageId = taskId.stageId();
     prestoOp.stageExecutionId = taskId.stageExecutionId();