Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,16 +540,42 @@ void PeriodicTaskManager::addWatchdogTask() {
}
RECORD_METRIC_VALUE(kCounterNumStuckDrivers, stuckOpCalls.size());

const char* detachReason = nullptr;

// Detach worker from the cluster if more than a certain number of
// driver threads are blocked by stuck operators (one unique operator
// can only get stuck on one unique thread).
const auto numStuckOperatorsToDetachWorker = std::min(
SystemConfig::instance()->driverNumStuckOperatorsToDetachWorker(),
numDriverThreads_);
if (stuckOpCalls.size() >= numStuckOperatorsToDetachWorker) {
detachWorker("detected stuck operators");
detachReason = "detected stuck operators";
} else if (!deadlockTasks.empty()) {
detachWorker("starving or deadlocked task");
detachReason = "starving or deadlocked task";
}

// Detach worker from the cluster if it has been overloaded for too
// long.
const auto now = velox::getCurrentTimeSec();
const auto lastNotOverloadedTime =
taskManager_->lastNotOverloadedTimeInSecs();
const auto overloadedDurationSec =
taskManager_->isServerOverloaded() && (now > lastNotOverloadedTime)
? now - lastNotOverloadedTime
: 0UL;
RECORD_METRIC_VALUE(
kCounterOverloadedDurationSec, overloadedDurationSec);
if (detachReason == nullptr) {
const uint64_t secondsThreshold =
SystemConfig::instance()->workerOverloadedSecondsToDetachWorker();
if (secondsThreshold > 0 &&
overloadedDurationSec > secondsThreshold) {
detachReason = "worker has been overloaded for too long";
}
}

if (detachReason != nullptr) {
detachWorker(detachReason);
} else {
maybeAttachWorker();
}
Expand Down
10 changes: 9 additions & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ TaskManager::TaskManager(
driverExecutor,
spillerExecutor)),
bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
httpSrvCpuExecutor_(httpSrvCpuExecutor) {
httpSrvCpuExecutor_(httpSrvCpuExecutor),
lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) {
VELOX_CHECK_NOT_NULL(bufferManager_, "invalid OutputBufferManager");
}

Expand Down Expand Up @@ -471,6 +472,13 @@ TaskManager::buildTaskSpillDirectoryPath(
std::move(taskSpillDirPath), std::move(dateSpillDirPath));
}

/// Stores the overloaded state reported for the server. When the reported
/// state is 'not overloaded', the current time in seconds is also recorded as
/// the most recent moment the server was known not to be overloaded.
void TaskManager::setServerOverloaded(bool serverOverloaded) {
  serverOverloaded_.store(serverOverloaded);
  if (serverOverloaded) {
    // Keep the previous timestamp while overloaded, so the time elapsed since
    // it keeps growing.
    return;
  }
  lastNotOverloadedTimeInSecs_.store(velox::getCurrentTimeSec());
}

