diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp index 7dd07550193bd..3504d924beee3 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp @@ -142,30 +142,40 @@ static void addRuntimeMetricIfNotZero( } } +// Utility to generate presto runtime stat name when translating velox runtime +// stats over to presto. +std::string generateRuntimeStatName( + const exec::OperatorStats& veloxOperatorStats, + const std::string& statName) { + return fmt::format( + "{}.{}.{}", + veloxOperatorStats.operatorType, + veloxOperatorStats.planNodeId, + statName); +} + // Add 'spilling' metrics from Velox operator stats to Presto operator stats. static void addSpillingOperatorMetrics( protocol::OperatorStats& opOut, protocol::TaskStats& prestoTaskStats, const exec::OperatorStats& op) { - std::string statName = - fmt::format("{}.{}.spilledBytes", op.operatorType, op.planNodeId); + std::string statName = generateRuntimeStatName(op, "spilledBytes"); auto prestoMetric = createProtocolRuntimeMetric( statName, op.spilledBytes, protocol::RuntimeUnit::BYTE); opOut.runtimeStats.emplace(statName, prestoMetric); prestoTaskStats.runtimeStats[statName] = prestoMetric; - statName = fmt::format("{}.{}.spilledRows", op.operatorType, op.planNodeId); + statName = generateRuntimeStatName(op, "spilledRows"); prestoMetric = createProtocolRuntimeMetric(statName, op.spilledRows); opOut.runtimeStats.emplace(statName, prestoMetric); prestoTaskStats.runtimeStats[statName] = prestoMetric; - statName = - fmt::format("{}.{}.spilledPartitions", op.operatorType, op.planNodeId); + statName = generateRuntimeStatName(op, "spilledPartitions"); prestoMetric = createProtocolRuntimeMetric(statName, op.spilledPartitions); opOut.runtimeStats.emplace(statName, prestoMetric); prestoTaskStats.runtimeStats[statName] = prestoMetric; - statName = fmt::format("{}.{}.spilledFiles", op.operatorType, op.planNodeId); + statName = generateRuntimeStatName(op, "spilledFiles"); prestoMetric = createProtocolRuntimeMetric(statName, op.spilledFiles); opOut.runtimeStats.emplace(statName, prestoMetric); prestoTaskStats.runtimeStats[statName] = prestoMetric; @@ -541,8 +551,7 @@ protocol::TaskInfo PrestoTask::updateInfoLocked() { protocol::DataSize(op.spilledBytes, protocol::DataUnit::BYTE); for (const auto& stat : op.runtimeStats) { - auto statName = - fmt::format("{}.{}.{}", op.operatorType, op.planNodeId, stat.first); + auto statName = generateRuntimeStatName(op, stat.first); opOut.runtimeStats[statName] = toRuntimeMetric(statName, stat.second); if (taskRuntimeStats.count(statName)) { taskRuntimeStats[statName].merge(stat.second); @@ -552,23 +561,27 @@ protocol::TaskInfo PrestoTask::updateInfoLocked() { } if (op.numSplits != 0) { - const auto statName = - fmt::format("{}.{}.numSplits", op.operatorType, op.planNodeId); + const auto statName = generateRuntimeStatName(op, "numSplits"); opOut.runtimeStats.emplace( statName, createProtocolRuntimeMetric(statName, op.numSplits)); } if (op.inputVectors != 0) { - auto statName = fmt::format( - "{}.{}.{}", op.operatorType, op.planNodeId, "inputBatches"); + auto statName = generateRuntimeStatName(op, "inputBatches"); opOut.runtimeStats.emplace( statName, createProtocolRuntimeMetric(statName, op.inputVectors)); } if (op.outputVectors != 0) { - auto statName = fmt::format( - "{}.{}.{}", op.operatorType, op.planNodeId, "outputBatches"); + auto statName = generateRuntimeStatName(op, "outputBatches"); opOut.runtimeStats.emplace( statName, createProtocolRuntimeMetric(statName, op.outputVectors)); } + if (op.memoryStats.numMemoryAllocations > 0) { + auto statName = generateRuntimeStatName(op, "numMemoryAllocations"); + opOut.runtimeStats.emplace( + statName, + createProtocolRuntimeMetric( + statName, op.memoryStats.numMemoryAllocations)); + } // If Velox operator has spilling stats, then add them to the Presto // operator stats and the task stats as runtime stats.