Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 43 additions & 43 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,53 +55,53 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
}
}

// If spilling is enabled and the given Task can spill, then this helper
// generates the spilling directory path for the Task, and sets the path to it
// in the Task.
static void maybeSetupTaskSpillDirectory(
// If spilling is enabled and the task plan fragment can spill, then this helper
// generates the disk spilling options for the task.
std::optional<common::SpillDiskOptions> getTaskSpillOptions(
const TaskId& taskId,
const core::PlanFragment& planFragment,
exec::Task& execTask,
const std::string& baseSpillDirectory) {
if (baseSpillDirectory.empty() ||
!planFragment.canSpill(execTask.queryCtx()->queryConfig())) {
return;
const std::shared_ptr<core::QueryCtx>& queryCtx,
const std::string& baseSpillDir) {
if (baseSpillDir.empty() || !planFragment.canSpill(queryCtx->queryConfig())) {
return std::nullopt;
}

const auto includeNodeInSpillPath =
common::SpillDiskOptions spillDiskOpts;
const bool includeNodeInSpillPath =
SystemConfig::instance()->includeNodeInSpillPath();
auto nodeConfig = NodeConfig::instance();
const auto [taskSpillDirPath, dateSpillDirPath] =
TaskManager::buildTaskSpillDirectoryPath(
baseSpillDirectory,
baseSpillDir,
nodeConfig->nodeInternalAddress(),
nodeConfig->nodeId(),
execTask.queryCtx()->queryId(),
execTask.taskId(),
queryCtx->queryId(),
taskId,
includeNodeInSpillPath);
execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false);

execTask.setCreateSpillDirectoryCb(
[spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() {
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
// First create the top level directory (date string of the query) with
// TTL or other configs if set.
filesystems::DirectoryOptions options;
// Do not fail if the directory already exist because another process
// may have already created the dateStrDir.
options.failIfExists = false;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateStrDir, options);

// After the parent directory is created,
// then create the spill directory for the actual task.
fs->mkdir(spillDir);
return spillDir;
});
spillDiskOpts.spillDirPath = taskSpillDirPath;
spillDiskOpts.spillDirCreated = false;
spillDiskOpts.spillDirCreateCb = [spillDir = taskSpillDirPath,
dateDir = dateSpillDirPath]() {
auto fs = filesystems::getFileSystem(dateDir, nullptr);
// First create the top level directory (date string of the query) with
// TTL or other configs if set.
filesystems::DirectoryOptions options;
// Do not fail if the directory already exist because another process
// may have already created the dateStrDir.
options.failIfExists = false;
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
Comment on lines +83 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Check for errors when creating directories in the spillDirCreateCb callback.

Currently, errors from mkdir are not handled, which may cause silent failures if directory creation fails. Please add error handling or propagate errors to improve robustness.

if (!config.empty()) {
options.values.emplace(
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
config);
}
fs->mkdir(dateDir, options);
// After the parent directory is created,
// then create the spill directory for the actual task.
fs->mkdir(spillDir);
return spillDir;
};
return spillDiskOpts;
}

// Keep outstanding Promises in RequestHandler's state itself.
Expand Down Expand Up @@ -554,6 +554,10 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
prestoTask->updateInfoLocked(summarize));
}

const auto baseSpillDir = *(baseSpillDir_.rlock());
auto spillDiskOpts =
getTaskSpillOptions(taskId, planFragment, queryCtx, baseSpillDir);

// Uses a temp variable to store the created velox task to destroy it
// under presto task lock if spill directory setup fails. Otherwise, the
// concurrent task creation retry from the coordinator might see the
Expand All @@ -567,12 +571,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
std::move(queryCtx),
exec::Task::ExecutionMode::kParallel,
static_cast<exec::Consumer>(nullptr),
prestoTask->id.stageId());
// TODO: move spill directory creation inside velox task execution
// whenever spilling is triggered. It will reduce the unnecessary file
// operations on remote storage.
const auto baseSpillDir = *(baseSpillDir_.rlock());
maybeSetupTaskSpillDirectory(planFragment, *newExecTask, baseSpillDir);
prestoTask->id.stageId(),
spillDiskOpts);

prestoTask->task = std::move(newExecTask);
prestoTask->info.needsPlan = false;
Expand Down
Loading