void TaskManager::getDataForResultRequests(
const std::unordered_map<int64_t, std::shared_ptr<ResultRequest>>&
resultRequests) {
Expand Down
11 changes: 9 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,14 @@ class TaskManager {

/// The Presto server can notify the Task Manager that the server is
/// overloaded, so the Task Manager can optionally change its task admission
/// algorithm.
void setServerOverloaded(bool serverOverloaded) {
serverOverloaded_ = serverOverloaded;
void setServerOverloaded(bool serverOverloaded);

/// Returns the overloaded flag most recently stored via
/// setServerOverloaded().
bool isServerOverloaded() const {
  return serverOverloaded_.load();
}

/// Returns the stored timestamp, in seconds, of the last time the server was
/// recorded as not overloaded.
uint64_t lastNotOverloadedTimeInSecs() const {
  return lastNotOverloadedTimeInSecs_.load();
}

/// Returns last known number of queued drivers. Used in determining if the
Expand Down Expand Up @@ -236,6 +242,7 @@ class TaskManager {
folly::Synchronized<TaskQueue> taskQueue_;
folly::Executor* httpSrvCpuExecutor_;
std::atomic_bool serverOverloaded_{false};
std::atomic_uint64_t lastNotOverloadedTimeInSecs_;
std::atomic_uint32_t numQueuedDrivers_{0};
};

Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ SystemConfig::SystemConfig() {
NUM_PROP(kWorkerOverloadedThresholdCpuPct, 0),
NUM_PROP(kWorkerOverloadedThresholdNumQueuedDriversHwMultiplier, 0.0),
NUM_PROP(kWorkerOverloadedCooldownPeriodSec, 5),
NUM_PROP(kWorkerOverloadedSecondsToDetachWorker, 0),
BOOL_PROP(kWorkerOverloadedTaskQueuingEnabled, false),
NUM_PROP(kMallocHeapDumpThresholdGb, 20),
NUM_PROP(kMallocMemMinHeapDumpInterval, 10),
Expand Down Expand Up @@ -576,6 +577,11 @@ uint32_t SystemConfig::workerOverloadedCooldownPeriodSec() const {
return optionalProperty<uint32_t>(kWorkerOverloadedCooldownPeriodSec).value();
}

/// Returns the value of the kWorkerOverloadedSecondsToDetachWorker property.
/// value() is called on the looked-up optional, so the property is expected
/// to always resolve to a value.
uint64_t SystemConfig::workerOverloadedSecondsToDetachWorker() const {
  const auto secondsToDetach =
      optionalProperty<uint64_t>(kWorkerOverloadedSecondsToDetachWorker);
  return secondsToDetach.value();
}

/// Returns the value of the kWorkerOverloadedTaskQueuingEnabled property.
/// value() is called on the looked-up optional, so the property is expected
/// to always resolve to a value.
bool SystemConfig::workerOverloadedTaskQueuingEnabled() const {
  const auto queuingEnabled =
      optionalProperty<bool>(kWorkerOverloadedTaskQueuingEnabled);
  return queuingEnabled.value();
}
Expand Down
7 changes: 7 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ class SystemConfig : public ConfigBase {
/// This is to prevent spiky fluctuation of the overloaded status.
static constexpr std::string_view kWorkerOverloadedCooldownPeriodSec{
"worker-overloaded-cooldown-period-sec"};
/// The number of seconds the worker needs to be continuously overloaded for
/// us to detach the worker from the cluster in an attempt to keep the
/// cluster operational. Ignored if set to zero. Default is zero.
static constexpr std::string_view kWorkerOverloadedSecondsToDetachWorker{
"worker-overloaded-seconds-to-detach-worker"};
/// If true, the worker starts queuing new tasks when overloaded, and
/// starts them gradually when it stops being overloaded.
static constexpr std::string_view kWorkerOverloadedTaskQueuingEnabled{
Expand Down Expand Up @@ -948,6 +953,8 @@ class SystemConfig : public ConfigBase {

uint32_t workerOverloadedCooldownPeriodSec() const;

uint64_t workerOverloadedSecondsToDetachWorker() const;

bool workerOverloadedTaskQueuingEnabled() const;

bool mallocMemHeapDumpEnabled() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void registerPrestoMetrics() {
DEFINE_METRIC(kCounterOverloaded, facebook::velox::StatType::AVG);
DEFINE_METRIC(kCounterNumStuckDrivers, facebook::velox::StatType::AVG);
DEFINE_METRIC(kCounterTaskPlannedTimeMs, facebook::velox::StatType::AVG);
DEFINE_METRIC(kCounterOverloadedDurationSec, facebook::velox::StatType::AVG);
DEFINE_METRIC(
kCounterTotalPartitionedOutputBuffer, facebook::velox::StatType::AVG);
DEFINE_METRIC(
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ constexpr folly::StringPiece kCounterOverloaded{"presto_cpp.overloaded"};
/// planned) in milliseconds.
constexpr folly::StringPiece kCounterTaskPlannedTimeMs{
"presto_cpp.task_planned_time_ms"};
/// Exports the current overloaded duration in seconds or 0 if not currently
/// overloaded.
constexpr folly::StringPiece kCounterOverloadedDurationSec{
"presto_cpp.overloaded_duration_sec"};

/// Number of total OutputBuffer managed by all
/// OutputBufferManager
Expand Down
Loading