Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ namespace facebook::presto {
constexpr uint32_t kMaxConcurrentLifespans{16};

namespace {
// We request cancellation for tasks which haven't been accessed by coordinator
// for a considerable time.
void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
for (const auto& [id, prestoTask] : taskMap) {
if (prestoTask->task != nullptr) {
if (prestoTask->task->isRunning()) {
if (prestoTask->timeSinceLastHeartbeatMs() >= abandonedMs) {
LOG(INFO) << "Cancelling abandoned task '" << id << "'.";
prestoTask->task->requestCancel();
}
}
}
}
}

// If spilling is enabled and the given Task can spill, then this helper
// generates the spilling directory path for the Task, and sets the path to it
// in the Task.
Expand Down Expand Up @@ -688,6 +703,8 @@ size_t TaskManager::cleanOldTasks() {
taskIdsToClean.emplace(id);
}
}

cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_);
}

const auto elapsedMs = (getCurrentTimeMs() - startTimeMs);
Expand Down Expand Up @@ -725,6 +742,13 @@ size_t TaskManager::cleanOldTasks() {
return taskIdsToClean.size();
}

void TaskManager::cancelAbandonedTasks() {
// We copy task map locally to avoid locking task map for a potentially long
// time. We also lock for 'read'.
const TaskMap taskMap = *(taskMap_.rlock());
cancelAbandonedTasksInternal(taskMap, oldTaskCleanUpMs_);
}

folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
const TaskId& taskId,
bool summarize,
Expand Down Expand Up @@ -1114,6 +1138,7 @@ void TaskManager::shutdown() {
<< " seconds so far) for 'Running' tasks to complete. " << numTasks
<< " tasks left: " << PrestoTask::taskNumbersToString(taskNumbers);
std::this_thread::sleep_for(std::chrono::seconds(1));
cancelAbandonedTasks();
taskNumbers = getTaskNumbers(numTasks);
++seconds;
}
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class TaskManager {
"concurrent_lifespans_per_task"};
static constexpr folly::StringPiece kSessionTimezone{"session_timezone"};

// We request cancellation for tasks which haven't been accessed by
// coordinator for a considerable time.
void cancelAbandonedTasks();

std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
const protocol::TaskId& taskId,
const velox::core::PlanFragment& planFragment,
Expand Down