diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index ca5cee1aa9678..ab0fb8809c816 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -55,53 +55,53 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) { } } -// If spilling is enabled and the given Task can spill, then this helper -// generates the spilling directory path for the Task, and sets the path to it -// in the Task. -static void maybeSetupTaskSpillDirectory( +// If spilling is enabled and the task plan fragment can spill, then this helper +// generates the disk spilling options for the task. +std::optional getTaskSpillOptions( + const TaskId& taskId, const core::PlanFragment& planFragment, - exec::Task& execTask, - const std::string& baseSpillDirectory) { - if (baseSpillDirectory.empty() || - !planFragment.canSpill(execTask.queryCtx()->queryConfig())) { - return; + const std::shared_ptr& queryCtx, + const std::string& baseSpillDir) { + if (baseSpillDir.empty() || !planFragment.canSpill(queryCtx->queryConfig())) { + return std::nullopt; } - const auto includeNodeInSpillPath = + common::SpillDiskOptions spillDiskOpts; + const bool includeNodeInSpillPath = SystemConfig::instance()->includeNodeInSpillPath(); auto nodeConfig = NodeConfig::instance(); const auto [taskSpillDirPath, dateSpillDirPath] = TaskManager::buildTaskSpillDirectoryPath( - baseSpillDirectory, + baseSpillDir, nodeConfig->nodeInternalAddress(), nodeConfig->nodeId(), - execTask.queryCtx()->queryId(), - execTask.taskId(), + queryCtx->queryId(), + taskId, includeNodeInSpillPath); - execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false); - - execTask.setCreateSpillDirectoryCb( - [spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() { - auto fs = filesystems::getFileSystem(dateStrDir, nullptr); - // First create the top level directory (date string of the query) with - // TTL or other configs if set. - filesystems::DirectoryOptions options; - // Do not fail if the directory already exist because another process - // may have already created the dateStrDir. - options.failIfExists = false; - auto config = SystemConfig::instance()->spillerDirectoryCreateConfig(); - if (!config.empty()) { - options.values.emplace( - filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), - config); - } - fs->mkdir(dateStrDir, options); - - // After the parent directory is created, - // then create the spill directory for the actual task. - fs->mkdir(spillDir); - return spillDir; - }); + spillDiskOpts.spillDirPath = taskSpillDirPath; + spillDiskOpts.spillDirCreated = false; + spillDiskOpts.spillDirCreateCb = [spillDir = taskSpillDirPath, + dateDir = dateSpillDirPath]() { + auto fs = filesystems::getFileSystem(dateDir, nullptr); + // First create the top level directory (date string of the query) with + // TTL or other configs if set. + filesystems::DirectoryOptions options; + // Do not fail if the directory already exist because another process + // may have already created the dateStrDir. + options.failIfExists = false; + auto config = SystemConfig::instance()->spillerDirectoryCreateConfig(); + if (!config.empty()) { + options.values.emplace( + filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(), + config); + } + fs->mkdir(dateDir, options); + // After the parent directory is created, + // then create the spill directory for the actual task. + fs->mkdir(spillDir); + return spillDir; + }; + return spillDiskOpts; } // Keep outstanding Promises in RequestHandler's state itself. @@ -554,6 +554,10 @@ std::unique_ptr TaskManager::createOrUpdateTaskImpl( prestoTask->updateInfoLocked(summarize)); } + const auto baseSpillDir = *(baseSpillDir_.rlock()); + auto spillDiskOpts = + getTaskSpillOptions(taskId, planFragment, queryCtx, baseSpillDir); + // Uses a temp variable to store the created velox task to destroy it // under presto task lock if spill directory setup fails. Otherwise, the // concurrent task creation retry from the coordinator might see the @@ -567,12 +571,8 @@ std::unique_ptr TaskManager::createOrUpdateTaskImpl( std::move(queryCtx), exec::Task::ExecutionMode::kParallel, static_cast(nullptr), - prestoTask->id.stageId()); - // TODO: move spill directory creation inside velox task execution - // whenever spilling is triggered. It will reduce the unnecessary file - // operations on remote storage. - const auto baseSpillDir = *(baseSpillDir_.rlock()); - maybeSetupTaskSpillDirectory(planFragment, *newExecTask, baseSpillDir); + prestoTask->id.stageId(), + spillDiskOpts); prestoTask->task = std::move(newExecTask); prestoTask->info.needsPlan = false; diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 0477c1cf89406..6dce9b24e2052 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 0477c1cf894065365f5d3a5d5bcc3394ed0b1ea0 +Subproject commit 6dce9b24e2052463bcf43770652911af14077787