diff --git a/velox/common/base/RuntimeMetrics.h b/velox/common/base/RuntimeMetrics.h index 5b699a41a99..6402719fa97 100644 --- a/velox/common/base/RuntimeMetrics.h +++ b/velox/common/base/RuntimeMetrics.h @@ -48,6 +48,14 @@ struct RuntimeMetric { RuntimeCounter::Unit _unit = RuntimeCounter::Unit::kNone) : unit(_unit), sum{value}, count{1}, min{value}, max{value} {} + explicit RuntimeMetric( + int64_t _sum, + int64_t _count, + int64_t _min, + int64_t _max, + RuntimeCounter::Unit _unit = RuntimeCounter::Unit::kNone) + : unit(_unit), sum{_sum}, count{_count}, min{_min}, max{_max} {} + void addValue(int64_t value); /// Aggregate sets 'min' and 'max' to 'sum', also sets 'count' to 1 if diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 31a9364a1bb..c1573292232 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -465,7 +465,11 @@ HiveDataSource::getRuntimeStats() { {{"numPrefetch", RuntimeMetric(ioStats_->prefetch().count())}, {"prefetchBytes", RuntimeMetric( - ioStats_->prefetch().sum(), RuntimeCounter::Unit::kBytes)}, + ioStats_->prefetch().sum(), + ioStats_->prefetch().count(), + ioStats_->prefetch().min(), + ioStats_->prefetch().max(), + RuntimeCounter::Unit::kBytes)}, {"totalScanTime", RuntimeMetric(ioStats_->totalScanTime(), RuntimeCounter::Unit::kNanos)}, {Connector::kTotalRemainingFilterTime, @@ -475,33 +479,44 @@ HiveDataSource::getRuntimeStats() { {"ioWaitWallNanos", RuntimeMetric( ioStats_->queryThreadIoLatency().sum() * 1000, - RuntimeCounter::Unit::kNanos)}, - {"maxSingleIoWaitWallNanos", - RuntimeMetric( + ioStats_->queryThreadIoLatency().count(), + ioStats_->queryThreadIoLatency().min() * 1000, ioStats_->queryThreadIoLatency().max() * 1000, RuntimeCounter::Unit::kNanos)}, {"overreadBytes", RuntimeMetric( ioStats_->rawOverreadBytes(), RuntimeCounter::Unit::kBytes)}}); if (ioStats_->read().count() > 0) { - res.insert({"numStorageRead", RuntimeMetric(ioStats_->read().count())}); res.insert( {"storageReadBytes", - RuntimeMetric(ioStats_->read().sum(), RuntimeCounter::Unit::kBytes)}); + RuntimeMetric( + ioStats_->read().sum(), + ioStats_->read().count(), + ioStats_->read().min(), + ioStats_->read().max(), + RuntimeCounter::Unit::kBytes)}); } if (ioStats_->ssdRead().count() > 0) { res.insert({"numLocalRead", RuntimeMetric(ioStats_->ssdRead().count())}); res.insert( {"localReadBytes", RuntimeMetric( - ioStats_->ssdRead().sum(), RuntimeCounter::Unit::kBytes)}); + ioStats_->ssdRead().sum(), + ioStats_->ssdRead().count(), + ioStats_->ssdRead().min(), + ioStats_->ssdRead().max(), + RuntimeCounter::Unit::kBytes)}); } if (ioStats_->ramHit().count() > 0) { res.insert({"numRamRead", RuntimeMetric(ioStats_->ramHit().count())}); res.insert( {"ramReadBytes", RuntimeMetric( - ioStats_->ramHit().sum(), RuntimeCounter::Unit::kBytes)}); + ioStats_->ramHit().sum(), + ioStats_->ramHit().count(), + ioStats_->ramHit().min(), + ioStats_->ramHit().max(), + RuntimeCounter::Unit::kBytes)}); } if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeMetric(numBucketConversion_)}); diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h index cfd27627a34..77660e50da3 100644 --- a/velox/dwio/common/Statistics.h +++ b/velox/dwio/common/Statistics.h @@ -604,7 +604,9 @@ struct RuntimeStatistics { if (columnReaderStatistics.pageLoadTimeNs > 0) { result.emplace( "pageLoadTimeNs", - RuntimeMetric(columnReaderStatistics.pageLoadTimeNs)); + RuntimeMetric( + columnReaderStatistics.pageLoadTimeNs, + RuntimeCounter::Unit::kNanos)); } return result; } diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 7c8d1c6ff74..ee212614006 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -210,26 +210,24 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1, avg: 1"}, {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, - {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numRamRead [ ]* sum: 60, count: 1, min: 60, max: 60, avg: 60"}, - {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B, avg: 0B"}, - {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" prefetchBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" preloadSplitPrepareTimeNanos[ ]* sum: .+, count: .+, min: .+, max: .+, avg: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, {" processedSplits[ ]+sum: 20, count: 1, min: 20, max: 20, avg: 20"}, {" processedStrides[ ]+sum: 20, count: 1, min: 20, max: 20, avg: 20"}, {" processedUnits [ ]* sum: .+, count: .+, min: .+, max: .+"}, - {" ramReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" ramReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" readyPreloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" storageReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" totalRemainingFilterWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" unitLoadNanos[ ]* sum: .+, count: .+, min: .+, max: .+, avg: .+"}, @@ -308,10 +306,8 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" dataSourceReadWallNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, - {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7, avg: 7"}, - {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B, avg: 0B"}, @@ -321,13 +317,13 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" processedUnits [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, - {" ramReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" ramReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" readyPreloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" storageReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" totalRemainingFilterWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" unitLoadNanos[ ]* sum: .+, count: .+, min: .+, max: .+, avg: .+"}}); @@ -385,10 +381,8 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" dataSourceReadWallNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, - {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7, avg: 7"}, - {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B, avg: 0B"}, @@ -398,13 +392,13 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" processedUnits [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, - {" ramReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" ramReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" readyPreloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", true}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" storageReadBytes [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" totalRemainingFilterWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" unitLoadNanos[ ]* sum: .+, count: .+, min: .+, max: .+, avg: .+"}}); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index e74aff8ce87..103d1fecaf1 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -5758,7 +5758,7 @@ TEST_F(TableScanTest, footerIOCount) { .assertResults( BaseVector::create(vector->type(), 0, pool())); auto stats = getTableScanRuntimeStats(task); - ASSERT_EQ(stats.at("numStorageRead").sum, 1); + ASSERT_EQ(stats.at("storageReadBytes").count, 1); ASSERT_GT(stats.at("footerBufferOverread").sum, 0); } @@ -5849,7 +5849,7 @@ TEST_F(TableScanTest, statsBasedFilterReorderDisabled) { auto tableScanStats = getTableScanStats(task); ASSERT_EQ(tableScanStats.customStats.count("storageReadBytes"), 1); ASSERT_GT(tableScanStats.customStats["storageReadBytes"].sum, 0); - ASSERT_EQ(tableScanStats.customStats["storageReadBytes"].count, 1); + ASSERT_GT(tableScanStats.customStats["storageReadBytes"].count, 0); ASSERT_EQ(tableScanStats.numSplits, numSplits); }