Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/common/base/RuntimeMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
}

void RuntimeMetric::aggregate() {
count = std::min(count, static_cast<int64_t>(1));
count = std::min(count, static_cast<uint64_t>(1));

Check warning on line 33 in velox/common/base/RuntimeMetrics.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "uint64_t" is directly included
min = max = sum;
}

Expand Down
11 changes: 9 additions & 2 deletions velox/common/base/RuntimeMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@

namespace facebook::velox {

/// Narrows an unsigned 64-bit value to int64_t without wrapping: any input
/// above int64_t max is clamped to int64_t max, everything else is returned
/// unchanged. 'std::saturate_cast' (C++26) provides the same behavior.
inline int64_t saturateCast(uint64_t value) {
  constexpr int64_t kSignedMax = std::numeric_limits<int64_t>::max();
  // Compare in the unsigned domain so the check itself cannot overflow.
  if (value > static_cast<uint64_t>(kSignedMax)) {
    return kSignedMax;
  }
  return static_cast<int64_t>(value);
}

struct RuntimeCounter {
enum class Unit { kNone, kNanos, kBytes };
int64_t value;
Expand All @@ -36,7 +43,7 @@ struct RuntimeMetric {
// Sum, min, max have the same unit, count has kNone.
RuntimeCounter::Unit unit;
int64_t sum{0};
int64_t count{0};
uint64_t count{0};
int64_t min{std::numeric_limits<int64_t>::max()};
int64_t max{std::numeric_limits<int64_t>::min()};

Expand All @@ -51,7 +58,7 @@ struct RuntimeMetric {

explicit RuntimeMetric(
int64_t _sum,
int64_t _count,
uint64_t _count,
int64_t _min,
int64_t _max,
RuntimeCounter::Unit _unit = RuntimeCounter::Unit::kNone)
Expand Down
17 changes: 16 additions & 1 deletion velox/common/base/tests/RuntimeMetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
static void testMetric(
const RuntimeMetric& rm1,
int64_t expectedSum,
int64_t expectedCount,
uint64_t expectedCount,

Check warning on line 27 in velox/common/base/tests/RuntimeMetricsTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "uint64_t" is directly included
int64_t expectedMin = std::numeric_limits<int64_t>::max(),
int64_t expectedMax = std::numeric_limits<int64_t>::min()) {
EXPECT_EQ(expectedSum, rm1.sum);
Expand Down Expand Up @@ -84,4 +84,19 @@
"sum:2.00us, count:1, min:2.00us, max:2.00us, avg: 2.00us");
}

TEST_F(RuntimeMetricsTest, saturateCast) {
auto maxUint64 = std::numeric_limits<uint64_t>::max();

Check warning on line 88 in velox/common/base/tests/RuntimeMetricsTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "std::numeric_limits" is directly included
RuntimeMetric rm{

Check warning on line 89 in velox/common/base/tests/RuntimeMetricsTest.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-const-correctness

variable 'rm' of type 'RuntimeMetric' can be declared 'const'
saturateCast(maxUint64),
maxUint64,
saturateCast(maxUint64),
saturateCast(maxUint64)};

auto maxInt64 = std::numeric_limits<int64_t>::max();
EXPECT_EQ(rm.sum, maxInt64);
EXPECT_EQ(rm.count, maxUint64);
EXPECT_EQ(rm.min, maxInt64);
EXPECT_EQ(rm.max, maxInt64);
}

} // namespace facebook::velox
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,8 @@
ioStats_->rawBytesWritten() - writtenBytesBeforeReclaim;
addThreadLocalRuntimeStat(
kEarlyFlushedRawBytes,
RuntimeCounter(earlyFlushedRawBytes, RuntimeCounter::Unit::kBytes));
RuntimeCounter(
saturateCast(earlyFlushedRawBytes), RuntimeCounter::Unit::kBytes));

Check warning on line 1537 in velox/connectors/hive/HiveDataSink.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "facebook::velox::saturateCast" is directly included
if (earlyFlushedRawBytes > 0) {
RECORD_METRIC_VALUE(
kMetricFileWriterEarlyFlushedRawBytes, earlyFlushedRawBytes);
Expand Down
78 changes: 44 additions & 34 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,94 +429,104 @@ HiveDataSource::getRuntimeStats() {
res.insert(
{std::string(Connector::kIoWaitWallNanos),
RuntimeMetric(
ioStatistics_->queryThreadIoLatencyUs().sum() * 1'000,
saturateCast(ioStatistics_->queryThreadIoLatencyUs().sum() * 1'000),
ioStatistics_->queryThreadIoLatencyUs().count(),
ioStatistics_->queryThreadIoLatencyUs().min() * 1'000,
ioStatistics_->queryThreadIoLatencyUs().max() * 1'000,
saturateCast(ioStatistics_->queryThreadIoLatencyUs().min() * 1'000),
saturateCast(ioStatistics_->queryThreadIoLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
// Breakdown of ioWaitWallNanos by I/O type
if (ioStatistics_->storageReadLatencyUs().count() > 0) {
res.insert(
{std::string(Connector::kStorageReadWallNanos),
RuntimeMetric(
ioStatistics_->storageReadLatencyUs().sum() * 1'000,
saturateCast(ioStatistics_->storageReadLatencyUs().sum() * 1'000),
ioStatistics_->storageReadLatencyUs().count(),
ioStatistics_->storageReadLatencyUs().min() * 1'000,
ioStatistics_->storageReadLatencyUs().max() * 1'000,
saturateCast(ioStatistics_->storageReadLatencyUs().min() * 1'000),
saturateCast(ioStatistics_->storageReadLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
}
if (ioStatistics_->ssdCacheReadLatencyUs().count() > 0) {
res.insert(
{std::string(Connector::kSsdCacheReadWallNanos),
RuntimeMetric(
ioStatistics_->ssdCacheReadLatencyUs().sum() * 1'000,
saturateCast(ioStatistics_->ssdCacheReadLatencyUs().sum() * 1'000),
ioStatistics_->ssdCacheReadLatencyUs().count(),
ioStatistics_->ssdCacheReadLatencyUs().min() * 1'000,
ioStatistics_->ssdCacheReadLatencyUs().max() * 1'000,
saturateCast(ioStatistics_->ssdCacheReadLatencyUs().min() * 1'000),
saturateCast(ioStatistics_->ssdCacheReadLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
}
if (ioStatistics_->cacheWaitLatencyUs().count() > 0) {
res.insert(
{std::string(Connector::kCacheWaitWallNanos),
RuntimeMetric(
ioStatistics_->cacheWaitLatencyUs().sum() * 1'000,
saturateCast(ioStatistics_->cacheWaitLatencyUs().sum() * 1'000),
ioStatistics_->cacheWaitLatencyUs().count(),
ioStatistics_->cacheWaitLatencyUs().min() * 1'000,
ioStatistics_->cacheWaitLatencyUs().max() * 1'000,
saturateCast(ioStatistics_->cacheWaitLatencyUs().min() * 1'000),
saturateCast(ioStatistics_->cacheWaitLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
}
if (ioStatistics_->coalescedSsdLoadLatencyUs().count() > 0) {
res.insert(
{std::string(Connector::kCoalescedSsdLoadWallNanos),
RuntimeMetric(
ioStatistics_->coalescedSsdLoadLatencyUs().sum() * 1'000,
saturateCast(
ioStatistics_->coalescedSsdLoadLatencyUs().sum() * 1'000),
ioStatistics_->coalescedSsdLoadLatencyUs().count(),
ioStatistics_->coalescedSsdLoadLatencyUs().min() * 1'000,
ioStatistics_->coalescedSsdLoadLatencyUs().max() * 1'000,
saturateCast(
ioStatistics_->coalescedSsdLoadLatencyUs().min() * 1'000),
saturateCast(
ioStatistics_->coalescedSsdLoadLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
}
if (ioStatistics_->coalescedStorageLoadLatencyUs().count() > 0) {
res.insert(
{std::string(Connector::kCoalescedStorageLoadWallNanos),
RuntimeMetric(
ioStatistics_->coalescedStorageLoadLatencyUs().sum() * 1'000,
saturateCast(
ioStatistics_->coalescedStorageLoadLatencyUs().sum() * 1'000),
ioStatistics_->coalescedStorageLoadLatencyUs().count(),
ioStatistics_->coalescedStorageLoadLatencyUs().min() * 1'000,
ioStatistics_->coalescedStorageLoadLatencyUs().max() * 1'000,
saturateCast(
ioStatistics_->coalescedStorageLoadLatencyUs().min() * 1'000),
saturateCast(
ioStatistics_->coalescedStorageLoadLatencyUs().max() * 1'000),
RuntimeCounter::Unit::kNanos)});
}
res.insert(
{{std::string(kNumPrefetch),
RuntimeMetric(ioStatistics_->prefetch().count())},
{std::string(kPrefetchBytes),
RuntimeMetric(
ioStatistics_->prefetch().sum(),
saturateCast(ioStatistics_->prefetch().sum()),
ioStatistics_->prefetch().count(),
ioStatistics_->prefetch().min(),
ioStatistics_->prefetch().max(),
saturateCast(ioStatistics_->prefetch().min()),
saturateCast(ioStatistics_->prefetch().max()),
RuntimeCounter::Unit::kBytes)},
{std::string(kTotalScanTime),
RuntimeMetric(
ioStatistics_->totalScanTime(), RuntimeCounter::Unit::kNanos)},
saturateCast(ioStatistics_->totalScanTime()),
RuntimeCounter::Unit::kNanos)},
{std::string(Connector::kTotalRemainingFilterTime),
RuntimeMetric(
totalRemainingFilterTime_.load(std::memory_order_relaxed),
saturateCast(
totalRemainingFilterTime_.load(std::memory_order_relaxed)),
RuntimeCounter::Unit::kNanos)},
{Connector::kTotalRemainingFilterCpuTime,
RuntimeMetric(
totalRemainingFilterCpuTime_.load(std::memory_order_relaxed),
saturateCast(
totalRemainingFilterCpuTime_.load(std::memory_order_relaxed)),
RuntimeCounter::Unit::kNanos)},
{std::string(kOverreadBytes),
RuntimeMetric(
ioStatistics_->rawOverreadBytes(), RuntimeCounter::Unit::kBytes)}});
saturateCast(ioStatistics_->rawOverreadBytes()),
RuntimeCounter::Unit::kBytes)}});
if (ioStatistics_->read().count() > 0) {
res.insert(
{std::string(kStorageReadBytes),
RuntimeMetric(
ioStatistics_->read().sum(),
saturateCast(ioStatistics_->read().sum()),
ioStatistics_->read().count(),
ioStatistics_->read().min(),
ioStatistics_->read().max(),
saturateCast(ioStatistics_->read().min()),
saturateCast(ioStatistics_->read().max()),
RuntimeCounter::Unit::kBytes)});
}
if (ioStatistics_->ssdRead().count() > 0) {
Expand All @@ -526,10 +536,10 @@ HiveDataSource::getRuntimeStats() {
res.insert(
{std::string(kLocalReadBytes),
RuntimeMetric(
ioStatistics_->ssdRead().sum(),
saturateCast(ioStatistics_->ssdRead().sum()),
ioStatistics_->ssdRead().count(),
ioStatistics_->ssdRead().min(),
ioStatistics_->ssdRead().max(),
saturateCast(ioStatistics_->ssdRead().min()),
saturateCast(ioStatistics_->ssdRead().max()),
RuntimeCounter::Unit::kBytes)});
}
if (ioStatistics_->ramHit().count() > 0) {
Expand All @@ -539,10 +549,10 @@ HiveDataSource::getRuntimeStats() {
res.insert(
{std::string(kRamReadBytes),
RuntimeMetric(
ioStatistics_->ramHit().sum(),
saturateCast(ioStatistics_->ramHit().sum()),
ioStatistics_->ramHit().count(),
ioStatistics_->ramHit().min(),
ioStatistics_->ramHit().max(),
saturateCast(ioStatistics_->ramHit().min()),
saturateCast(ioStatistics_->ramHit().max()),
RuntimeCounter::Unit::kBytes)});
}
if (numBucketConversion_ > 0) {
Expand Down
5 changes: 1 addition & 4 deletions velox/dwio/common/OnDemandUnitLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ class OnDemandUnitLoader : public UnitLoader {
stats.addCounter(
"unitLoadNanos",
RuntimeCounter(
unitLoadNanos_ > std::numeric_limits<int64_t>::max()
? std::numeric_limits<int64_t>::max()
: unitLoadNanos_,
RuntimeCounter::Unit::kNanos));
saturateCast(unitLoadNanos_), RuntimeCounter::Unit::kNanos));
return stats;
}

Expand Down
4 changes: 1 addition & 3 deletions velox/dwio/common/ParallelUnitLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ class ParallelUnitLoader : public UnitLoader {
stats.addCounter(
"waitForUnitReadyNanos",
RuntimeCounter(
waitForUnitReadyNanos_ > std::numeric_limits<int64_t>::max()
? std::numeric_limits<int64_t>::max()
: waitForUnitReadyNanos_,
saturateCast(waitForUnitReadyNanos_),
RuntimeCounter::Unit::kNanos));
return stats;
}
Expand Down
16 changes: 8 additions & 8 deletions velox/dwio/common/Statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,10 +634,10 @@ struct ColumnMetricsSet {
nodeId,
TypeKindName::toName(stats->typeKind)),
RuntimeMetric{
static_cast<int64_t>(decompressCounter.sum()),
static_cast<int64_t>(decompressCounter.count()),
static_cast<int64_t>(decompressCounter.min()),
static_cast<int64_t>(decompressCounter.max()),
saturateCast(decompressCounter.sum()),
decompressCounter.count(),
saturateCast(decompressCounter.min()),
saturateCast(decompressCounter.max()),
RuntimeCounter::Unit::kNanos});
}
// Export decode timing.
Expand All @@ -649,10 +649,10 @@ struct ColumnMetricsSet {
nodeId,
TypeKindName::toName(stats->typeKind)),
RuntimeMetric{
static_cast<int64_t>(decodeCounter.sum()),
static_cast<int64_t>(decodeCounter.count()),
static_cast<int64_t>(decodeCounter.min()),
static_cast<int64_t>(decodeCounter.max()),
saturateCast(decodeCounter.sum()),
decodeCounter.count(),
saturateCast(decodeCounter.min()),
saturateCast(decodeCounter.max()),
RuntimeCounter::Unit::kNanos});
}
}
Expand Down
3 changes: 1 addition & 2 deletions velox/dwio/dwrf/test/TestReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class TestReadFile : public velox::ReadFile {
if (context.ioStats) {
context.ioStats->addCounter(
"read",
RuntimeCounter(
static_cast<int64_t>(res), RuntimeCounter::Unit::kBytes));
RuntimeCounter(saturateCast(res), RuntimeCounter::Unit::kBytes));
}
++numIos_;
return res;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ void HashAggregation::resetPartialOutputIfNeed() {
std::string(HashAggregation::kFlushTimes), RuntimeCounter(1));
lockedStats->addRuntimeStat(
std::string(HashAggregation::kPartialAggregationPct),
RuntimeCounter(static_cast<int64_t>(aggregationPct)));
RuntimeCounter(saturateCast(aggregationPct)));
}
groupingSet_->resetTable(/*freeTable=*/false);
partialFull_ = false;
Expand Down
Loading
Loading