diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index 5ee7afcf4198f..2b8f23e56b30e 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -17,7 +17,6 @@ #include #include "presto_cpp/main/PrestoExchangeSource.h" #include "presto_cpp/main/PrestoServer.h" -#include "presto_cpp/main/TaskManager.h" #include "presto_cpp/main/common/Counters.h" #include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h" #include "velox/common/base/PeriodicStatsReporter.h" @@ -25,7 +24,6 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/CacheTTLController.h" -#include "velox/common/caching/SsdFile.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/common/memory/MmapAllocator.h" #include "velox/connectors/hive/HiveConnector.h" @@ -75,15 +73,11 @@ folly::StringPiece getCounterForBlockingReason( // Every two seconds we export server counters. static constexpr size_t kTaskPeriodGlobalCounters{2'000'000}; // 2 seconds. -// Every two seconds we export memory counters. -static constexpr size_t kMemoryPeriodGlobalCounters{2'000'000}; // 2 seconds. // Every two seconds we export exchange source counters. static constexpr size_t kExchangeSourcePeriodGlobalCounters{ 2'000'000}; // 2 seconds. // Every 1 minute we clean old tasks. static constexpr size_t kTaskPeriodCleanOldTasks{60'000'000}; // 60 seconds. -// Every 1 minute we export cache counters. -static constexpr size_t kCachePeriodGlobalCounters{60'000'000}; // 60 seconds. // Every 1 minute we export connector counters. static constexpr size_t kConnectorPeriodGlobalCounters{ 60'000'000}; // 60 seconds. @@ -116,6 +110,8 @@ void PeriodicTaskManager::start() { VELOX_CHECK_NOT_NULL(arbitrator_); velox::PeriodicStatsReporter::Options opts; opts.arbitrator = arbitrator_->kind() == "NOOP" ? nullptr : arbitrator_; + opts.allocator = memoryAllocator_; + opts.cache = asyncDataCache_; velox::startPeriodicStatsReporter(opts); // If executors are null, don't bother starting this task. @@ -130,16 +126,8 @@ void PeriodicTaskManager::start() { addOldTaskCleanupTask(); } - if (memoryAllocator_ != nullptr) { - addMemoryAllocatorStatsTask(); - } - addPrestoExchangeSourceMemoryStatsTask(); - if (asyncDataCache_ != nullptr) { - addCacheStatsUpdateTask(); - } - addConnectorStatsTask(); addOperatingSystemStatsUpdateTask(); @@ -252,35 +240,6 @@ void PeriodicTaskManager::addOldTaskCleanupTask() { "clean_old_tasks"); } -void PeriodicTaskManager::updateMemoryAllocatorStats() { - RECORD_METRIC_VALUE( - kCounterMappedMemoryBytes, - (velox::memory::AllocationTraits::pageBytes( - memoryAllocator_->numMapped()))); - RECORD_METRIC_VALUE( - kCounterAllocatedMemoryBytes, - (velox::memory::AllocationTraits::pageBytes( - memoryAllocator_->numAllocated()))); - // TODO(jtan6): Remove condition after T150019700 is done - if (auto* mmapAllocator = - dynamic_cast(memoryAllocator_)) { - RECORD_METRIC_VALUE( - kCounterMmapRawAllocBytesSmall, (mmapAllocator->numMallocBytes())); - RECORD_METRIC_VALUE( - kCounterMmapExternalMappedBytes, - velox::memory::AllocationTraits::pageBytes( - (mmapAllocator->numExternalMapped()))); - } - // TODO(xiaoxmeng): add memory allocation size stats. -} - -void PeriodicTaskManager::addMemoryAllocatorStatsTask() { - addTask( - [this]() { updateMemoryAllocatorStats(); }, - kMemoryPeriodGlobalCounters, - "mmap_memory_counters"); -} - void PeriodicTaskManager::updatePrestoExchangeSourceMemoryStats() { int64_t currQueuedMemoryBytes{0}; int64_t peakQueuedMemoryBytes{0}; @@ -298,184 +257,6 @@ void PeriodicTaskManager::addPrestoExchangeSourceMemoryStatsTask() { "exchange_source_counters"); } -void PeriodicTaskManager::updateCacheStats() { - const auto memoryCacheStats = asyncDataCache_->refreshStats(); - - // Snapshots. - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumEntries, memoryCacheStats.numEntries); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumEmptyEntries, memoryCacheStats.numEmptyEntries); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumSharedEntries, memoryCacheStats.numShared); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumExclusiveEntries, memoryCacheStats.numExclusive); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumPrefetchedEntries, memoryCacheStats.numPrefetch); - RECORD_METRIC_VALUE( - kCounterMemoryCacheTotalTinyBytes, memoryCacheStats.tinySize); - RECORD_METRIC_VALUE( - kCounterMemoryCacheTotalLargeBytes, memoryCacheStats.largeSize); - RECORD_METRIC_VALUE( - kCounterMemoryCacheTotalTinyPaddingBytes, memoryCacheStats.tinyPadding); - RECORD_METRIC_VALUE( - kCounterMemoryCacheTotalLargePaddingBytes, memoryCacheStats.largePadding); - RECORD_METRIC_VALUE( - kCounterMemoryCacheTotalPrefetchBytes, memoryCacheStats.prefetchBytes); - RECORD_METRIC_VALUE( - kCounterMemoryCacheSumEvictScore, memoryCacheStats.sumEvictScore); - - // Interval cumulatives. - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumHit, - memoryCacheStats.numHit - lastMemoryCacheHits_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheHitBytes, - memoryCacheStats.hitBytes - lastMemoryCacheHitsBytes_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumNew, - memoryCacheStats.numNew - lastMemoryCacheInserts_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumEvict, - memoryCacheStats.numEvict - lastMemoryCacheEvictions_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumEvictChecks, - memoryCacheStats.numEvictChecks - lastMemoryCacheEvictionChecks_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumWaitExclusive, - memoryCacheStats.numWaitExclusive - lastMemoryCacheStalls_); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumAllocClocks, - memoryCacheStats.allocClocks - lastMemoryCacheAllocClocks_); - - lastMemoryCacheHits_ = memoryCacheStats.numHit; - lastMemoryCacheHitsBytes_ = memoryCacheStats.hitBytes; - lastMemoryCacheInserts_ = memoryCacheStats.numNew; - lastMemoryCacheEvictions_ = memoryCacheStats.numEvict; - lastMemoryCacheEvictionChecks_ = memoryCacheStats.numEvictChecks; - lastMemoryCacheStalls_ = memoryCacheStats.numWaitExclusive; - - // All time cumulatives. - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeHit, memoryCacheStats.numHit); - RECORD_METRIC_VALUE( - kCounterMemoryCacheCumulativeHitBytes, memoryCacheStats.hitBytes); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeNew, memoryCacheStats.numNew); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeEvict, memoryCacheStats.numEvict); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeEvictChecks, - memoryCacheStats.numEvictChecks); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeWaitExclusive, - memoryCacheStats.numWaitExclusive); - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeAllocClocks, - memoryCacheStats.allocClocks); - - if (memoryCacheStats.ssdStats != nullptr) { - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeReadEntries, - memoryCacheStats.ssdStats->entriesRead) - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeReadBytes, - memoryCacheStats.ssdStats->bytesRead); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeWrittenEntries, - memoryCacheStats.ssdStats->entriesWritten); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeWrittenBytes, - memoryCacheStats.ssdStats->bytesWritten); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeOpenSsdErrors, - memoryCacheStats.ssdStats->openFileErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeOpenCheckpointErrors, - memoryCacheStats.ssdStats->openCheckpointErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeOpenLogErrors, - memoryCacheStats.ssdStats->openLogErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeDeleteCheckpointErrors, - memoryCacheStats.ssdStats->deleteCheckpointErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeGrowFileErrors, - memoryCacheStats.ssdStats->growFileErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeWriteSsdErrors, - memoryCacheStats.ssdStats->writeSsdErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeWriteCheckpointErrors, - memoryCacheStats.ssdStats->writeCheckpointErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeReadSsdErrors, - memoryCacheStats.ssdStats->readSsdErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeReadCheckpointErrors, - memoryCacheStats.ssdStats->readCheckpointErrors); - RECORD_METRIC_VALUE( - kCounterSsdCacheCachedEntries, - memoryCacheStats.ssdStats->entriesCached); - RECORD_METRIC_VALUE( - kCounterSsdCacheCachedRegions, - memoryCacheStats.ssdStats->regionsCached); - RECORD_METRIC_VALUE( - kCounterSsdCacheCachedBytes, memoryCacheStats.ssdStats->bytesCached); - REPORT_IF_NOT_ZERO( - kCounterSsdCacheCheckpointsRead, - memoryCacheStats.ssdStats->checkpointsRead - - lastSsdCacheCheckpointsRead_); - REPORT_IF_NOT_ZERO( - kCounterSsdCacheCheckpointsWritten, - memoryCacheStats.ssdStats->checkpointsWritten - - lastSsdCacheCheckpointsWritten_); - REPORT_IF_NOT_ZERO( - kCounterSsdCacheRegionsEvicted, - memoryCacheStats.ssdStats->regionsEvicted - - lastSsdCacheRegionsEvicted_); - - lastSsdCacheCheckpointsWritten_ = - memoryCacheStats.ssdStats->checkpointsWritten; - lastSsdCacheCheckpointsRead_ = memoryCacheStats.ssdStats->checkpointsRead; - lastSsdCacheRegionsEvicted_ = - memoryCacheStats.ssdStats->regionsEvicted - lastSsdCacheRegionsEvicted_; - } - - if (auto* cacheTTLController = - velox::cache::CacheTTLController::getInstance()) { - RECORD_METRIC_VALUE( - kCounterCacheMaxAgeSecs, - cacheTTLController->getCacheAgeStats().maxAgeSecs); - - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumAgedOutEntries, - memoryCacheStats.numAgedOut - lastMemoryCacheAgedOuts_); - lastMemoryCacheAgedOuts_ = memoryCacheStats.numAgedOut; - RECORD_METRIC_VALUE( - kCounterMemoryCacheNumCumulativeAgedOutEntries, - memoryCacheStats.numAgedOut); - - if (memoryCacheStats.ssdStats != nullptr) { - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeAgedOutEntries, - memoryCacheStats.ssdStats->entriesAgedOut) - RECORD_METRIC_VALUE( - kCounterSsdCacheCumulativeAgedOutRegions, - memoryCacheStats.ssdStats->regionsAgedOut); - } - } - - LOG(INFO) << "Cache stats:\n" << memoryCacheStats.toString(); -} - -void PeriodicTaskManager::addCacheStatsUpdateTask() { - addTask( - [this]() { updateCacheStats(); }, - kCachePeriodGlobalCounters, - "cache_counters"); -} - namespace { class HiveConnectorStatsReporter { diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h index b639c4aaa89f5..0098eecdd9f22 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h @@ -104,15 +104,9 @@ class PeriodicTaskManager { void addOldTaskCleanupTask(); void cleanupOldTask(); - void addMemoryAllocatorStatsTask(); - void updateMemoryAllocatorStats(); - void addPrestoExchangeSourceMemoryStatsTask(); void updatePrestoExchangeSourceMemoryStats(); - void addCacheStatsUpdateTask(); - void updateCacheStats(); - void addConnectorStatsTask(); void addOperatingSystemStatsUpdateTask(); @@ -125,9 +119,6 @@ class PeriodicTaskManager { void addHttpEndpointLatencyStatsTask(); void printHttpEndpointLatencyStats(); - void addArbitratorStatsTask(); - void updateArbitratorStatsTask(); - void addWatchdogTask(); void detachWorker(const char* reason); diff --git a/presto-native-execution/velox b/presto-native-execution/velox index ac553396c068c..2c98308b4563d 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit ac553396c068c8ef2de10aa83374cb40c457559c +Subproject commit 2c98308b4563d0c58ab016708b835bb7fce4a9ce