From 1bfaa5e2b9f6482b762673b45ef9906e43e5599c Mon Sep 17 00:00:00 2001
From: Rui Mo
Date: Tue, 18 Nov 2025 15:01:03 +0800
Subject: [PATCH] Use uint64_t for count

---
 velox/common/base/RuntimeMetrics.cpp          |  2 +-
 velox/common/base/RuntimeMetrics.h            | 11 ++-
 .../common/base/tests/RuntimeMetricsTest.cpp  | 17 +++-
 velox/connectors/hive/HiveDataSink.cpp        |  3 +-
 velox/connectors/hive/HiveDataSource.cpp      | 78 +++++++++++--------
 velox/dwio/common/OnDemandUnitLoader.cpp      |  5 +-
 velox/dwio/common/ParallelUnitLoader.cpp      |  4 +-
 velox/dwio/common/Statistics.h                | 16 ++--
 velox/dwio/dwrf/test/TestReadFile.h           |  3 +-
 velox/exec/HashAggregation.cpp                |  2 +-
 velox/exec/Operator.cpp                       | 36 ++++-----
 .../cudf/exec/CudfHashAggregation.cpp         |  2 +-
 12 files changed, 98 insertions(+), 81 deletions(-)

diff --git a/velox/common/base/RuntimeMetrics.cpp b/velox/common/base/RuntimeMetrics.cpp
index 4d51cdcab97..f72b396ce91 100644
--- a/velox/common/base/RuntimeMetrics.cpp
+++ b/velox/common/base/RuntimeMetrics.cpp
@@ -30,7 +30,7 @@ void RuntimeMetric::addValue(int64_t value) {
 }
 
 void RuntimeMetric::aggregate() {
-  count = std::min(count, static_cast<int64_t>(1));
+  count = std::min(count, static_cast<uint64_t>(1));
   min = max = sum;
 }
 
