diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index f1a18146c327d..67124adf0a14f 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -540,6 +540,8 @@ void PeriodicTaskManager::addWatchdogTask() { } RECORD_METRIC_VALUE(kCounterNumStuckDrivers, stuckOpCalls.size()); + const char* detachReason = nullptr; + // Detach worker from the cluster if more than a certain number of // driver threads are blocked by stuck operators (one unique operator // can only get stuck on one unique thread). @@ -547,9 +549,33 @@ void PeriodicTaskManager::addWatchdogTask() { SystemConfig::instance()->driverNumStuckOperatorsToDetachWorker(), numDriverThreads_); if (stuckOpCalls.size() >= numStuckOperatorsToDetachWorker) { - detachWorker("detected stuck operators"); + detachReason = "detected stuck operators"; } else if (!deadlockTasks.empty()) { - detachWorker("starving or deadlocked task"); + detachReason = "starving or deadlocked task"; + } + + // Detach worker from the cluster if it has been overloaded for too + // long. + const auto now = velox::getCurrentTimeSec(); + const auto lastNotOverloadedTime = + taskManager_->lastNotOverloadedTimeInSecs(); + const auto overloadedDurationSec = + taskManager_->isServerOverloaded() && (now > lastNotOverloadedTime) + ? now - lastNotOverloadedTime + : 0UL; + RECORD_METRIC_VALUE( + kCounterOverloadedDurationSec, overloadedDurationSec); + if (detachReason == nullptr) { + const uint64_t secondsThreshold = + SystemConfig::instance()->workerOverloadedSecondsToDetachWorker(); + if (secondsThreshold > 0 && + overloadedDurationSec > secondsThreshold) { + detachReason = "worker has been overloaded for too long"; + } + } + + if (detachReason != nullptr) { + detachWorker(detachReason); } else { maybeAttachWorker(); } diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 2d849295873ff..257841d115dae 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -360,7 +360,8 @@ TaskManager::TaskManager( driverExecutor, spillerExecutor)), bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()), - httpSrvCpuExecutor_(httpSrvCpuExecutor) { + httpSrvCpuExecutor_(httpSrvCpuExecutor), + lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) { VELOX_CHECK_NOT_NULL(bufferManager_, "invalid OutputBufferManager"); } @@ -471,6 +472,13 @@ TaskManager::buildTaskSpillDirectoryPath( std::move(taskSpillDirPath), std::move(dateSpillDirPath)); } +void TaskManager::setServerOverloaded(bool serverOverloaded) { + serverOverloaded_ = serverOverloaded; + if (!serverOverloaded) { + lastNotOverloadedTimeInSecs_ = velox::getCurrentTimeSec(); + } +} + void TaskManager::getDataForResultRequests( const std::unordered_map>& resultRequests) { diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index ea9b493e7ffd1..6d0fac7af8396 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -179,8 +179,14 @@ class TaskManager { /// Presto Server can notify the Task Manager that the former is overloaded, /// so the Task Manager can optionally change Task admission algorithm. - void setServerOverloaded(bool serverOverloaded) { - serverOverloaded_ = serverOverloaded; + void setServerOverloaded(bool serverOverloaded); + + bool isServerOverloaded() const { + return serverOverloaded_; + } + + uint64_t lastNotOverloadedTimeInSecs() const { + return lastNotOverloadedTimeInSecs_; } /// Returns last known number of queued drivers. Used in determining if the @@ -236,6 +242,7 @@ class TaskManager { folly::Synchronized taskQueue_; folly::Executor* httpSrvCpuExecutor_; std::atomic_bool serverOverloaded_{false}; + std::atomic_uint64_t lastNotOverloadedTimeInSecs_; std::atomic_uint32_t numQueuedDrivers_{0}; }; diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index d9100731dc5ee..bf0da5cc5f18a 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -188,6 +188,7 @@ SystemConfig::SystemConfig() { NUM_PROP(kWorkerOverloadedThresholdCpuPct, 0), NUM_PROP(kWorkerOverloadedThresholdNumQueuedDriversHwMultiplier, 0.0), NUM_PROP(kWorkerOverloadedCooldownPeriodSec, 5), + NUM_PROP(kWorkerOverloadedSecondsToDetachWorker, 0), BOOL_PROP(kWorkerOverloadedTaskQueuingEnabled, false), NUM_PROP(kMallocHeapDumpThresholdGb, 20), NUM_PROP(kMallocMemMinHeapDumpInterval, 10), @@ -576,6 +577,11 @@ uint32_t SystemConfig::workerOverloadedCooldownPeriodSec() const { return optionalProperty(kWorkerOverloadedCooldownPeriodSec).value(); } +uint64_t SystemConfig::workerOverloadedSecondsToDetachWorker() const { + return optionalProperty(kWorkerOverloadedSecondsToDetachWorker) + .value(); +} + bool SystemConfig::workerOverloadedTaskQueuingEnabled() const { return optionalProperty(kWorkerOverloadedTaskQueuingEnabled).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index ed838ed69f311..f6708cbac733b 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -353,6 +353,11 @@ class SystemConfig : public ConfigBase { /// This is to prevent spiky fluctuation of the overloaded status. static constexpr std::string_view kWorkerOverloadedCooldownPeriodSec{ "worker-overloaded-cooldown-period-sec"}; + /// The number of seconds the worker needs to be continuously overloaded for + /// us to detach the worker from the cluster in an attempt to keep the + /// cluster operational. Ignored if set to zero. Default is zero. + static constexpr std::string_view kWorkerOverloadedSecondsToDetachWorker{ + "worker-overloaded-seconds-to-detach-worker"}; /// If true, the worker starts queuing new tasks when overloaded, and /// starts them gradually when it stops being overloaded. static constexpr std::string_view kWorkerOverloadedTaskQueuingEnabled{ @@ -948,6 +953,8 @@ class SystemConfig : public ConfigBase { uint32_t workerOverloadedCooldownPeriodSec() const; + uint64_t workerOverloadedSecondsToDetachWorker() const; + bool workerOverloadedTaskQueuingEnabled() const; bool mallocMemHeapDumpEnabled() const; diff --git a/presto-native-execution/presto_cpp/main/common/Counters.cpp b/presto-native-execution/presto_cpp/main/common/Counters.cpp index 2d9b903971440..2f53260e16a64 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.cpp +++ b/presto-native-execution/presto_cpp/main/common/Counters.cpp @@ -93,6 +93,7 @@ void registerPrestoMetrics() { DEFINE_METRIC(kCounterOverloaded, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterNumStuckDrivers, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterTaskPlannedTimeMs, facebook::velox::StatType::AVG); + DEFINE_METRIC(kCounterOverloadedDurationSec, facebook::velox::StatType::AVG); DEFINE_METRIC( kCounterTotalPartitionedOutputBuffer, facebook::velox::StatType::AVG); DEFINE_METRIC( diff --git a/presto-native-execution/presto_cpp/main/common/Counters.h b/presto-native-execution/presto_cpp/main/common/Counters.h index 28e9fc3cc0999..819d330f2d9cb 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.h +++ b/presto-native-execution/presto_cpp/main/common/Counters.h @@ -135,6 +135,10 @@ constexpr folly::StringPiece kCounterOverloaded{"presto_cpp.overloaded"}; /// planned) in milliseconds. constexpr folly::StringPiece kCounterTaskPlannedTimeMs{ "presto_cpp.task_planned_time_ms"}; +/// Exports the current overloaded duration in seconds or 0 if not currently +/// overloaded. +constexpr folly::StringPiece kCounterOverloadedDurationSec{ + "presto_cpp.overloaded_duration_sec"}; /// Number of total OutputBuffer managed by all /// OutputBufferManager