diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 855ca7f8e8c9c..ab1f17f9b70b3 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -37,6 +37,21 @@ namespace facebook::presto { constexpr uint32_t kMaxConcurrentLifespans{16}; namespace { +// We request cancellation for tasks which haven't been accessed by coordinator +// for a considerable time. +void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) { + for (const auto& [id, prestoTask] : taskMap) { + if (prestoTask->task != nullptr) { + if (prestoTask->task->isRunning()) { + if (prestoTask->timeSinceLastHeartbeatMs() >= abandonedMs) { + LOG(INFO) << "Cancelling abandoned task '" << id << "'."; + prestoTask->task->requestCancel(); + } + } + } + } +} + // If spilling is enabled and the given Task can spill, then this helper // generates the spilling directory path for the Task, and sets the path to it // in the Task. @@ -688,6 +703,8 @@ size_t TaskManager::cleanOldTasks() { taskIdsToClean.emplace(id); } } + + cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_); } const auto elapsedMs = (getCurrentTimeMs() - startTimeMs); @@ -725,6 +742,13 @@ size_t TaskManager::cleanOldTasks() { return taskIdsToClean.size(); } +void TaskManager::cancelAbandonedTasks() { + // We copy task map locally to avoid locking task map for a potentially long + // time. We also lock for 'read'. + const TaskMap taskMap = *(taskMap_.rlock()); + cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_); +} + folly::Future> TaskManager::getTaskInfo( const TaskId& taskId, bool summarize, @@ -1114,6 +1138,7 @@ void TaskManager::shutdown() { << " seconds so far) for 'Running' tasks to complete. " << numTasks << " tasks left: " << PrestoTask::taskNumbersToString(taskNumbers); std::this_thread::sleep_for(std::chrono::seconds(1)); + cancelAbandonedTasks(); taskNumbers = getTaskNumbers(numTasks); ++seconds; } diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index 9526cc74f28ce..1f30c66a8361c 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -167,6 +167,10 @@ class TaskManager { "concurrent_lifespans_per_task"}; static constexpr folly::StringPiece kSessionTimezone{"session_timezone"}; + // We request cancellation for tasks which haven't been accessed by + // coordinator for a considerable time. + void cancelAbandonedTasks(); + std::unique_ptr createOrUpdateTask( const protocol::TaskId& taskId, const velox::core::PlanFragment& planFragment,