Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 2 additions & 221 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
#include <folly/stop_watch.h>
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/TaskManager.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
#include "velox/common/base/PeriodicStatsReporter.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/CacheTTLController.h"
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/connectors/hive/HiveConnector.h"
Expand Down Expand Up @@ -75,15 +73,11 @@ folly::StringPiece getCounterForBlockingReason(

// Periods of the periodic tasks. 2'000'000 corresponds to 2 seconds, i.e. the
// values are expressed in microseconds.

// Server counters are exported every 2 seconds.
static constexpr size_t kTaskPeriodGlobalCounters = 2 * 1'000'000;
// Memory counters are exported every 2 seconds.
static constexpr size_t kMemoryPeriodGlobalCounters = 2 * 1'000'000;
// Exchange source counters are exported every 2 seconds.
static constexpr size_t kExchangeSourcePeriodGlobalCounters = 2 * 1'000'000;
// Old tasks are cleaned every 60 seconds.
static constexpr size_t kTaskPeriodCleanOldTasks = 60 * 1'000'000;
// Cache counters are exported every 60 seconds.
static constexpr size_t kCachePeriodGlobalCounters = 60 * 1'000'000;
// Connector counters are exported every 60 seconds.
static constexpr size_t kConnectorPeriodGlobalCounters = 60 * 1'000'000;
Expand Down Expand Up @@ -116,6 +110,8 @@ void PeriodicTaskManager::start() {
VELOX_CHECK_NOT_NULL(arbitrator_);
velox::PeriodicStatsReporter::Options opts;
opts.arbitrator = arbitrator_->kind() == "NOOP" ? nullptr : arbitrator_;
opts.allocator = memoryAllocator_;
opts.cache = asyncDataCache_;
velox::startPeriodicStatsReporter(opts);

// If executors are null, don't bother starting this task.
Expand All @@ -130,16 +126,8 @@ void PeriodicTaskManager::start() {
addOldTaskCleanupTask();
}

if (memoryAllocator_ != nullptr) {
addMemoryAllocatorStatsTask();
}

addPrestoExchangeSourceMemoryStatsTask();

if (asyncDataCache_ != nullptr) {
addCacheStatsUpdateTask();
}

addConnectorStatsTask();

addOperatingSystemStatsUpdateTask();
Expand Down Expand Up @@ -252,35 +240,6 @@ void PeriodicTaskManager::addOldTaskCleanupTask() {
"clean_old_tasks");
}

void PeriodicTaskManager::updateMemoryAllocatorStats() {
RECORD_METRIC_VALUE(
kCounterMappedMemoryBytes,
(velox::memory::AllocationTraits::pageBytes(
memoryAllocator_->numMapped())));
RECORD_METRIC_VALUE(
kCounterAllocatedMemoryBytes,
(velox::memory::AllocationTraits::pageBytes(
memoryAllocator_->numAllocated())));
// TODO(jtan6): Remove condition after T150019700 is done
if (auto* mmapAllocator =
dynamic_cast<const velox::memory::MmapAllocator*>(memoryAllocator_)) {
RECORD_METRIC_VALUE(
kCounterMmapRawAllocBytesSmall, (mmapAllocator->numMallocBytes()));
RECORD_METRIC_VALUE(
kCounterMmapExternalMappedBytes,
velox::memory::AllocationTraits::pageBytes(
(mmapAllocator->numExternalMapped())));
}
// TODO(xiaoxmeng): add memory allocation size stats.
}

void PeriodicTaskManager::addMemoryAllocatorStatsTask() {
addTask(
[this]() { updateMemoryAllocatorStats(); },
kMemoryPeriodGlobalCounters,
"mmap_memory_counters");
}

void PeriodicTaskManager::updatePrestoExchangeSourceMemoryStats() {
int64_t currQueuedMemoryBytes{0};
int64_t peakQueuedMemoryBytes{0};
Expand All @@ -298,184 +257,6 @@ void PeriodicTaskManager::addPrestoExchangeSourceMemoryStatsTask() {
"exchange_source_counters");
}

void PeriodicTaskManager::updateCacheStats() {
const auto memoryCacheStats = asyncDataCache_->refreshStats();

// Snapshots.
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEntries, memoryCacheStats.numEntries);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEmptyEntries, memoryCacheStats.numEmptyEntries);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumSharedEntries, memoryCacheStats.numShared);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumExclusiveEntries, memoryCacheStats.numExclusive);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumPrefetchedEntries, memoryCacheStats.numPrefetch);
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalTinyBytes, memoryCacheStats.tinySize);
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalLargeBytes, memoryCacheStats.largeSize);
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalTinyPaddingBytes, memoryCacheStats.tinyPadding);
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalLargePaddingBytes, memoryCacheStats.largePadding);
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalPrefetchBytes, memoryCacheStats.prefetchBytes);
RECORD_METRIC_VALUE(
kCounterMemoryCacheSumEvictScore, memoryCacheStats.sumEvictScore);

// Interval cumulatives.
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumHit,
memoryCacheStats.numHit - lastMemoryCacheHits_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheHitBytes,
memoryCacheStats.hitBytes - lastMemoryCacheHitsBytes_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumNew,
memoryCacheStats.numNew - lastMemoryCacheInserts_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEvict,
memoryCacheStats.numEvict - lastMemoryCacheEvictions_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEvictChecks,
memoryCacheStats.numEvictChecks - lastMemoryCacheEvictionChecks_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumWaitExclusive,
memoryCacheStats.numWaitExclusive - lastMemoryCacheStalls_);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumAllocClocks,
memoryCacheStats.allocClocks - lastMemoryCacheAllocClocks_);