diff --git a/velox/common/base/RuntimeMetrics.h b/velox/common/base/RuntimeMetrics.h
index cec8855e143..579e54a1435 100644
--- a/velox/common/base/RuntimeMetrics.h
+++ b/velox/common/base/RuntimeMetrics.h
@@ -23,6 +23,13 @@
 
 namespace facebook::velox {
 
+/// Converts unsigned bigint to signed, capping at int64_t max if overflow
+/// happens. Could be replaced by 'std::saturate_cast' since C++26.
+inline int64_t saturateCast(uint64_t value) {
+  return static_cast<int64_t>(std::min(
+      value, static_cast<uint64_t>(std::numeric_limits<int64_t>::max())));
+}
+
 struct RuntimeCounter {
   enum class Unit { kNone, kNanos, kBytes };
   int64_t value;
@@ -36,7 +43,7 @@ struct RuntimeMetric {
   // Sum, min, max have the same unit, count has kNone.
   RuntimeCounter::Unit unit;
   int64_t sum{0};
-  int64_t count{0};
+  uint64_t count{0};
   int64_t min{std::numeric_limits<int64_t>::max()};
   int64_t max{std::numeric_limits<int64_t>::min()};
 
@@ -51,7 +58,7 @@ struct RuntimeMetric {
 
   explicit RuntimeMetric(
       int64_t _sum,
-      int64_t _count,
+      uint64_t _count,
       int64_t _min,
       int64_t _max,
       RuntimeCounter::Unit _unit = RuntimeCounter::Unit::kNone)
diff --git a/velox/common/base/tests/RuntimeMetricsTest.cpp b/velox/common/base/tests/RuntimeMetricsTest.cpp
index 7180f682a80..29a5ffe65ae 100644
--- a/velox/common/base/tests/RuntimeMetricsTest.cpp
+++ b/velox/common/base/tests/RuntimeMetricsTest.cpp
@@ -24,7 +24,7 @@ class RuntimeMetricsTest : public testing::Test {
   static void testMetric(
       const RuntimeMetric& rm1,
       int64_t expectedSum,
-      int64_t expectedCount,
+      uint64_t expectedCount,
       int64_t expectedMin = std::numeric_limits<int64_t>::max(),
       int64_t expectedMax = std::numeric_limits<int64_t>::min()) {
     EXPECT_EQ(expectedSum, rm1.sum);
@@ -84,4 +84,19 @@ TEST_F(RuntimeMetricsTest, basic) {
       "sum:2.00us, count:1, min:2.00us, max:2.00us, avg: 2.00us");
 }
 
+TEST_F(RuntimeMetricsTest, saturateCast) {
+  auto maxUint64 = std::numeric_limits<uint64_t>::max();
+  RuntimeMetric rm{
+      saturateCast(maxUint64),
+      maxUint64,
+      saturateCast(maxUint64),
+      saturateCast(maxUint64)};
+
+  auto maxInt64 = std::numeric_limits<int64_t>::max();
+  EXPECT_EQ(rm.sum, maxInt64);
+  EXPECT_EQ(rm.count, maxUint64);
+  EXPECT_EQ(rm.min, maxInt64);
+  EXPECT_EQ(rm.max, maxInt64);
+}
+
 } // namespace facebook::velox
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index a1b31803662..fb174e82ebb 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -1533,7 +1533,8 @@ uint64_t HiveDataSink::WriterReclaimer::reclaim(
       ioStats_->rawBytesWritten() - writtenBytesBeforeReclaim;
   addThreadLocalRuntimeStat(
       kEarlyFlushedRawBytes,
-      RuntimeCounter(earlyFlushedRawBytes, RuntimeCounter::Unit::kBytes));
+      RuntimeCounter(
+          saturateCast(earlyFlushedRawBytes), RuntimeCounter::Unit::kBytes));
   if (earlyFlushedRawBytes > 0) {
     RECORD_METRIC_VALUE(
         kMetricFileWriterEarlyFlushedRawBytes, earlyFlushedRawBytes);
diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp
index 51b3cbf30c7..b92f81f7309 100644
--- a/velox/connectors/hive/HiveDataSource.cpp
+++ b/velox/connectors/hive/HiveDataSource.cpp
@@ -429,60 +429,66 @@ HiveDataSource::getRuntimeStats() {
   res.insert(
       {std::string(Connector::kIoWaitWallNanos),
        RuntimeMetric(
-           ioStatistics_->queryThreadIoLatencyUs().sum() * 1'000,
+           saturateCast(ioStatistics_->queryThreadIoLatencyUs().sum() * 1'000),
            ioStatistics_->queryThreadIoLatencyUs().count(),
-           ioStatistics_->queryThreadIoLatencyUs().min() * 1'000,
-           ioStatistics_->queryThreadIoLatencyUs().max() * 1'000,
+           saturateCast(ioStatistics_->queryThreadIoLatencyUs().min() * 1'000),
+           saturateCast(ioStatistics_->queryThreadIoLatencyUs().max() * 1'000),
            RuntimeCounter::Unit::kNanos)});
   // Breakdown of ioWaitWallNanos by I/O type
   if (ioStatistics_->storageReadLatencyUs().count() > 0) {
     res.insert(
         {std::string(Connector::kStorageReadWallNanos),
          RuntimeMetric(
-             ioStatistics_->storageReadLatencyUs().sum() * 1'000,
+             saturateCast(ioStatistics_->storageReadLatencyUs().sum() * 1'000),
              ioStatistics_->storageReadLatencyUs().count(),
-             ioStatistics_->storageReadLatencyUs().min() * 1'000,
-             ioStatistics_->storageReadLatencyUs().max() * 1'000,
+             saturateCast(ioStatistics_->storageReadLatencyUs().min() * 1'000),
+             saturateCast(ioStatistics_->storageReadLatencyUs().max() * 1'000),
              RuntimeCounter::Unit::kNanos)});
   }
   if (ioStatistics_->ssdCacheReadLatencyUs().count() > 0) {
     res.insert(
         {std::string(Connector::kSsdCacheReadWallNanos),
          RuntimeMetric(
-             ioStatistics_->ssdCacheReadLatencyUs().sum() * 1'000,
+             saturateCast(ioStatistics_->ssdCacheReadLatencyUs().sum() * 1'000),
              ioStatistics_->ssdCacheReadLatencyUs().count(),
-             ioStatistics_->ssdCacheReadLatencyUs().min() * 1'000,
-             ioStatistics_->ssdCacheReadLatencyUs().max() * 1'000,
+             saturateCast(ioStatistics_->ssdCacheReadLatencyUs().min() * 1'000),
+             saturateCast(ioStatistics_->ssdCacheReadLatencyUs().max() * 1'000),
              RuntimeCounter::Unit::kNanos)});
   }
   if (ioStatistics_->cacheWaitLatencyUs().count() > 0) {
     res.insert(
         {std::string(Connector::kCacheWaitWallNanos),
          RuntimeMetric(
-             ioStatistics_->cacheWaitLatencyUs().sum() * 1'000,
+             saturateCast(ioStatistics_->cacheWaitLatencyUs().sum() * 1'000),
              ioStatistics_->cacheWaitLatencyUs().count(),
-             ioStatistics_->cacheWaitLatencyUs().min() * 1'000,
-             ioStatistics_->cacheWaitLatencyUs().max() * 1'000,
+             saturateCast(ioStatistics_->cacheWaitLatencyUs().min() * 1'000),
+             saturateCast(ioStatistics_->cacheWaitLatencyUs().max() * 1'000),
              RuntimeCounter::Unit::kNanos)});
   }
   if (ioStatistics_->coalescedSsdLoadLatencyUs().count() > 0) {
     res.insert(
         {std::string(Connector::kCoalescedSsdLoadWallNanos),
          RuntimeMetric(
-             ioStatistics_->coalescedSsdLoadLatencyUs().sum() * 1'000,
+             saturateCast(
+                 ioStatistics_->coalescedSsdLoadLatencyUs().sum() * 1'000),
              ioStatistics_->coalescedSsdLoadLatencyUs().count(),
-             ioStatistics_->coalescedSsdLoadLatencyUs().min() * 1'000,
-             ioStatistics_->coalescedSsdLoadLatencyUs().max() * 1'000,
+             saturateCast(
+                 ioStatistics_->coalescedSsdLoadLatencyUs().min() * 1'000),
+             saturateCast(
+                 ioStatistics_->coalescedSsdLoadLatencyUs().max() * 1'000),
              RuntimeCounter::Unit::kNanos)});
   }
   if (ioStatistics_->coalescedStorageLoadLatencyUs().count() > 0) {
     res.insert(
         {std::string(Connector::kCoalescedStorageLoadWallNanos),
          RuntimeMetric(
-             ioStatistics_->coalescedStorageLoadLatencyUs().sum() * 1'000,
+             saturateCast(
+                 ioStatistics_->coalescedStorageLoadLatencyUs().sum() * 1'000),
              ioStatistics_->coalescedStorageLoadLatencyUs().count(),
-             ioStatistics_->coalescedStorageLoadLatencyUs().min() * 1'000,
-             ioStatistics_->coalescedStorageLoadLatencyUs().max() * 1'000,
+             saturateCast(
+                 ioStatistics_->coalescedStorageLoadLatencyUs().min() * 1'000),
+             saturateCast(
+                 ioStatistics_->coalescedStorageLoadLatencyUs().max() * 1'000),
              RuntimeCounter::Unit::kNanos)});
   }
   res.insert(
@@ -490,33 +496,37 @@ HiveDataSource::getRuntimeStats() {
        RuntimeMetric(ioStatistics_->prefetch().count())},
       {std::string(kPrefetchBytes),
        RuntimeMetric(
-           ioStatistics_->prefetch().sum(),
+           saturateCast(ioStatistics_->prefetch().sum()),
            ioStatistics_->prefetch().count(),
-           ioStatistics_->prefetch().min(),
-           ioStatistics_->prefetch().max(),
+           saturateCast(ioStatistics_->prefetch().min()),
+           saturateCast(ioStatistics_->prefetch().max()),
            RuntimeCounter::Unit::kBytes)},
       {std::string(kTotalScanTime),
        RuntimeMetric(
-           ioStatistics_->totalScanTime(), RuntimeCounter::Unit::kNanos)},
+           saturateCast(ioStatistics_->totalScanTime()),
+           RuntimeCounter::Unit::kNanos)},
       {std::string(Connector::kTotalRemainingFilterTime),
        RuntimeMetric(
-           totalRemainingFilterTime_.load(std::memory_order_relaxed),
+           saturateCast(
+               totalRemainingFilterTime_.load(std::memory_order_relaxed)),
            RuntimeCounter::Unit::kNanos)},
       {Connector::kTotalRemainingFilterCpuTime,
        RuntimeMetric(
-           totalRemainingFilterCpuTime_.load(std::memory_order_relaxed),
+           saturateCast(
+               totalRemainingFilterCpuTime_.load(std::memory_order_relaxed)),
            RuntimeCounter::Unit::kNanos)},
       {std::string(kOverreadBytes),
        RuntimeMetric(
-           ioStatistics_->rawOverreadBytes(), RuntimeCounter::Unit::kBytes)}});
+           saturateCast(ioStatistics_->rawOverreadBytes()),
+           RuntimeCounter::Unit::kBytes)}});
   if (ioStatistics_->read().count() > 0) {
     res.insert(
         {std::string(kStorageReadBytes),
          RuntimeMetric(
-             ioStatistics_->read().sum(),
+             saturateCast(ioStatistics_->read().sum()),
              ioStatistics_->read().count(),
-             ioStatistics_->read().min(),
-             ioStatistics_->read().max(),
+             saturateCast(ioStatistics_->read().min()),
+             saturateCast(ioStatistics_->read().max()),
              RuntimeCounter::Unit::kBytes)});
   }
   if (ioStatistics_->ssdRead().count() > 0) {
@@ -526,10 +536,10 @@ HiveDataSource::getRuntimeStats() {
     res.insert(
         {std::string(kLocalReadBytes),
          RuntimeMetric(
-             ioStatistics_->ssdRead().sum(),
+             saturateCast(ioStatistics_->ssdRead().sum()),
              ioStatistics_->ssdRead().count(),
-             ioStatistics_->ssdRead().min(),
-             ioStatistics_->ssdRead().max(),
+             saturateCast(ioStatistics_->ssdRead().min()),
+             saturateCast(ioStatistics_->ssdRead().max()),
              RuntimeCounter::Unit::kBytes)});
   }
   if (ioStatistics_->ramHit().count() > 0) {
@@ -539,10 +549,10 @@ HiveDataSource::getRuntimeStats() {
     res.insert(
         {std::string(kRamReadBytes),
          RuntimeMetric(
-             ioStatistics_->ramHit().sum(),
+             saturateCast(ioStatistics_->ramHit().sum()),
              ioStatistics_->ramHit().count(),
-             ioStatistics_->ramHit().min(),
-             ioStatistics_->ramHit().max(),
+             saturateCast(ioStatistics_->ramHit().min()),
+             saturateCast(ioStatistics_->ramHit().max()),
              RuntimeCounter::Unit::kBytes)});
   }
   if (numBucketConversion_ > 0) {
diff --git a/velox/dwio/common/OnDemandUnitLoader.cpp b/velox/dwio/common/OnDemandUnitLoader.cpp
index 6a5616a31e5..1a21f5475c1 100644
--- a/velox/dwio/common/OnDemandUnitLoader.cpp
+++ b/velox/dwio/common/OnDemandUnitLoader.cpp
@@ -83,10 +83,7 @@ class OnDemandUnitLoader : public UnitLoader {
     stats.addCounter(
         "unitLoadNanos",
         RuntimeCounter(
-            unitLoadNanos_ > std::numeric_limits<int64_t>::max()
-                ? std::numeric_limits<int64_t>::max()
-                : unitLoadNanos_,
-            RuntimeCounter::Unit::kNanos));
+            saturateCast(unitLoadNanos_), RuntimeCounter::Unit::kNanos));
     return stats;
   }
 
diff --git a/velox/dwio/common/ParallelUnitLoader.cpp b/velox/dwio/common/ParallelUnitLoader.cpp
index 2ef6718e8fc..778fc3c380c 100644
--- a/velox/dwio/common/ParallelUnitLoader.cpp
+++ b/velox/dwio/common/ParallelUnitLoader.cpp
@@ -129,9 +129,7 @@ class ParallelUnitLoader : public UnitLoader {
     stats.addCounter(
         "waitForUnitReadyNanos",
         RuntimeCounter(
-            waitForUnitReadyNanos_ > std::numeric_limits<int64_t>::max()
-                ? std::numeric_limits<int64_t>::max()
-                : waitForUnitReadyNanos_,
+            saturateCast(waitForUnitReadyNanos_),
             RuntimeCounter::Unit::kNanos));
     return stats;
   }
diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h
index 2498ca6f71d..89d46976e32 100644
--- a/velox/dwio/common/Statistics.h
+++ b/velox/dwio/common/Statistics.h
@@ -634,10 +634,10 @@ struct ColumnMetricsSet {
               nodeId,
               TypeKindName::toName(stats->typeKind)),
           RuntimeMetric{
-              static_cast<int64_t>(decompressCounter.sum()),
-              static_cast<int64_t>(decompressCounter.count()),
-              static_cast<int64_t>(decompressCounter.min()),
-              static_cast<int64_t>(decompressCounter.max()),
+              saturateCast(decompressCounter.sum()),
+              decompressCounter.count(),
+              saturateCast(decompressCounter.min()),
+              saturateCast(decompressCounter.max()),
              RuntimeCounter::Unit::kNanos});
     }
     // Export decode timing.
@@ -649,10 +649,10 @@ struct ColumnMetricsSet {
               nodeId,
               TypeKindName::toName(stats->typeKind)),
           RuntimeMetric{
-              static_cast<int64_t>(decodeCounter.sum()),
-              static_cast<int64_t>(decodeCounter.count()),
-              static_cast<int64_t>(decodeCounter.min()),
-              static_cast<int64_t>(decodeCounter.max()),
+              saturateCast(decodeCounter.sum()),
+              decodeCounter.count(),
+              saturateCast(decodeCounter.min()),
+              saturateCast(decodeCounter.max()),
              RuntimeCounter::Unit::kNanos});
     }
   }
diff --git a/velox/dwio/dwrf/test/TestReadFile.h b/velox/dwio/dwrf/test/TestReadFile.h
index 4c5a567a3b1..aca3b13cf2a 100644
--- a/velox/dwio/dwrf/test/TestReadFile.h
+++ b/velox/dwio/dwrf/test/TestReadFile.h
@@ -64,8 +64,7 @@ class TestReadFile : public velox::ReadFile {
     if (context.ioStats) {
       context.ioStats->addCounter(
           "read",
-          RuntimeCounter(
-              static_cast<int64_t>(res), RuntimeCounter::Unit::kBytes));
+          RuntimeCounter(saturateCast(res), RuntimeCounter::Unit::kBytes));
     }
     ++numIos_;
     return res;
diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp
index 5170040e260..d138961bec8 100644
--- a/velox/exec/HashAggregation.cpp
+++ b/velox/exec/HashAggregation.cpp
@@ -285,7 +285,7 @@ void HashAggregation::resetPartialOutputIfNeed() {
         std::string(HashAggregation::kFlushTimes), RuntimeCounter(1));
     lockedStats->addRuntimeStat(
         std::string(HashAggregation::kPartialAggregationPct),
-        RuntimeCounter(static_cast<int64_t>(aggregationPct)));
+        RuntimeCounter(saturateCast(aggregationPct)));
   }
   groupingSet_->resetTable(/*freeTable=*/false);
   partialFull_ = false;
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 40ebe302a7b..63a9d6b5ea2 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -369,16 +369,14 @@ void Operator::recordSpillStats() {
   if (fillTime != 0) {
     lockedStats->addRuntimeStat(
         kSpillFillTime,
-        RuntimeCounter{
-            static_cast<int64_t>(fillTime), RuntimeCounter::Unit::kNanos});
+        RuntimeCounter{saturateCast(fillTime), RuntimeCounter::Unit::kNanos});
   }
   const auto sortTime =
       spillStats_->spillSortTimeNanos.load(std::memory_order_relaxed);
   if (sortTime != 0) {
     lockedStats->addRuntimeStat(
         kSpillSortTime,
-        RuntimeCounter{
-            static_cast<int64_t>(sortTime), RuntimeCounter::Unit::kNanos});
+        RuntimeCounter{saturateCast(sortTime), RuntimeCounter::Unit::kNanos});
   }
   const auto extractTime =
       spillStats_->spillExtractVectorTimeNanos.load(std::memory_order_relaxed);
@@ -386,7 +384,7 @@ void Operator::recordSpillStats() {
     lockedStats->addRuntimeStat(
         kSpillExtractVectorTime,
         RuntimeCounter{
-            static_cast<int64_t>(extractTime), RuntimeCounter::Unit::kNanos});
+            saturateCast(extractTime), RuntimeCounter::Unit::kNanos});
   }
   const auto serializationTime =
       spillStats_->spillSerializationTimeNanos.load(std::memory_order_relaxed);
@@ -394,34 +392,30 @@ void Operator::recordSpillStats() {
     lockedStats->addRuntimeStat(
         kSpillSerializationTime,
         RuntimeCounter{
-            static_cast<int64_t>(serializationTime),
-            RuntimeCounter::Unit::kNanos});
+            saturateCast(serializationTime), RuntimeCounter::Unit::kNanos});
   }
   const auto flushTime =
       spillStats_->spillFlushTimeNanos.load(std::memory_order_relaxed);
   if (flushTime != 0) {
     lockedStats->addRuntimeStat(
         kSpillFlushTime,
-        RuntimeCounter{
-            static_cast<int64_t>(flushTime), RuntimeCounter::Unit::kNanos});
+        RuntimeCounter{saturateCast(flushTime), RuntimeCounter::Unit::kNanos});
   }
   const auto writes = spillStats_->spillWrites.load(std::memory_order_relaxed);
   if (writes != 0) {
     lockedStats->addRuntimeStat(
-        kSpillWrites, RuntimeCounter{static_cast<int64_t>(writes)});
+        kSpillWrites, RuntimeCounter{saturateCast(writes)});
   }
   const auto writeTime =
       spillStats_->spillWriteTimeNanos.load(std::memory_order_relaxed);
   if (writeTime != 0) {
     lockedStats->addRuntimeStat(
         kSpillWriteTime,
-        RuntimeCounter{
-            static_cast<int64_t>(writeTime), RuntimeCounter::Unit::kNanos});
+        RuntimeCounter{saturateCast(writeTime), RuntimeCounter::Unit::kNanos});
   }
   const auto runs = spillStats_->spillRuns.load(std::memory_order_relaxed);
   if (runs != 0) {
-    lockedStats->addRuntimeStat(
-        kSpillRuns, RuntimeCounter{static_cast<int64_t>(runs)});
+    lockedStats->addRuntimeStat(kSpillRuns, RuntimeCounter{saturateCast(runs)});
     updateGlobalSpillRunStats(runs);
   }
 
@@ -429,8 +423,7 @@ void Operator::recordSpillStats() {
       spillStats_->spillMaxLevelExceededCount.load(std::memory_order_relaxed);
   if (maxLevelExceeded != 0) {
     lockedStats->addRuntimeStat(
-        kExceededMaxSpillLevel,
-        RuntimeCounter{static_cast<int64_t>(maxLevelExceeded)});
+        kExceededMaxSpillLevel, RuntimeCounter{saturateCast(maxLevelExceeded)});
     updateGlobalMaxSpillLevelExceededCount(maxLevelExceeded);
   }
 
@@ -439,14 +432,13 @@ void Operator::recordSpillStats() {
   if (readBytes != 0) {
     lockedStats->addRuntimeStat(
         kSpillReadBytes,
-        RuntimeCounter{
-            static_cast<int64_t>(readBytes), RuntimeCounter::Unit::kBytes});
+        RuntimeCounter{saturateCast(readBytes), RuntimeCounter::Unit::kBytes});
   }
 
   const auto reads = spillStats_->spillReads.load(std::memory_order_relaxed);
   if (reads != 0) {
     lockedStats->addRuntimeStat(
-        kSpillReads, RuntimeCounter{static_cast<int64_t>(reads)});
+        kSpillReads, RuntimeCounter{saturateCast(reads)});
   }
 
   const auto readTime =
@@ -454,8 +446,7 @@ void Operator::recordSpillStats() {
   if (readTime != 0) {
     lockedStats->addRuntimeStat(
         kSpillReadTime,
-        RuntimeCounter{
-            static_cast<int64_t>(readTime), RuntimeCounter::Unit::kNanos});
+        RuntimeCounter{saturateCast(readTime), RuntimeCounter::Unit::kNanos});
   }
 
   const auto deserializationTime =
@@ -465,8 +456,7 @@ void Operator::recordSpillStats() {
     lockedStats->addRuntimeStat(
         kSpillDeserializationTime,
         RuntimeCounter{
-            static_cast<int64_t>(deserializationTime),
-            RuntimeCounter::Unit::kNanos});
+            saturateCast(deserializationTime), RuntimeCounter::Unit::kNanos});
   }
 
   // Collect filesystem I/O stats for spilling.
diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp
index 3105c8e48e3..c98d237f02d 100644
--- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp
+++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp
@@ -1323,7 +1323,7 @@ CudfVectorPtr CudfHashAggregation::releaseAndResetPartialOutput() {
         std::string(exec::HashAggregation::kFlushTimes), RuntimeCounter(1));
     lockedStats->addRuntimeStat(
         std::string(exec::HashAggregation::kPartialAggregationPct),
-        RuntimeCounter(aggregationPct));
+        RuntimeCounter(saturateCast(aggregationPct)));
   }
 
   numInputRows_ = 0;