lastMemoryCacheHits_ = memoryCacheStats.numHit;
lastMemoryCacheHitsBytes_ = memoryCacheStats.hitBytes;
lastMemoryCacheInserts_ = memoryCacheStats.numNew;
lastMemoryCacheEvictions_ = memoryCacheStats.numEvict;
lastMemoryCacheEvictionChecks_ = memoryCacheStats.numEvictChecks;
lastMemoryCacheStalls_ = memoryCacheStats.numWaitExclusive;

// All time cumulatives.
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeHit, memoryCacheStats.numHit);
RECORD_METRIC_VALUE(
kCounterMemoryCacheCumulativeHitBytes, memoryCacheStats.hitBytes);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeNew, memoryCacheStats.numNew);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeEvict, memoryCacheStats.numEvict);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeEvictChecks,
memoryCacheStats.numEvictChecks);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeWaitExclusive,
memoryCacheStats.numWaitExclusive);
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeAllocClocks,
memoryCacheStats.allocClocks);

if (memoryCacheStats.ssdStats != nullptr) {
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadEntries,
memoryCacheStats.ssdStats->entriesRead)
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadBytes,
memoryCacheStats.ssdStats->bytesRead);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWrittenEntries,
memoryCacheStats.ssdStats->entriesWritten);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWrittenBytes,
memoryCacheStats.ssdStats->bytesWritten);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenSsdErrors,
memoryCacheStats.ssdStats->openFileErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenCheckpointErrors,
memoryCacheStats.ssdStats->openCheckpointErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenLogErrors,
memoryCacheStats.ssdStats->openLogErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeDeleteCheckpointErrors,
memoryCacheStats.ssdStats->deleteCheckpointErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeGrowFileErrors,
memoryCacheStats.ssdStats->growFileErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWriteSsdErrors,
memoryCacheStats.ssdStats->writeSsdErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWriteCheckpointErrors,
memoryCacheStats.ssdStats->writeCheckpointErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadSsdErrors,
memoryCacheStats.ssdStats->readSsdErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadCheckpointErrors,
memoryCacheStats.ssdStats->readCheckpointErrors);
RECORD_METRIC_VALUE(
kCounterSsdCacheCachedEntries,
memoryCacheStats.ssdStats->entriesCached);
RECORD_METRIC_VALUE(
kCounterSsdCacheCachedRegions,
memoryCacheStats.ssdStats->regionsCached);
RECORD_METRIC_VALUE(
kCounterSsdCacheCachedBytes, memoryCacheStats.ssdStats->bytesCached);
REPORT_IF_NOT_ZERO(
kCounterSsdCacheCheckpointsRead,
memoryCacheStats.ssdStats->checkpointsRead -
lastSsdCacheCheckpointsRead_);
REPORT_IF_NOT_ZERO(
kCounterSsdCacheCheckpointsWritten,
memoryCacheStats.ssdStats->checkpointsWritten -
lastSsdCacheCheckpointsWritten_);
REPORT_IF_NOT_ZERO(
kCounterSsdCacheRegionsEvicted,
memoryCacheStats.ssdStats->regionsEvicted -
lastSsdCacheRegionsEvicted_);

lastSsdCacheCheckpointsWritten_ =
memoryCacheStats.ssdStats->checkpointsWritten;
lastSsdCacheCheckpointsRead_ = memoryCacheStats.ssdStats->checkpointsRead;
lastSsdCacheRegionsEvicted_ =
memoryCacheStats.ssdStats->regionsEvicted - lastSsdCacheRegionsEvicted_;
}

if (auto* cacheTTLController =
velox::cache::CacheTTLController::getInstance()) {
RECORD_METRIC_VALUE(
kCounterCacheMaxAgeSecs,
cacheTTLController->getCacheAgeStats().maxAgeSecs);

RECORD_METRIC_VALUE(
kCounterMemoryCacheNumAgedOutEntries,
memoryCacheStats.numAgedOut - lastMemoryCacheAgedOuts_);
lastMemoryCacheAgedOuts_ = memoryCacheStats.numAgedOut;
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeAgedOutEntries,
memoryCacheStats.numAgedOut);

if (memoryCacheStats.ssdStats != nullptr) {
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeAgedOutEntries,
memoryCacheStats.ssdStats->entriesAgedOut)
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeAgedOutRegions,
memoryCacheStats.ssdStats->regionsAgedOut);
}
}

LOG(INFO) << "Cache stats:\n" << memoryCacheStats.toString();
}

void PeriodicTaskManager::addCacheStatsUpdateTask() {
addTask(
[this]() { updateCacheStats(); },
kCachePeriodGlobalCounters,
"cache_counters");
}

namespace {

class HiveConnectorStatsReporter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,9 @@ class PeriodicTaskManager {
void addOldTaskCleanupTask();
void cleanupOldTask();

void addMemoryAllocatorStatsTask();
void updateMemoryAllocatorStats();

void addPrestoExchangeSourceMemoryStatsTask();
void updatePrestoExchangeSourceMemoryStats();

void addCacheStatsUpdateTask();
void updateCacheStats();

void addConnectorStatsTask();

void addOperatingSystemStatsUpdateTask();
Expand All @@ -125,9 +119,6 @@ class PeriodicTaskManager {
void addHttpEndpointLatencyStatsTask();
void printHttpEndpointLatencyStats();

void addArbitratorStatsTask();
void updateArbitratorStatsTask();

void addWatchdogTask();

void detachWorker(const char* reason);
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 135 files